Pyspark Group: слишком медленный счет

Я запускаю pyspark в кластере dataproc с 4 узлами, каждый из которых имеет 2 ядра и 8 ГБ ОЗУ. У меня есть фреймворк со столбцом, содержащим список слов. Я взорвал этот столбец и подсчитал количество повторов, используя -

df.groupBy("exploded_col").count()

До взрыва было ~ 78 млн рядов. Но выполнение приведенного выше кода занимает слишком много времени (более 4 часов). Почему искра длится необычно долго? Я все еще новичок в Spark, поэтому не знаю, какие настройки подходят для работы с огромными данными.

У меня есть следующие настройки для sparkContext

enter code here
SparkSession.builder \
    .appName("Spark NLP Licensed") \
    .master("yarn") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.5.1") 
spark.conf.set("spark.sql.shuffle.partitions",20)
spark.conf.set("spark.num.executors",100)
spark.conf.set("spark.executor.cores",1)
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

Я даже установил для spark.sql.shuffle.partitions значение 2001, но это тоже не сработало. Пожалуйста помоги.


person Jor_El    schedule 16.07.2020    source источник
comment
Я думаю, проблема возникает из-за того, что данные не распределяются должным образом. Когда вы запускаете предложение group by, если данные присутствуют случайным образом на всех узлах, будет много перетасовки, что, как я считаю, является проблемой, с которой вы столкнулись. Вы видите это в пользовательском интерфейсе Spark? Попробуйте перейти к этапам и посмотрите, сколько перетасовок происходит на узлах.   -  person Oscar Lopez M.    schedule 16.07.2020


Ответы (1)


Основная причина низкой производительности заключается в том, что groupBy обычно вызывает перемешивание данных между исполнителями. Вы можете использовать встроенную функцию искры countDistinct следующим образом:

from spark.sql.functions import countDistinct
df.agg(countDistinct("exploded_col"))
person David Rabinowitz    schedule 16.07.2020
comment
Спасибо за ответ. Я пробовал countDistinct (), но это тоже занимает много времени. - person Jor_El; 16.07.2020
comment
Каков размер ваших данных? каков размер и конфигурация кластера? - person David Rabinowitz; 16.07.2020
comment
В пользовательском интерфейсе dataproc ожидаемая память Yarn увеличивается до 2,9 ТБ. В кластере по 4 узла типа n1-standard2. - person Jor_El; 16.07.2020
comment
Тогда это, вероятно, означает, что кластер слишком мал для обработки данных. Какой это формат? Если он поступает из BigQuery, Parquet или аналогичного, вы можете загрузить только соответствующий столбец, который должен уменьшить (в зависимости от схемы) размер данных. Если это не помогает, увеличьте размер кластера. - person David Rabinowitz; 16.07.2020
comment
Он загружается из формата .orc, я работаю всего с 3 столбцами. Кстати, я выполнил count (), и это дало быстрый результат. Я не могу увеличить размер кластера, поэтому есть ли какие-либо изменения в конфигурации Spark (которыми я поделился в вопросах), которые улучшили бы обработку? - person Jor_El; 16.07.2020
comment
Счетчик работает намного быстрее, так как обычно не загружает никаких данных, а просто сканирует их. вы пробовали df.select("exploded_col").distinct().count()? - person David Rabinowitz; 17.07.2020
comment
Я пробовал отличаться (). Count (), но это тоже занимает много времени. Есть ли какие-либо параметры конфигурации, которые нужно изменить? - person Jor_El; 17.07.2020