Przekształcić funkcje z wywołaniem zwrotnym w Generatory Pythona?

Funkcja minimalizacji Scipy (tylko do wykorzystania jako przykład), ma możliwość dodania funkcji zwrotnej na każdym kroku. Więc mogę zrobić coś w stylu:

def my_callback(x):
    print x
scipy.optimize.fmin(func, x0, callback=my_callback)

Czy istnieje sposób, aby użyć funkcji zwrotnej do stworzenia generatora wersji fmin, tak, że mogę to zrobić,

for x in my_fmin(func,x0):
    print x
Wygląda na to, że może to być możliwe z jakąś kombinacją plonów i wysyłek, ale mogę myśleć o wszystkim.
Author: Eric Lebigot, 2012-04-02

4 answers

Jak zaznaczono w komentarzach, można to zrobić w nowym wątku, używając Queue. Wadą jest to, że nadal potrzebujesz jakiegoś sposobu, aby uzyskać dostęp do końcowego wyniku (co fmin zwraca na końcu). Mój przykład poniżej używa opcjonalnego wywołania zwrotnego, aby coś z nim zrobić (inną opcją byłoby po prostu dać go również, choć twój kod wywołujący musiałby odróżnić wyniki iteracji od wyników końcowych): {]}

from thread import start_new_thread
from Queue import Queue

def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):

    q = Queue() # fmin produces, the generator consumes
    job_done = object() # signals the processing is done

    # Producer
    def my_callback(x):
        q.put(x)
    def task():
        ret = scipy.optimize.fmin(func,x0,callback=my_callback)
        q.put(job_done)
        end_callback(ret) # "Returns" the result of the main call

    # Starts fmin in a new thread
    start_new_thread(task,())

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item

Update: aby zablokować wykonanie następnego iteracja dopóki konsument nie skończy przetwarzania ostatniego, konieczne jest również użycie task_done i join.

    # Producer
    def my_callback(x):
        q.put(x)
        q.join() # Blocks until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
        q.task_done() # Unblocks the producer, so a new iteration can start

Zauważ, że maxsize=1 nie jest konieczne, ponieważ żaden nowy element nie zostanie dodany do kolejki, dopóki nie zostanie zużyty ostatni.

Update 2: należy również pamiętać, że jeśli wszystkie elementy nie zostaną ostatecznie pobrane przez ten generator, utworzony wątek zablokuje się (zablokuje się na zawsze, a jego zasoby nigdy nie zostaną wydane). Producent czeka w kolejce, a ponieważ przechowuje odniesienie do tej kolejki, nigdy nie zostanie odzyskane przez gc, nawet jeśli konsument jest. Kolejka stanie się wtedy nieosiągalna, więc nikt nie będzie mógł zwolnić blokady.

Czyste rozwiązanie tego problemu nie jest znane, jeśli w ogóle jest to możliwe (ponieważ zależałoby to od konkretnej funkcji użytej w miejscu fmin). Można to obejść za pomocą timeout, gdy producent podnosi wyjątek, jeśli put blokuje się zbyt długo:

    q = Queue(maxsize=1)

    # Producer
    def my_callback(x):
        q.put(x)
        q.put("dummy",True,timeout) # Blocks until the first result is retrieved
        q.join() # Blocks again until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        q.task_done()                   # (one "task_done" per "get")
        if next_item is job_done:
            break
        yield next_item
        q.get() # Retrieves the "dummy" object (must be after yield)
        q.task_done() # Unblocks the producer, so a new iteration can start
 15
Author: mgibsonbr,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-05-30 07:20:37

Concept użyj kolejki blokującej z maxsize=1 i modelem producent / konsument.

Wywołanie zwrotne produkuje, następnie następne połączenie do wywołania zwrotnego zablokuje się w pełnej kolejce.

Konsument następnie zwraca wartość z kolejki, próbuje uzyskać inną wartość i blokuje odczyt.

Producent ma prawo wcisnąć się do kolejki, spłukać i powtórzyć.

Użycie:

def dummy(func, arg, callback=None):
  for i in range(100):
    callback(func(arg+i))

# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
  print(i)

# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
   print(i)

Może być stosowany zgodnie z oczekiwaniami dla iterator:

for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
  print(i)

Klasa iteracyjna:

from thread import start_new_thread
from Queue import Queue

class Iteratorize:
  """ 
  Transforms a function that takes a callback 
  into a lazy iterator (generator).
  """
  def __init__(self, func, ifunc, arg, callback=None):
    self.mfunc=func
    self.ifunc=ifunc
    self.c_callback=callback
    self.q = Queue(maxsize=1)
    self.stored_arg=arg
    self.sentinel = object()

    def _callback(val):
      self.q.put(val)

    def gentask():
      ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
      self.q.put(self.sentinel)
      if self.c_callback:
        self.c_callback(ret)

    start_new_thread(gentask, ())

  def __iter__(self):
    return self

  def next(self):
    obj = self.q.get(True,None)
    if obj is self.sentinel:
     raise StopIteration 
    else:
      return obj

