многопроцессорность с большим массивом numpy в качестве аргумента

Я хочу использовать многопроцессорность, где одним из аргументов является очень большой массив numpy. Я исследовал некоторые другие сообщения, которые, по-видимому, имеют аналогичные проблемы.

Большие массивы numpy в общей памяти для многопроцессорной обработки: что-то не так с этим подходом?

Общий доступ к большому массиву Numpy только для чтения между многопроцессорными процессами

но, поскольку я новичок в python, у меня возникли проблемы с адаптацией решений к этому шаблону и в этой форме. Интересно, могу ли я попросить вашей помощи, чтобы понять, какие у меня есть варианты, чтобы передать X функциям только для чтения. Мой упрощенный фрагмент кода находится здесь:

import multiprocessing as mp
import numpy as np

def funcA(X):
    # do something with X
    print 'funcA OK'

def funcB(X):
    # do something else with X
    print 'funcB OK'

if __name__=='__main__':
    X=np.random.rand(int(5.00e8),)
    funcA(X) # OK
    funcB(X) # OK
    X=np.random.rand(int(2.65e8),)
    P=[]
    P.append(mp.Process(target=funcA,args=(X,))) # OK
    P.append(mp.Process(target=funcB,args=(X,))) # OK
    for p in P:
        p.start()

    for p in P:
        p.join()

    X=np.random.rand(int(2.70e8),)
    P=[]
    P.append(mp.Process(target=funcA,args=(X,))) # FAIL
    P.append(mp.Process(target=funcB,args=(X,))) # FAIL
    for p in P:
        p.start()

    for p in P:
        p.join()

funcA и funcB, по-видимому, принимают очень большие массивы numpy при последовательном вызове. Однако, если они вызываются как мультипроцессы, то, по-видимому, существует верхний предел размера массива numpy, который может быть передан функции. Как мне лучше обойти это?

Примечание.

0) я не хочу модифицировать X; только читать из него;

1) У меня 64-битная Windows 7 Professional.


