Многопроцессорность в Python: есть ли способ использовать pool.imap без накопления памяти?

Я использую модуль multiprocessing в Python для параллельного обучения нейронных сетей с keras, используя объект Pool(processes = 4) с imap. Это неуклонно использует все больше и больше памяти после каждого «цикла», то есть каждые 4 процесса, пока, наконец, не произойдет сбой.

Я использовал модуль memory_profiler для отслеживания использования памяти с течением времени, обучив 12 сетей. Здесь используется ванильный imap: vanilla

Если я поставлю maxtasksperchild = 1 в Pool: 1taskperchild

Если я использую imap(chunksize = 3): chunks

В последнем случае, когда все работает нормально, я отправляю только один пакет каждому процессу в пуле, поэтому кажется, что проблема в том, что процессы несут информацию о предыдущих пакетах. Если да, могу ли я заставить пул не делать этого?

Несмотря на то, что решение с кусками работает, я бы предпочел не использовать его, потому что

  • Я хотел бы отслеживать прогресс с помощью модуля tqdm, и в случае блоков он будет обновляться только после каждого блока, что фактически означает, что он вообще ничего не будет отслеживать, так как все блоки заканчиваются одновременно (в этом пример)
  • В настоящее время для обучения всех сетей требуется одинаковое время, но я хотел бы включить возможность их отдельного времени обучения, когда решение для фрагментов потенциально может привести к тому, что один процесс получит все длительное время обучения.

Вот фрагмент кода в ванильном случае. В двух других случаях я просто изменил параметр maxtasksperchild в Pool и параметр chunksize в imap:

def train_network(network):
    (...)
    return score

pool = Pool(processes = 4)
scores = pool.imap(train_network, networks)
scores = tqdm(scores, total = networks.size)

for (network, score) in zip(networks, scores):
    network.score = score

pool.close()
pool.join()

person Dan Saattrup Nielsen    schedule 02.09.2019    source источник


Ответы (2)


К сожалению, multiprocessing модуль в python требует больших затрат. данные в основном не распределяются между процессами и должны быть реплицированы. Это изменится, начиная с python 3.8.

https://docs.python.org/3.8/library/multiprocessing.shared_memory.html

Хотя официальный выпуск python 3.8 состоится 21 октября 2019 года, вы уже можете скачать его на github

person g.lahlou    schedule 02.09.2019
comment
Я не уверен, что понимаю, почему здесь проблема с общей памятью. Я все еще новичок в этом многопроцессорном материале, но кажется, что у меня есть четыре процесса (с четырьмя копиями моих данных, что хорошо в моем случае), и я хочу, чтобы каждый из этих четырех процессов обучал свою сеть, выводил оценку а затем забудьте обо всем, прежде чем начать тренировать следующую сеть. Кажется, что процессы каким-то образом сохраняют информацию из предыдущих чанков. - person Dan Saattrup Nielsen; 02.09.2019
comment
При этом, если общая память будет совместима с keras, это значительно улучшит использование памяти программой в целом, что было бы потрясающе! Спасибо за ссылку! - person Dan Saattrup Nielsen; 02.09.2019

Я придумал решение, которое, кажется, работает. Я отказался от пула и сделал свою собственную простую систему очередей. Помимо того, что он не увеличивается (хотя он немного увеличивается, но я думаю, что это я храню некоторые словари в виде журнала), он даже потребляет меньше памяти, чем решение с фрагментами выше:

imapqueue

Я понятия не имею, почему это так. Возможно, объекты Pool просто занимают много памяти? Во всяком случае, вот мой код:

def train_network(network):
    (...)
    return score

# Define queues to organise the parallelising
todo = mp.Queue(size = networks.size + 4)
done = mp.Queue(size = networks.size)

# Populate the todo queue
for idx in range(networks.size):
    todo.put(idx)

# Add -1's which will be an effective way of checking
# if all todo's are finished
for _ in range(4):
    todo.put(-1)

def worker(todo, done):
    ''' Network scoring worker. '''
    from queue import Empty
    while True:
        try:
            # Fetch the next todo
            idx = todo.get(timeout = 1)
        except Empty:
            # The queue is never empty, so the silly worker has to go
            # back and try again
            continue

        # If we have reached a -1 then stop
        if idx == -1:
            break
        else:
            # Score the network and store it in the done queue
            score = train_network(networks[idx])
            done.put((idx, score))

# Construct our four processes
processes = [mp.Process(target = worker,
    args = (todo, done)) for _ in range(4)]

# Daemonise the processes, which closes them when
# they finish, and start them
for p in processes:
    p.daemon = True
    p.start()

# Set up the iterable with all the scores, and set
# up a progress bar
idx_scores = (done.get() for _ in networks)
pbar = tqdm(idx_scores, total = networks.size)

# Compute all the scores in parallel
for (idx, score) in pbar:
    networks[idx].score = score

# Join up the processes and close the progress bar
for p in processes:
    p.join()
pbar.close()
person Dan Saattrup Nielsen    schedule 02.09.2019