Jak połączyć Basen.map with Array (shared memory) in Python multiprocessing?

Mam bardzo dużą (tylko do odczytu) tablicę danych, które chcę przetwarzać równolegle przez wiele procesów.

Lubię Basen.funkcja map i chciałby używać jej do równoległego obliczania funkcji na tych danych.

Zauważyłem, że można użyć klasy Value lub Array do wykorzystania danych pamięci współdzielonej między procesami. Ale kiedy próbuję tego użyć, dostaję RuntimeError: 'Obiekty SynchronizedString powinny być współdzielone tylko między procesami poprzez dziedziczenie podczas korzystania z puli.mapka Funkcja:

Oto uproszczony przykład tego, co próbuję zrobić:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )
Czy ktoś może mi powiedzieć, co robię źle?

Więc to, co chciałbym zrobić, to przekazać informacje o nowo utworzonej tablicy przydzielonej pamięci współdzielonej procesom po ich utworzeniu w puli procesów.

Author: Jeroen Dirks, 2009-11-04

4 answers

[[3]}próbuje ponownie, jak właśnie zobaczyłem bounty;)

Zasadniczo myślę, że komunikat o błędzie oznacza to, co powiedział-wieloprocesorowe Tablice pamięci współdzielonej nie mogą być przekazywane jako argumenty (przez wytrawianie). Serializacja danych nie ma sensu - chodzi o to, że dane są dzieloną pamięcią. Więc musisz sprawić, że udostępniona tablica będzie globalna. Myślę, że lepiej jest umieścić go jako atrybut modułu, jak w mojej pierwszej odpowiedzi, ale samo pozostawienie go jako zmiennej globalnej w twoim przykładzie również działa dobrze. Taking on jeśli nie chcesz ustawiać danych przed widelcem, oto zmodyfikowany przykład. Jeśli chcesz mieć więcej niż jedną współdzieloną tablicę (i dlatego chciałeś przekazać toshare jako argument), możesz podobnie utworzyć globalną listę współdzielonych tablic i po prostu przekazać indeks do count_it (który stałby się for c in toShare[i]:).

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[EDIT: powyższe nie działa w systemie windows z powodu nie używania forka. Jednak poniżej działa na Windows, nadal używając puli, więc myślę, że jest to najbliżej tego, co chcesz:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

Nie jestem pewien, dlaczego map nie będzie zbierać tablicy, ale proces i Pula będą-myślę, że być może został przeniesiony w momencie inicjalizacji podprocesu w windows. Zauważ, że dane są nadal ustawiane za widelcem.

 37
Author: robince,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-11-12 18:19:03

Widzę problem w tym, że Pool nie obsługuje wytrawiania udostępnionych danych poprzez listę argumentów. To właśnie oznacza komunikat o błędzie "obiekty powinny być współdzielone między procesami tylko poprzez dziedziczenie". Udostępnione dane muszą być dziedziczone, np. globalne, jeśli chcesz je udostępnić za pomocą klasy Pool.

Jeśli musisz je przekazać jawnie, być może będziesz musiał użyć przetwarzania wieloprocesorowego.Proces. Oto twój przerobiony przykład:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

Wyjście: ("s", 9) ("a", 2) ("b", 3) ("d", 12)

Kolejność elementów kolejki może być różna.

Aby uczynić to bardziej ogólnym i podobnym do Pool, możesz utworzyć stałą liczbę N procesów, podzielić listę kluczy na N kawałków, a następnie użyć funkcji owijania jako celu procesu, która wywoła count_it dla każdego klucza na liście, który jest przekazywany, jak:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)
 4
Author: jwilson,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-11-10 02:08:28

Jeśli dane są tylko odczytywane, po prostu ustaw zmienną w module przed widełką z puli. Wtedy wszystkie procesy potomne powinny mieć do niego dostęp i nie zostaną skopiowane pod warunkiem, że do niego nie napiszesz.

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

Jeśli chcesz spróbować użyć Array, możesz spróbować użyć argumentu słowa kluczowego lock=False (domyślnie jest to prawda).

 2
Author: robince,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-11-04 20:37:31

Moduł multiprocessing.sharedctypes zapewnia funkcje przydzielania obiektów ctypes z pamięci współdzielonej, które mogą być dziedziczone przez procesy potomne.

Więc twoje użycie sharedctypes jest złe. Czy chcesz dziedziczyć tę tablicę od procesu nadrzędnego, czy wolisz przekazać ją jawnie? W pierwszym przypadku musisz utworzyć zmienną globalną, jak sugerują inne odpowiedzi. Ale nie musisz używać sharedctypes, aby przekazać go jawnie, wystarczy przekazać oryginalny testData.

BTW, Twój użycie Pool.map() jest błędne. Ma ten sam interfejs co wbudowana funkcja map() (namieszałeś ją starmap()?). Poniżej znajduje się przykład pracy z jawnie przekazującą tablicę:

from multiprocessing import Pool

def count_it( (arr, key) ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    pool = Pool()
    print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])
 -1
Author: Denis Otkidach,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-11-12 15:26:50