person cpicke1    schedule 29.01.2018    source источник
comment
X уникален в вашем коде? Или вам нужно вызывать два метода для других переменных?   -  person Roberto Trani    schedule 30.01.2018
comment
Это происходит в Python 2, но не в Python 3 на моей машине.   -  person Thomas Weller    schedule 30.01.2018
comment
@RobertoTrani Я планирую вызывать методы только для X в сочетании с некоторыми другими, гораздо меньшими переменными. Моя мотивация здесь заключается в том, что методы (которые полностью независимы друг от друга) занимают много времени, и я пытаюсь сэкономить время, разделяя рабочую нагрузку на несколько процессов. Я предположил, что, поскольку X доступен только для чтения, его настройка не будет слишком сложной, но я обнаружил обратное. Кроме того, мои извинения за неправильное написание вашего имени ниже.   -  person cpicke1    schedule 30.01.2018
comment
@cpicke1 Можете ли вы проверить, используете ли вы 32-битную версию Python? Вероятно, вы можете проверить это из диспетчера задач. У меня нет ваших проблем на моей Linux-машине, более того, 2.7e8 с плавающей запятой составляет около 2 ГБ, и эта проблема, похоже, связана с этими: ссылка1 и ссылка2. Таким образом, можем ли мы рассматривать проблему, связанную с используемой вами версией Python? Если это так, мы можем улучшить решение на основе mmap.   -  person Roberto Trani    schedule 31.01.2018
comment
@RobertoTrani В моей консоли Python я применил метод нашел здесь, и результат был 64. Кроме того, я сохранил исполняемый файл установщика, который я взял с веб-сайта Anaconda; это Anaconda2-4.1.0-Windows-x86_64.exe. Я думаю, что у меня нет 32-битного Python. Я также хотел бы уточнить, что у меня нет проблем с объявлением или обработкой чисел с плавающей запятой 5e8 или больше; проблемы возникают только тогда, когда я пытаюсь сделать что-то параллельно с ними!   -  person cpicke1    schedule 31.01.2018
comment
@cpicke1, я обновил ответ и код, можешь попробовать еще раз, пожалуйста? Извините, но у меня нет возможности проверить это на машине с Windows :(   -  person Roberto Trani    schedule 31.01.2018
comment
@RobertoTrani RE: ОБНОВЛЕНИЕ2. Код в вашем обновлении работает успешно. Однако новый предел dim находится где-то между 5,368e8 и 5,369e8, и это вдвое превышает «старый» предел dim, который был где-то между 2,684e8 и 2,685e8. Удвоение возможно из-за dtype=np.float32 вместо dtype=np.float. Я также немного поигрался с max_chunk_size, но это не помогло обойти проблему.   -  person cpicke1    schedule 31.01.2018


Ответы (1)


Проблема может быть в передаче данных дочерним процессам. Когда необходимо использовать объекты только для чтения, я предпочитаю использовать copy-on-write механизм, используемый базовой ОС для управления памятью дочерних процессов. Однако я не знаю, использует ли Windows 7 этот механизм. Когда доступно копирование при записи, вы можете получить доступ к области родительского процесса, не копируя их внутри дочернего процесса. Этот трюк работает, только если вы обращаетесь к ним только для чтения и если объект создается до создания процессов.

Подводя итог, возможное решение (по крайней мере, для Linux-машин) таково:

import multiprocessing as mp
import numpy as np

def funcA():
    print "A {}".format(X.shape)
    # do something with the global variable X
    print 'funcA OK'

def funcB():
    print "B {}".format(X.shape)
    # do something else with the global variable X
    print 'funcB OK'

if __name__=='__main__':
    X=np.random.rand(int(5.00e8),)
    funcA() # OK
    funcB() # OK

    X=np.random.rand(int(2.65e8),)
    P=[mp.Process(target=funcA), mp.Process(target=funcB)]
    for p in P:
        p.start()

    for p in P:
        p.join()

    X=np.random.rand(int(2.70e8),)
    P=[mp.Process(target=funcA), mp.Process(target=funcB)]
    for p in P:
        p.start()

    for p in P:
        p.join()

ОБНОВЛЕНИЕ: после различных комментариев о проблемах совместимости с Windows я набросал новое решение однозначно на основе родных карт памяти. В этом решении я создаю карту памяти numpy в файле, которая используется совместно через файловый дескриптор, поэтому не требуется копировать весь массив внутри дочерних элементов. Я нашел это решение намного быстрее, чем использование multiprocessing.Array!

ОБНОВЛЕНИЕ 2: приведенный ниже код был обновлен, чтобы избежать проблем с памятью во время рандомизации карты памяти.

import multiprocessing as mp
import numpy as np
import tempfile

def funcA(X):
    print "A {}".format(X.shape)
    # do something with X
    print 'funcA OK'

def funcB(X):
    print "B {}".format(X.shape)
    # do something else with X
    print 'funcB OK'

if __name__=='__main__':
    dim = int(2.75e8)
    with tempfile.NamedTemporaryFile(dir='/tmp', delete=False) as tmpfile:
        X = np.memmap(tmpfile, shape=dim, dtype=np.float32)  # create the memory map
        # init the map by chunks of size 1e8
        max_chunk_size = int(1e8)
        for start_pos in range(0, dim, max_chunk_size):
            chunk_size = min(dim-start_pos, max_chunk_size)
            X[start_pos:start_pos+chunk_size] = np.random.rand(chunk_size,)
        P=[mp.Process(target=funcA, args=(X,)), mp.Process(target=funcB, args=(X,))]
        for p in P:
            p.start()

        for p in P:
            p.join()
person Roberto Trani    schedule 29.01.2018
comment
На моем компьютере с Windows это не удается в операторе отладки print "A {}".format(X.shape) - person Thomas Weller; 30.01.2018
comment
Я предполагаю, что это происходит потому, что вы используете python 3, а это код python 2. Я протестировал решение на своей машине. Более того, как я сказал в ответе, этот код работает только в том случае, если X создается до создания дочерних процессов (и до вызова двух функций) - person Roberto Trani; 30.01.2018
comment
Я переключился на интерпретатор Python 2. Ошибка NameError: global name 'X' is not defined, даже если я добавлю оператор global X. (Питон 2.7.11) - person Thomas Weller; 30.01.2018
comment
Вы определяете X перед вызовом funcA()? Например, как я сделал в main. - person Roberto Trani; 30.01.2018
comment
Я скопировал весь код, который вы дали в ответе - person Thomas Weller; 30.01.2018
comment
Неважно, мне не нужно это запускать. Мне просто было интересно. Я пойду спать сейчас. - person Thomas Weller; 30.01.2018
comment
Просто для отладки: пожалуйста, не могли бы вы сказать нам, происходит ли это в параллельном коде или в последовательном коде? Очень странно, ведь на линуксе с питоном 2 все работает нормально :С - person Roberto Trani; 30.01.2018
comment
@ roberto-traini Я получаю точно такое же сообщение об ошибке, что и мистер Веллер, и я тоже скопировал и вставил весь код. Эта проблема возникает при параллельном коде. Я использую Python 2.7, если это актуально. Я также пытался добавить глобальный X; X=np.empty(1) прямо под операторами импорта, но параллельный код распознает только инициализированную структуру X и не распознает мой большой случайный X. Кажется, что с использованием глобальных переменных я все еще не смог получить вокруг проблемы. - person cpicke1; 30.01.2018
comment
Windows Python не использует fork и, следовательно, не может использовать метод копирования при записи. В Windows создание нового экземпляра Process порождает новый пустой Python, который затем загружает ту же программу, что и исходный интерпретатор. - person torek; 30.01.2018
comment
@RobertoTrani RE: ОБНОВЛЕНИЕ. Это работает, когда X является целым. К сожалению, это не работает, если X является плавающим. Я изменил вашу строку на X=np.memmap(tmpfile, shape=dim, dtype=float), и у меня точно такая же проблема, как и раньше, и проблема возникает примерно в том же размере, что и раньше (где-то между 2,65e8 и 2,70e8). - person cpicke1; 30.01.2018
comment
Поскольку эта проблема не возникает в Python 3, я смирился с использованием Python 3.6 вместо Python 2.7. Спасибо за всю вашу помощь, потраченную на решение этой проблемы. - person cpicke1; 31.01.2018