Прежде чем углубляться, рекомендуем пройти часть 1 серии Best Practices for Data Science с использованием Python Pandas.

Передовые практики в области обработки и анализа данных с помощью Snowflake и Azure — часть 1 Pandas

Подробности части 2 с использованием машинного обучения Azure со Snowflake



Обе платформы Snowflake и Databricks позиционируют себя как комплексные платформы AI/ML, но многие организации используют обе платформы, дополняя друг друга, чтобы получить лучшее из обоих миров вместе для создания интегрированной платформы обработки данных.

Как достичь варианта использования выше? «Коннектор Spark» переносит Snowflake в экосистему Apache Spark, позволяя Spark считывать данные и записывать данные в Snowflake. Коннектор предоставляет экосистеме Spark доступ к Snowflake как к полностью управляемому репозиторию для всех типов данных, включая JSON, Avro, CSV, XML, машинные данные.

Так зачем же кому-то нужен Spark Connector для Snowflake? (Попробуйте новую функцию Snowflake Snowpark в предварительной версии, которая удаляет использование коннектора Spark)

Сложный ETL: это вариант Databricks. Используя Spark, вы можете легко создавать сложные, функционально насыщенные и масштабируемые конвейеры приема данных для Snowflake, а данные, созданные этими сложными конвейерами ETL, можно легко хранить в Snowflake для широкий доступ к самообслуживанию по всей организации с использованием стандартных инструментов SQL. Spark-запросы извлекают выгоду из автоматической оптимизации запросов Snowflake, которая повышает производительность за счет передачи некоторых ресурсоемких операций в Snowflake вместо Spark. По умолчанию в модулях данных включено раскрытие запроса Snowflake.

Давайте попробуем понять основной поток запросов между Spark и Snowflake, который поможет нам глубже погрузиться в процесс pushdown.

  1. Драйвер Spark отправляет SQL-запрос в Snowflake, используя соединение Snowflake JDBC.
  2. Snowflake использует виртуальное хранилище для обработки запроса и копирует результат запроса в AWS S3/Azure Blob.
  3. Соединитель извлекает данные из S3/Blob и заполняет их в DataFrames в Spark.

Так почему же Pushdown?

И Spark, и Snowflake способны обрабатывать весь или часть определенного потока задач данных. Таким образом, для достижения наилучшей производительности в сценарии с большими данными мы обычно хотим избегать чтения большого количества данных или передачи больших промежуточных результатов между взаимосвязанными системами. Таким образом, большая часть обработки должна происходить рядом с местом хранения данных, чтобы использовать любые возможности участвующих хранилищ для динамического удаления ненужных данных. Spark поддерживает хороший набор функций для обработки реляционных данных и подключения к различным источникам данных. Snowflake может значительно повысить производительность запросов за счет эффективного сокращения данных благодаря отслеживанию метаданных микроразделов и оптимизации кластеризации. Эти метаданные позволяют Snowflake более эффективно сканировать данные, когда заданные предикаты запроса используют совокупную информацию о микроразделах, такую ​​как минимальные и максимальные значения, а данные, которые определены как не содержащие релевантных значений, могут быть полностью пропущены. Метаданные, такие как кардинальность значений столбцов, позволяют Snowflake лучше оптимизировать такие операции, как упорядочение соединений. Такие операции, как фильтрация, проецирование, объединение и агрегирование данных, потенциально могут значительно сократить набор результатов заданного запроса. Можно использовать сокращение данных, используемое Snowflake. Преимущество этого заключается в сокращении объема данных, которые необходимо передать в Spark через промежуточную среду Snowflake и сеть, что, в свою очередь, сокращает время отклика.

Вишенкой на торте является то, что коннектор Snowflake глубоко интегрируется с процессом создания плана запроса в Spark, обеспечивая больше возможностей pushdown. например, в приведенном ниже запросе Создание плана запроса в Spark

dfZipCodes.createOrReplaceTempView("temp_zip_codes")
val dfSQLCities = spark.sql("SELECT city from temp_zip_codes WHERE zip_code < 98000")

DataFrames выполняются лениво. Это означает, что Spark может оценивать и оптимизировать реляционные операторы, применяемые к DataFrame, и выполнять DataFrame только при вызове действия. Spark откладывает планирование и выполнение кода до тех пор, пока не произойдет действие, такое как collect(), show() или count().

Когда требуется действие, оптимизатор Spark, Catalyst, сначала создает оптимизированный логический план. Затем процесс переходит к этапу физического планирования. Именно здесь Spark определяет, следует ли отправить запрос в Snowflake, как показано на следующей диаграмме:

Как мы видим на приведенной выше диаграмме, Spark Catalyst вставляет план Snowflake в качестве одного из возможных физических планов, которые Spark может выбрать в зависимости от стоимости.

В приведенной выше операции рабочие узлы Snowflake выполняют всю тяжелую обработку исходящих данных, а подчиненные узлы в кластере Spark выполняют вход данных. Это позволяет вам масштабировать виртуальное хранилище Snowflake и кластеры Spark, чтобы сбалансировать вычислительную мощность и пропускную способность ввода-вывода, что дает вам практически неограниченные возможности для передачи данных туда и обратно между Spark и Snowflake.

