Jak przekazać duże tablice numpy między podprocesami Pythona bez zapisywania na dysku?

Czy jest dobry sposób na przekazanie dużej ilości danych pomiędzy dwoma podprocesami Pythona bez użycia dysku? Oto kreskówkowy przykład tego, co mam nadzieję osiągnąć:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        data.dump('data.pkl')
        sys.stdout.write('data.pkl' + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    print proc.stdout.readline().rstrip()
    a = numpy.load('data.pkl')
    print a.shape

proc.stdin.write('done\n')

Tworzy podproces, który generuje tablicę numpy i zapisuje tablicę na dysk. Proces macierzysty ładuje tablicę z dysku. To działa!

Problem w tym, że nasz sprzęt potrafi generować dane 10x szybciej niż dysk potrafi odczytywać/zapisywać. Czy istnieje sposób na przeniesienie danych z jednego procesu Pythona do kolejny czysto w pamięci, może nawet bez robienia kopii danych? Czy Mogę zrobić coś w stylu "przechodzenie przez odniesienie"?

Moja pierwsza próba przeniesienia danych czysto w pamięci jest dość kiepska:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        ##Note that this is NFG if there's a '10' in the array:
        sys.stdout.write(data.tostring() + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    a = numpy.fromstring(proc.stdout.readline().rstrip(), dtype=numpy.uint8)
    print a.shape

proc.stdin.write('done\n')

Jest to bardzo wolne (znacznie wolniejsze niż zapisywanie na dysk) i bardzo, bardzo kruche. Musi być lepszy sposób!

Nie jestem żonaty z modułem 'podproces', o ile proces pobierania danych nie blokuje aplikacji nadrzędnej. Krótko próbowałem "multiprocessingu", ale jak dotąd bez powodzenia.

Tło: mamy sprzęt, który generuje do ~2 GB/S danych w serii buforów ctypes. Kod Pythona do obsługi tych buforów ma pełne ręce roboty po prostu radzenie sobie z zalewem informacji. Chcę skoordynować ten przepływ informacji z kilkoma innymi elementami sprzętowymi działającymi jednocześnie w programie "master", bez podprocesów blokujących się nawzajem. Moim obecnym podejściem jest zagotowanie danych trochę w podprocesie przed zapisaniem na dysk, ale miło by było przekazać cały monty procesowi 'master'.

Author: Andrew, 2011-02-17

6 answers

Podczas googlowania po więcej informacji o kodzie, który opublikował Joe Kington, znalazłem numpy-sharedmem Pakiet. Sądząc po tym numpy/multiprocessing tutorial wydaje się, że dzieli to samo dziedzictwo intelektualne (może w dużej mierze tych samych autorów? -- Nie jestem pewien).

Używając modułu sharedmem, możesz utworzyć tablicę numpy z dzieloną pamięcią (Super!), i używać go z multiprocessing w następujący sposób:

import sharedmem as shm
import numpy as np
import multiprocessing as mp

def worker(q,arr):
    done = False
    while not done:
        cmd = q.get()
        if cmd == 'done':
            done = True
        elif cmd == 'data':
            ##Fake data. In real life, get data from hardware.
            rnd=np.random.randint(100)
            print('rnd={0}'.format(rnd))
            arr[:]=rnd
        q.task_done()

if __name__=='__main__':
    N=10
    arr=shm.zeros(N,dtype=np.uint8)
    q=mp.JoinableQueue()    
    proc = mp.Process(target=worker, args=[q,arr])
    proc.daemon=True
    proc.start()

    for i in range(3):
        q.put('data')
        # Wait for the computation to finish
        q.join()   
        print arr.shape
        print(arr)
    q.put('done')
    proc.join()

Wydajność pracy

rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]
 26
Author: unutbu,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-02-18 01:31:54

Zasadniczo, po prostu chcesz dzielić blok pamięci między procesami i wyświetlać go jako tablicę numpy, prawda?

W takim razie, spójrz na to (Opublikowane do numpy-dyskusja przez Nadav Horesh jakiś czas temu, nie Moja praca). Istnieje kilka podobnych implementacji (niektóre bardziej elastyczne), ale wszystkie zasadniczo wykorzystują tę zasadę.

#    "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing"
# Modified and corrected by Nadav Horesh, Mar 2010
# No rights reserved


import numpy as N
import ctypes
import multiprocessing as MP

_ctypes_to_numpy = {
    ctypes.c_char   : N.dtype(N.uint8),
    ctypes.c_wchar  : N.dtype(N.int16),
    ctypes.c_byte   : N.dtype(N.int8),
    ctypes.c_ubyte  : N.dtype(N.uint8),
    ctypes.c_short  : N.dtype(N.int16),
    ctypes.c_ushort : N.dtype(N.uint16),
    ctypes.c_int    : N.dtype(N.int32),
    ctypes.c_uint   : N.dtype(N.uint32),
    ctypes.c_long   : N.dtype(N.int64),
    ctypes.c_ulong  : N.dtype(N.uint64),
    ctypes.c_float  : N.dtype(N.float32),
    ctypes.c_double : N.dtype(N.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys()))


def shmem_as_ndarray(raw_array, shape=None ):

    address = raw_array._obj._wrapper.get_address()
    size = len(raw_array)
    if (shape is None) or (N.asarray(shape).prod() != size):
        shape = (size,)
    elif type(shape) is int:
        shape = (shape,)
    else:
        shape = tuple(shape)

    dtype = _ctypes_to_numpy[raw_array._obj._type_]
    class Dummy(object): pass
    d = Dummy()
    d.__array_interface__ = {
        'data' : (address, False),
        'typestr' : dtype.str,
        'descr' :   dtype.descr,
        'shape' : shape,
        'strides' : None,
        'version' : 3}
    return N.asarray(d)

def empty_shared_array(shape, dtype, lock=True):
    '''
    Generate an empty MP shared array given ndarray parameters
    '''

    if type(shape) is not int:
        shape = N.asarray(shape).prod()
    try:
        c_type = _numpy_to_ctypes[dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[N.dtype(dtype)]
    return MP.Array(c_type, shape, lock=lock)

def emptylike_shared_array(ndarray, lock=True):
    'Generate a empty shared array with size and dtype of a  given array'
    return empty_shared_array(ndarray.size, ndarray.dtype, lock)
 9
Author: Joe Kington,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-02-17 20:18:11

Z pozostałych odpowiedzi wynika, że numpy-sharedmem jest drogą do zrobienia.

Jednak, jeśli potrzebujesz czystego rozwiązania Pythona lub instalowanie rozszerzeń, cython lub tym podobnych jest (dużym) kłopotem, możesz użyć następującego kodu, który jest uproszczoną wersją kodu Nadav:

import numpy, ctypes, multiprocessing

_ctypes_to_numpy = {
    ctypes.c_char   : numpy.dtype(numpy.uint8),
    ctypes.c_wchar  : numpy.dtype(numpy.int16),
    ctypes.c_byte   : numpy.dtype(numpy.int8),
    ctypes.c_ubyte  : numpy.dtype(numpy.uint8),
    ctypes.c_short  : numpy.dtype(numpy.int16),
    ctypes.c_ushort : numpy.dtype(numpy.uint16),
    ctypes.c_int    : numpy.dtype(numpy.int32),
    ctypes.c_uint   : numpy.dtype(numpy.uint32),
    ctypes.c_long   : numpy.dtype(numpy.int64),
    ctypes.c_ulong  : numpy.dtype(numpy.uint64),
    ctypes.c_float  : numpy.dtype(numpy.float32),
    ctypes.c_double : numpy.dtype(numpy.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(),
                            _ctypes_to_numpy.keys()))


def shm_as_ndarray(mp_array, shape = None):
    '''Given a multiprocessing.Array, returns an ndarray pointing to
    the same data.'''

    # support SynchronizedArray:
    if not hasattr(mp_array, '_type_'):
        mp_array = mp_array.get_obj()

    dtype = _ctypes_to_numpy[mp_array._type_]
    result = numpy.frombuffer(mp_array, dtype)

    if shape is not None:
        result = result.reshape(shape)

    return numpy.asarray(result)


def ndarray_to_shm(array, lock = False):
    '''Generate an 1D multiprocessing.Array containing the data from
    the passed ndarray.  The data will be *copied* into shared
    memory.'''

    array1d = array.ravel(order = 'A')

    try:
        c_type = _numpy_to_ctypes[array1d.dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)]

    result = multiprocessing.Array(c_type, array1d.size, lock = lock)
    shm_as_ndarray(result)[:] = array1d
    return result

Użyłbyś go tak:

  1. użyj sa = ndarray_to_shm(a), aby przekształcić ndarray a w wspólny wieloprocesor.Array .
  2. Użyj multiprocessing.Process(target = somefunc, args = (sa, ) (i start, może join) wywołanie somefunc w oddzielnym procesie , przekazującym udostępnioną tablicę.
  3. W somefunc, użyj a = shm_as_ndarray(sa), aby uzyskać ndarray wskazujący na udostępnione dane. (W rzeczywistości, możesz zrobić to samo w oryginalnym procesie, natychmiast po utworzeniu sa, aby mieć dwa ndarray odwołujące się do tych samych danych.)

AFAIKI, nie musisz ustawiać Locka na True, ponieważ shm_as_ndarray i tak nie użyje blokady. Jeśli potrzebujesz blokady, ustawisz lock na True i wywołaj / align = "left" /

Również, jeśli Twoja tablica nie jest 1-wymiarowa, możesz chcieć przenieść kształt wraz z sa(np. użyj args = (sa, a.shape)).

To rozwiązanie ma tę zaletę, że nie wymaga dodatkowych pakietów ani modułów rozszerzeń, z wyjątkiem multiprocessingu (który znajduje się w bibliotece standardowej).

 5
Author: hans_meine,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-03-13 16:26:02

Użyj wątków. Ale chyba będziesz miał problemy z Gilem.

Zamiast tego: Wybierz swoją truciznę .

Wiem z implementacji MPI, z którymi pracuję, że używają one pamięci współdzielonej do komunikacji na węzłach. W takim przypadku będziesz musiał zakodować własną synchronizację.

2 GB/S wygląda na to, że będziesz miał problemy z większością "łatwych" metod, w zależności od ograniczeń w czasie rzeczywistym i dostępnej pamięci głównej.

 3
Author: ebo,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:32:00

Użyj wątków. Prawdopodobnie nie będziesz miał problemów z GIL.

GIL dotyczy tylko kodu Pythona, a nie bibliotek wspieranych przez C/Fortran / Cython. Większość operacji numpy i spory kawałek naukowego stosu Pythona wspieranego przez C zwalnia GIL i może działać dobrze na wielu rdzeniach. [[5]} Ten blogpost omawia GIL i Python naukowy bardziej dogłębnie.

Edit

Proste sposoby korzystania z wątków Obejmują moduł threading i multiprocessing.pool.ThreadPool.

 1
Author: MRocklin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-05-04 22:54:38

Jedną z możliwości do rozważenia jest użycie PAMIĘCI RAM do tymczasowego przechowywania plików, które mają być współdzielone między procesami. Dysk RAM to miejsce, w którym część pamięci RAM jest traktowana jako logiczny dysk twardy, na którym pliki mogą być zapisywane/odczytywane tak, jak w przypadku zwykłego dysku, ale z prędkością odczytu/zapisu pamięci RAM.

Ten artykuł opisuje użycie oprogramowania ImDisk (dla MS Win) do tworzenia takiego dysku i uzyskuje prędkość odczytu/zapisu plików 6-10 Gigabajty / sekundę: https://www.tekrevue.com/tip/create-10-gbs-ram-disk-windows/

Przykład w Ubuntu: https://askubuntu.com/questions/152868/how-do-i-make-a-ram-disk#152871

Inną zauważalną korzyścią jest to, że pliki o dowolnych formatach mogą być przekazywane za pomocą takiej metody: np. Picke, JSON, XML, CSV, HDF5, itp...

Należy pamiętać, że wszystko zapisane na dysku RAM jest wymazywane po ponownym uruchomieniu.

 1
Author: Justas,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-01-02 17:37:43