Może prawdopodobnie zrobić z pewnym czyszczeniem, aby zaakceptować *args i **kwargs dla funkcji zawijanej i / lub wywołania zwrotnego wyniku końcowego.

 5
Author: brice,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-06-02 11:54:49

Generator jako koroutine (bez gwintowania)

Niech FakeFtp z retrbinary funkcją wywołującą wywołanie zwrotne przy każdym pomyślnym odczycie fragmentu danych:

class FakeFtp(object):
    def __init__(self):
        self.data = iter(["aaa", "bbb", "ccc", "ddd"])

    def login(self, user, password):
        self.user = user
        self.password = password

    def retrbinary(self, cmd, cb):
        for chunk in self.data:
            cb(chunk)

Używanie prostej funkcji callback ma wadę, że jest wywoływana wielokrotnie, a callback funkcja nie może łatwo zachować kontekstu między wywołaniami.

Poniższy kod definiuje generator process_chunks, który będzie mógł odbierać fragmenty danych jeden przez jednego i przetwarzając je. W przeciwieństwie do prostego wywołania zwrotnego, tutaj jesteśmy w stanie zachować wszystkie przetwarzanie w ramach jednej funkcji bez utraty kontekstu.

from contextlib import closing
from itertools import count


def main():
    processed = []

    def process_chunks():
        for i in count():
            try:
                # (repeatedly) get the chunk to process
                chunk = yield
            except GeneratorExit:
                # finish_up
                print("Finishing up.")
                return
            else:
                # Here process the chunk as you like
                print("inside coroutine, processing chunk:", i, chunk)
                product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
                processed.append(product)

    with closing(process_chunks()) as coroutine:
        # Get the coroutine to the first yield
        coroutine.next()
        ftp = FakeFtp()
        # next line repeatedly calls `coroutine.send(data)`
        ftp.retrbinary("RETR binary", cb=coroutine.send)
        # each callback "jumps" to `yield` line in `process_chunks`

    print("processed result", processed)
    print("DONE")

Aby zobaczyć kod w akcji, umieść klasę FakeFtp, kod pokazany powyżej i następujący wiersz:

main()

Do jednego pliku i nazwij go:

$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE

Jak to działa

processed = [] jest tu tylko po to, aby pokazać, że generator process_chunks nie będzie miał problemów z współpraca z jego zewnętrznym kontekstem. Wszystko jest owinięte w def main():, aby udowodnić, że nie ma potrzeby użyj globalnego zmienne.

def process_chunks() jest podstawą rozwiązania. Może mieć parametry wejściowe jednego strzału (nie użyte tutaj), ale głównym punktem, w którym odbiera dane wejściowe jest każda linia yield zwracająca to, co ktoś wysyła poprzez {[14] } do instancji tego generatora. Można coroutine.send(chunk), ale w tym przykładzie odbywa się to poprzez wywołanie zwrotne odwołujące się do tej funkcji callback.send.

Zauważ, że w realnym rozwiązaniu nie ma problemu, aby w kodzie było wiele yields, są to przetwarzane jeden po drugim. Można to wykorzystać np. do odczytać (i zignorować) nagłówek pliku CSV, a następnie Kontynuuj przetwarzanie rekordów z danymi.

Możemy utworzyć instancję i użyć generatora w następujący sposób:]}
coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()

ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`

# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()

Prawdziwy kod używa contextlib closing context manager, aby zapewnić, że coroutine.close() jest zawsze dzwoniłem.

Wnioski

To rozwiązanie nie zapewnia iteratora do zużywania danych w tradycyjnym stylu " z Na Zewnątrz". Z drugiej strony, jesteśmy w stanie:

  • użyj generatora " z inside "
  • zachowaj całe przetwarzanie iteracyjne w ramach jednej funkcji bez przerywania między wywołaniami zwrotnymi
  • opcjonalnie użyj kontekstu zewnętrznego
  • podaj użyteczne wyniki na zewnątrz
  • wszystko to można zrobić bez użycia gwintowania

Credits : rozwiązanie jest mocno inspirowane przez SO answer Python FTP" chunk " iterator (bez ładowania całego pliku do pamięci) napisane przez user2357112

 4
Author: Jan Vlcinsky,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:10:23

A może

data = []
scipy.optimize.fmin(func,x0,callback=data.append)
for line in data:
    print line

Jeśli nie, to co dokładnie chcesz zrobić z danymi generatora?

 0
Author: Winston Ewert,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-04-01 22:18:17