Кеш Spark 1.6 Dataframe работает некорректно

Я понимаю, что если у меня есть фрейм данных, если я кэширую его () и запускаю действие, подобное df.take(1) или df.count (), он должен вычислять фрейм данных и сохранять его в памяти, и всякий раз, когда этот кешированный фрейм данных вызывается в программе, которую он использует уже вычисленный фрейм данных из кеша.

но моя программа работает не так.

У меня есть фрейм данных, как показано ниже, я кэширую его, а затем сразу же запускаю действие df.count.

  1. val df = inputDataFrame.select().where().withColumn("newcol" , "").cache()

  2. df.count

Когда запускаю программу. В пользовательском интерфейсе Spark я вижу, что первая строка выполняется в течение 4 минут, а когда дело доходит до второй строки, она снова выполняется 4 минуты, в основном первая строка пересчитывается дважды?

Разве первая строка не должна вычисляться и кэшироваться при срабатывании второй строки?

как решить эту проблему. Я застрял, посоветуйте, пожалуйста.


person rubiks    schedule 05.09.2017    source источник


Ответы (2)


Я понимаю, что если у меня есть фрейм данных, если я кэширую его () и запускаю действие, подобное df.take (1) или df.count (), он должен вычислить фрейм данных и сохранить его в памяти,

Это не так. Простые cache и count (take также не будут работать с RDD) - допустимый метод для RDD, но это не относится к Datasets, которые используют гораздо более продвинутые оптимизации. С запросом:

df.select(...).where(...).withColumn("newcol" , "").count()

любой столбец, который не используется в предложении where, можно игнорировать.

Есть важный обсуждение в списке разработчиков и цитирование Шона Оуэна

Я думаю, что правильный ответ - «не делайте этого», но если бы вам действительно пришлось, вы могли бы запустить операцию набора данных, которая ничего не делает для каждого раздела. Я предполагаю, что это было бы более надежно, потому что весь раздел должен быть вычислен, чтобы сделать его доступным на практике. Или зайдите так далеко, что переберите каждый элемент.

Переведено в код:

df.foreach(_ => ())

Там есть

df.registerAsTempTable("df")
sqlContext.sql("CACHE TABLE df")

который очень хочет, но он больше не документирован (Spark 2 и более поздние версии), и его следует избегать.

person Alper t. Turker    schedule 05.09.2017
comment
Привет, Альпер, у меня есть список фреймов данных: df1, df2, df3. каждый является результатом объединения, группировки и преобразований. Я постепенно объединяю их в единый df. Когда я запускаю его, я вижу, что для вычисления df он всегда одновременно вычисляет df1, df2, df3, что вызывает использование перетасовки жесткого диска. Я попытался заставить Spark материализовать df1, df2, df3 перед созданием df с помощью save / load, cache.count, persist.count или df.rdd.count, но работает только первое решение. В первом решении мне нужно сохранить фреймы данных во временное место и загрузить их. У вас есть другие решения, которые не требуют сохранения / загрузки? - person lam; 25.03.2020

Нет, если вы вызываете cache в DataFrame, он не кэшируется в этот момент, он только «помечен» для потенциального будущего кэширования. Фактическое кэширование выполняется только тогда, когда действие выполняется позже. Вы также можете увидеть свой кэшированный DataFrame в пользовательском интерфейсе Spark в разделе «Хранилище».

Другая проблема в вашем коде заключается в том, что count в DataFrame не вычисляет весь DataFrame, потому что для этого не нужно вычислять все столбцы. Вы можете использовать df.rdd.count() для принудительного выполнения всей оценки (см. Как заставить оценку DataFrame в Spark).

Вопрос в том, почему ваша первая операция занимает так много времени, даже если действие не вызывается. Я думаю, это связано с логикой кеширования (например, оценки размера и т.д.), вычисляемой при вызове кеша (см., Например, Почему rdd.map (identity) .cache работает медленно, когда элементы rdd большие?)

person Raphael Roth    schedule 06.09.2017