Давайте попробуем понять pushdown в новой и старой версии Spark Connector для приведенного ниже примера запроса.

val dfZips = spark.read.format("net.snowflake.spark.snowflake").option("dbtable","zip_codes").load()
val dfMayors = spark.read.format("net.snowflake.spark.snowflake").option("dbtable","city_mayors").load()
val dfResult = dfZips.filter("zip_code > 98000").join(dfMayors.select($"first",$"last",$"city",$"city_id"), dfZip("city_id") === dfMayors("city_id"), "inner")

Каждый DataFrame представлен в виде дерева логического плана с узлами, представляющими источники данных и операторы, а приведенный выше результирующий DataFrame представлен в древовидной структуре ниже с помощью Spark.

Подход к старой версии Spark Connector. Простые операции проецирования и фильтрации (например, Select(.) и .Filter(.) в Scala) будут переведены и отправлены в Snowflake вместо обработки в Spark, однако другие операции, такие как соединения, агрегации и даже скалярные функции SQL могли выполняться только в Spark. Таким образом, в приведенном выше примере запроса Spark выполняет объединение нескольких таблиц, формируя окончательный кадр данных.

Подход к более новой версии Spark Connector. Используя Snowflake в качестве источника данных для Spark, версия 2.1 соединителя может передавать большие и сложные логические планы Spark (полностью или частично) для обработки в Snowflake, что позволяет Snowflake выполнять больше работы. используя его эффективность. Этот подход сочетает в себе мощную обработку запросов Snowflake с вычислительными возможностями Apache Spark и его экосистемы. Таким образом, в приведенном выше примере запроса коннектор Spark пытается проверить, что несколько таблиц, формирующих окончательный кадр данных, являются присоединяемыми отношениями в Snowflake, чтобы иметь возможность распознать, что объединение может быть полностью выполнено в Snowflake. Тот же процесс, описанный выше, также можно применить к операциям SORT, GROUP BY и LIMIT в более новой версии.

Отказоустойчивость

При сбое pushdown из-за того, что иногда однозначное преобразование операторов Spark SQL в выражения Snowflake невозможно, соединитель возвращается к менее оптимизированному плану выполнения, и вместо этого эти неподдерживаемые операции выполняются в Spark.

Машинное обучение.Spark предоставляет богатую экосистему для машинного обучения и функций прогнозной аналитики через MLlib. Благодаря интеграции Snowflake предоставляет вам гибкий, масштабируемый репозиторий для всех данных, лежащих в основе обучения и тестирования вашего алгоритма.

Обычный процесс машинного обучения заключается в использовании Snowflake для некоторых основных операций с данными, обучении модели машинного обучения в Databricks и записи результатов обратно в Snowflake.

Пример кода для подключения к Snowflake из Databricks

# Use secrets DBUtil to get Snowflake credentials.
user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")

# snowflake connection options
options = {
  "sfUrl": "<snowflake-url>",
  "sfUser": user,
  "sfPassword": password,
  "sfDatabase": "<snowflake-database>",
  "sfSchema": "<snowflake-schema>",
  "sfWarehouse": "<snowflake-cluster>"
}

Пример кода для записи данных в Snowflake из Databricks

# Generate a simple dataset containing five values and write the dataset to Snowflake.
spark.range(5).write \
  .format("snowflake") \
  .options(**options) \
  .option("dbtable", "<snowflake-database>") \
  .save()

Пример кода для чтения данных из Snowflake в Databricks

# Read the data written by the previous cell back.
df = spark.read \
  .format("snowflake") \
  .options(**options) \
  .option("dbtable", "<snowflake-database>") \
  .load()

display(df)
df = spark.read \
  .format("snowflake") \
  .options(**options) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

Прежде чем полностью остановиться на теме, один важный аспект — предварительные условия.

  1. Учетная запись Databricks с версией среды выполнения 4.2 и выше.
  2. Учетная запись Snowflake с подробностями (URL-адрес, имя пользователя и пароль, база данных и схема по умолчанию, виртуальный склад по умолчанию).
  3. Роль, используемая в подключении, должна иметь привилегии Usage и Create Stage в схеме, которая содержит таблицу для чтения или записи с помощью Databricks.

Некоторые очень важные указатели, которые следует учитывать при совместном использовании Databricks и Snowflake.

  1. Мы должны явно указать сопоставление между столбцами DataFrame и Snowflake, используя параметр карты столбцов.
  2. Integer и Decimal семантически эквивалентны в Snowflake, поэтому при записи данных и чтении данных из Snowflake целые данные могут быть преобразованы в десятичные числа при записи в Snowflake.
  3. Snowflake по умолчанию использует поля в верхнем регистре, поэтому схема таблицы преобразуется в верхний регистр.

Вывод. Полная передача обработки запросов в Snowflake обеспечивает наиболее стабильную и в целом лучшую производительность, при этом Snowflake в среднем работает лучше, чем даже собственный Spark-with-Parquet.