Задача состоит в том, чтобы взять записи из входного файла, обработать их и сохранить в базе данных SQLite. В файле могут быть миллионы записей, обработка одной записи происходит довольно быстро, но я надеюсь получить некоторый импульс от многопроцессорности. Я его реализовал и обнаружил, что где-то есть узкое место, так как буст не такой уж и большой.
Я не могу эффективно использовать все ядра. 3 процесса дают какой-то заметный эффект, больше процессов уже не эффективно.
Ниже я предоставил упрощенный пример кода, чтобы показать, как создаются и управляются процессы.
После некоторого расследования я подозреваю:
чтение данных с диска
сериализация/десериализация - наименее подозрительный
передача данных процессам
замки. У меня их два:
- to write data to db and
- для управления промежуточными данными в памяти, которые будут сохранены в БД после завершения всего процесса
Что не является узким местом:
- db writing
- I made it by chunks and
- замена на in-memory базу данных не дала прироста скорости
Я профилировал код в отдельных процессах (с cProfile
). Это не очень полезно. Больше всего времени уходит на расчетный этап.
Измерение времени выполнения на небольшом подмножестве данных дало:
# (laptop, 2 cores with hyper-threading, Python 3.5, Ubuntu 16.04, SSD)
serial (old implementation): 28s
parallel (workers = 1): 28s
parallel (workers = 2): 19s
parallel (workers = 3): 17s
parallel (workers = 4): 17s
# (virtual machine on a server, 30 cores, Python 3.4, Ubuntu 14.04, HDD)
parallel (workers = 1): 28s
parallel (workers = 2): 11s
parallel (workers = 3): 10s
parallel (workers = 4): 8s
parallel (workers = 5): 8s
parallel (workers = 6): 8s
В: Как определить, что является узким местом или хотя бы некоторыми из предполагаемых проблем? Можно ли получить выигрыш лучше, чем в 4 раза?
# indigo is an external module
def process(q, conn, cursor, d, lock_db, lock_dict):
data_collector = []
while True:
data = q.get()
if data is None:
break
mol_name = data[1]
mol = indigo.unserialize(data[0]) # <-- unserialization
lock_dict.acquire()
value = d.get(mol_name, None)
if value is None:
value = calc_value(mol)
d[name] = value
lock_dict.release()
# some calculations which return several variables A, B and C
data_collector.append([mol_name, A, B, C])
if len(data_collector) == 1000:
insert_data(conn, cursor, data_collector, lock_db)
data_collector = []
insert_data(conn, cursor, data_collector, lock_db)
with lite.connect(out_fname) as conn:
cur = conn.cursor()
create_tables(cur)
nprocess = max(min(ncpu, cpu_count()), 1)
manager = Manager()
lock_db = manager.Lock()
lock_dict = manager.Lock()
q = manager.Queue(2 * nprocess)
d = manager.dict()
pool = []
for i in range(nprocess):
p = Process(target=process, args=(q, conn, cur, d, lock_db, lock_dict))
p.start()
pool.append(p)
for i, mol in enumerate(indigo.iterateSDFile(file_name)):
q.put((mol.serialize(), mol.name()) # <-- serialization
for _ in range(nprocess):
q.put(None)
for p in pool:
p.join()
for k, v in d.items():
insert_db(cur, k, v)
conn.commit()