Сохранение вывода заданий Spark ETL | Навстречу AI

Как следует сохранять выходные данные заданий Spark ETL (если вы не выполняете запись в базе данных)

Вступление

В этой статье я поделюсь своими мыслями о том, как лучше всего сохранить вывод заданий Spark ETL, чтобы впоследствии было легче выполнять аналитическую работу. Код для воспроизведения примеров можно найти здесь .

Настройка кластера

Кластер, который я использовал для запуска кода в этой статье, размещен на Databricks со следующей конфигурацией:

  • Кластерный режим: стандартный
  • Версия среды выполнения Databricks: 5.5 LTS ML (включает Apache Spark 2.4.3 Scala 2.11)

Есть 8 рабочих процессов, и рабочие и драйвер являются экземплярами m4.xlarge (16,0 ГБ, 4 ядра).

Пример использования

Представьте, что вы попали в следующий сценарий:

Вы только что присоединились к чрезвычайно популярному онлайн-магазину с покупателями по всему миру. Эта компания ежегодно зарабатывает миллиарды долларов дохода. Итак, вы знаете, что он ежедневно генерирует столько транзакционных данных, что их можно эффективно обработать только с помощью распределенной вычислительной инфраструктуры, такой как Apache Spark. Вам было поручено построить модель для прогнозирования ежедневного оборота, и вы решили начать с выполнения некоторого исследовательского анализа данных по прошлым транзакциям.

Команда инженеров данных любезно согласилась предоставить вам доступ к этим данным в виде плоских файлов, сброшенных в какую-то папку, расположенную в облаке. Эта команда очень сосредоточена на удовлетворении потребностей клиентов, поэтому они пошли еще дальше и спросили вас, как вы хотите, чтобы файлы в этой папке были организованы.

Как вы ответите?

Обычный макет - это организация файлов в папки с трехуровневой иерархией, а именно «год», «месяц» и «день». Папка «день» будет содержать файлы, относящиеся к транзакции этого дня.

В следующем разделе будет показано, почему этот макет не оптимален для анализа данных.

Экономия по годам - ​​›Месяц -› День

Чтобы сделать вещи более конкретными, мы будем использовать набор данных asa/airlines в качестве рабочего примера до конца этой статьи. Этот набор данных является частью общедоступных наборов данных Databricks.

Вот как этот набор данных будет выглядеть при этой схеме размещения:

На рисунке 1 показана структура каталогов для папки с именем «2006», которая представляет все транзакции, относящиеся к 2006 году. Внутри этой папки находится еще один набор папок с именами 1, 2,…, 12, представляющий январь, февраль,…, Декабрь. Эти папки, в свою очередь, содержат папки, обозначающие день конкретного месяца. Фактические данные хранятся внутри папки дня месяца в виде другой папки, содержащей несколько относительно небольших файлов csv, поскольку ежедневные данные действительно огромны.

Теперь предположим, что вы хотите узнать количество транзакций в январе 2008 года. Это легко сделать с помощью следующего фрагмента кода:

Обратите внимание, что использование подстановочного знака, то есть символа «*», упрощает получение всех файлов внутри папки.

Что, если вы хотите узнать количество транзакций за первые 6 месяцев января 2008 года? Использование /tmp/asa/folders/2008/*/*/* в качестве аргумента для вызова csv не сработает, потому что в конечном итоге будут прочитаны все 12 месяцев 2008 года. Вы можете прочитать все эти файлы, а затем отфильтровать их, чтобы отобразить только соответствующие месяцы, но:

На рисунке 3 показано, что чтение данных за все 12 месяцев и последующая фильтрация для возврата только первых 6 месяцев приводит к чтению данных 670 МБ вместо 343,2 МБ, если бы вы только что прочитали только первые 6 месяцев. Следовательно, первый метод неэффективен и не масштабируется, особенно для больших наборов данных. Учитывая расположение папок, нам нужно будет написать код для программного построения пути к соответствующим файлам. Вот один из способов сделать это:

На рисунке 4 показано, что нам нужно было написать только одну дополнительную строку кода, прежде чем мы сможем продолжить наш анализ.

Давайте рассмотрим ситуацию, когда мы заинтересованы в подсчете транзакций за последние 7 дней в году за последние 2 года, т.е. получаем количество транзакций с 25 декабря по 31 декабря за 2007 и 2008 годы. сложно, потому что теперь у вас есть две части пути, которые меняются: год и месяц. Вы можете создать список лет и дней, а затем создать декартово произведение, чтобы получить кортеж (год, день), который затем можно использовать для создания путей к файлам, например:

Другое решение - создать список дат последовательно с начала 2007 года до конца 2008 года, а затем применить соответствующие фильтры. Это более сложное решение, чем предыдущее, поэтому для краткости я не буду пытаться его использовать.

В любом случае я не являюсь поклонником этого метода чтения файлов по следующим причинам:

  • Подробность: вам нужно написать дополнительный код, прежде чем вы сможете продолжить анализ. В зависимости от вашего стиля кодирования и привычек код для генерации путей к файлам может быть загадочным, что затрудняет отслеживание другими людьми вашего анализа.
  • Хрупкость: этот метод неявно предполагает, что для каждого дня года существует файл. Это может быть не так по многим причинам, например: обслуживание системы, сбой системы, праздничные дни и т. д. Когда это произойдет, вы получите Path does not exist исключение.

Можем ли мы сделать лучше, чем этот подход?

Сохранение по разделам

На рисунке 6 показан идеальный способ организации файлов для вашего конкретного случая использования. Используя этот макет, вы можете положиться на функцию Spark filter() для получения только тех данных, которые вам интересны. Например, вот как вы бы подсчитали все транзакции в январе 2008 года:

Обратите внимание, что вам нужно только определить предикат, выражающий условие (я), которое вы хотите выполнить (Jan2008), а затем передать его функции filter.

На следующих двух рисунках показано, как можно подсчитать количество транзакций за первые 6 месяцев 2008 г. и последние 7 дней 2007 и 2008 гг. Соответственно в этом макете каталога:

Понятно, что этот метод дает более читаемый код. Вам также не нужно беспокоиться о Path does not exist исключении.

Как мы добились такого расположения каталогов? Взгляните на следующий рисунок:

Представьте, что ваша группа инженеров данных должна была выполнить сложную предварительную обработку, прежде чем они смогли получить данные транзакций, например. объединить несколько таблиц вместе, добавить некоторые производные функции и т. д. df представляет данные, которые, наконец, готовы для вашего использования. Все, что им нужно сделать, это сохранить его в указанной папке.

Ключевым шагом к достижению макета, изображенного на рисунке 6, является вызов partitionBy в строке 5 рисунка 10. Результатом рисунка 10 является создание папки с именем data.csv (имя файла, которое мы указали при сохранении как csv). Вы можете думать о data.csv как об одном большом файле, который внутренне разбит на значения в столбцах Year, Month и DayOfMonth, то есть столбцах, которые вы указали при вызове partitionBy.

Заключительные мысли

В этой статье читателю была представлена ​​функция partitionBy, которая является методом класса DataFrameWriter. Ключевая идея здесь заключается в том, что, когда вы пишете выходные данные своих заданий ETL, вы должны разделить его по столбцам, которые ваши пользователи будут часто фильтровать, прежде чем выполнять свой анализ. Это не только обеспечивает значительное ускорение времени чтения, но, как показано в этой статье, пользователям не нужно прибегать к написанию запутанного кода, чтобы получить нужные данные.

Дайте мне знать в комментариях, что вы думаете об этом подходе.