У меня есть задание, которое получает данные от Kafka каждые 10 секунд, а затем я форматирую данные и вставляю их в cassandra, но это очень сбивает с толку, что моя работа становится все медленнее и медленнее.
Согласно моей статистике, каждые 10 секунд поступает менее 100 сообщений, и в первый раз обработка занимает не более 1 секунды, но через несколько дней обработка замедляется, и теперь для обработки 10-секундных данных требуется 14 секунд.
Я озадачен, если бы был какой-то фактор, который замедлил бы работу.
И я замечаю, что обработка python -m pyspark.daemon
тоже стоит все больше и больше памяти, есть ли какие-то методы снижения стоимости памяти.
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
24527 yao.yu 20 0 10.334g 9.823g 3580 R 96.8 66.9 3424:56 python
код выглядит следующим образом:
if __name__ == "__main__":
conf = SparkConf().setAppName("Kafka_To_Cassandra").set("spark.streaming.kafka.maxRatePerPartition", "1000")
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 10)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers, "auto.offset.reset": "smallest"})
lines = kvs.map(lambda x: x[1]) \
.filter(lambda s: 'identifier' in s) \
.filter(lambda s: 'app_name' in s) \
.filter(lambda s: 'app_version' in s)
map_lines = lines.map(mapper).filter(lambda s: 'JsonLoadException' not in s)
#map_lines.pprint()
map_lines.foreachRDD(lambda rdd: rdd.foreachPartition(save_to_cassandra))
ssc.start()
ssc.awaitTermination()
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers, "auto.offset.reset": "smallest"})
- person Yao Yu   schedule 23.09.2015