почему моя работа с потоковой передачей искры становится медленнее

У меня есть задание, которое получает данные от Kafka каждые 10 секунд, а затем я форматирую данные и вставляю их в cassandra, но это очень сбивает с толку, что моя работа становится все медленнее и медленнее.

Согласно моей статистике, каждые 10 секунд поступает менее 100 сообщений, и в первый раз обработка занимает не более 1 секунды, но через несколько дней обработка замедляется, и теперь для обработки 10-секундных данных требуется 14 секунд.

Я озадачен, если бы был какой-то фактор, который замедлил бы работу.

И я замечаю, что обработка python -m pyspark.daemon тоже стоит все больше и больше памяти, есть ли какие-то методы снижения стоимости памяти.

PID   USER      PR   NI VIRT    RES     SHR  S  %CPU %MEM   TIME+ COMMAND 

24527 yao.yu    20   0 10.334g 9.823g   3580 R  96.8 66.9   3424:56 python                                                                                                                                                     

код выглядит следующим образом:

if __name__ == "__main__":
    conf = SparkConf().setAppName("Kafka_To_Cassandra").set("spark.streaming.kafka.maxRatePerPartition", "1000")
    sc = SparkContext(conf = conf)
    ssc = StreamingContext(sc, 10)

    brokers, topic = sys.argv[1:] 

    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers, "auto.offset.reset": "smallest"})
    lines = kvs.map(lambda x: x[1]) \
           .filter(lambda s: 'identifier' in s) \
           .filter(lambda s: 'app_name' in s) \
           .filter(lambda s: 'app_version' in s)
    map_lines = lines.map(mapper).filter(lambda s: 'JsonLoadException' not in s)
    #map_lines.pprint()
    map_lines.foreachRDD(lambda rdd: rdd.foreachPartition(save_to_cassandra))

    ssc.start()
    ssc.awaitTermination()

person Yao Yu    schedule 23.09.2015    source источник
comment
Трудно сказать, почему процесс так накопил память, не видя вашего кода.   -  person Klaus D.    schedule 23.09.2015
comment
Звучит как утечка памяти: если ваше или какое-либо другое приложение использует память и не освобождает ее, то через некоторое время эта утечка памяти станет значительно больше, чем ваша физическая оперативная память, что заставит машину использовать подкачку диска для дополнения оперативной памяти, которая очень негативно влияет на производительность. Перезагрузка или просто перезапуск процесса — это быстрое решение, но не решение.   -  person Paul Sasik    schedule 23.09.2015
comment
@Клаус Д. Привет, Клаус, код приведен выше, я думаю, причина может быть в следующем: kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers, "auto.offset.reset": "smallest"})   -  person Yao Yu    schedule 23.09.2015


Ответы (2)


эта конфигурация может помочь вам.

spark.cleaner.ttl

Продолжительность (в секундах) того, как долго Spark будет помнить любые метаданные (созданные этапы, сгенерированные задачи и т. д.). Периодическая очистка гарантирует, что метаданные старше этого срока будут забыты. Это полезно для запуска Spark в течение многих часов/дней (например, для работы 24/7 в случае приложений Spark Streaming). Обратите внимание, что любой RDD, сохраняющийся в памяти дольше указанного времени, также будет очищен.

person tao.meng    schedule 25.09.2015
comment
Пожалуйста, укажите ссылку, откуда скопирован этот текст. Спасибо. - person Pang; 25.09.2015
comment
@Pang приведенный выше текст связан с поведением выполнения - person Yao Yu; 25.09.2015
comment
@ tao.meng Мне не ясно, повлияют ли накопленные метаданные на производительность задания Spark Streaming, а также на стоимость памяти pyspark.daemon. Не могли бы вы дать мне более подробную информацию, большое спасибо! - person Yao Yu; 25.09.2015

Наконец, я снова использую Scala для написания своего кода, используя Spark-Cassandra-Connector.

Я уверен, что подключение Cassandra требует много памяти, поэтому официальный документ Spark Streaming Шаблоны проектирования для использования foreachRDD предлагает вам создать пул соединений, чтобы вам не приходилось создавать соединение каждый раз для каждого RDD или foreachPartition. Но у меня нет хорошей идеи, как заставить python-cassandra-driver поддерживать это.

Я реконструирую свой код с помощью Scala, Spark-Cassandra-Connector довольно хорошо поддерживает Spark Streaming. Тогда у меня работа стабильно работает несколько недель даже месяцев без утечек памяти.

person Yao Yu    schedule 15.12.2015