Przekształcić funkcje z wywołaniem zwrotnym w Generatory Pythona?
Funkcja minimalizacji Scipy (tylko do wykorzystania jako przykład), ma możliwość dodania funkcji zwrotnej na każdym kroku. Więc mogę zrobić coś w stylu:
def my_callback(x):
print x
scipy.optimize.fmin(func, x0, callback=my_callback)
Czy istnieje sposób, aby użyć funkcji zwrotnej do stworzenia generatora wersji fmin, tak, że mogę to zrobić,
for x in my_fmin(func,x0):
print x
Wygląda na to, że może to być możliwe z jakąś kombinacją plonów i wysyłek, ale mogę myśleć o wszystkim. 4 answers
Jak zaznaczono w komentarzach, można to zrobić w nowym wątku, używając Queue
. Wadą jest to, że nadal potrzebujesz jakiegoś sposobu, aby uzyskać dostęp do końcowego wyniku (co fmin
zwraca na końcu). Mój przykład poniżej używa opcjonalnego wywołania zwrotnego, aby coś z nim zrobić (inną opcją byłoby po prostu dać go również, choć twój kod wywołujący musiałby odróżnić wyniki iteracji od wyników końcowych): {]}
from thread import start_new_thread
from Queue import Queue
def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):
q = Queue() # fmin produces, the generator consumes
job_done = object() # signals the processing is done
# Producer
def my_callback(x):
q.put(x)
def task():
ret = scipy.optimize.fmin(func,x0,callback=my_callback)
q.put(job_done)
end_callback(ret) # "Returns" the result of the main call
# Starts fmin in a new thread
start_new_thread(task,())
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
if next_item is job_done:
break
yield next_item
Update: aby zablokować wykonanie następnego iteracja dopóki konsument nie skończy przetwarzania ostatniego, konieczne jest również użycie task_done
i join
.
# Producer
def my_callback(x):
q.put(x)
q.join() # Blocks until task_done is called
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
if next_item is job_done:
break
yield next_item
q.task_done() # Unblocks the producer, so a new iteration can start
Zauważ, że maxsize=1
nie jest konieczne, ponieważ żaden nowy element nie zostanie dodany do kolejki, dopóki nie zostanie zużyty ostatni.
Update 2: należy również pamiętać, że jeśli wszystkie elementy nie zostaną ostatecznie pobrane przez ten generator, utworzony wątek zablokuje się (zablokuje się na zawsze, a jego zasoby nigdy nie zostaną wydane). Producent czeka w kolejce, a ponieważ przechowuje odniesienie do tej kolejki, nigdy nie zostanie odzyskane przez gc, nawet jeśli konsument jest. Kolejka stanie się wtedy nieosiągalna, więc nikt nie będzie mógł zwolnić blokady.
Czyste rozwiązanie tego problemu nie jest znane, jeśli w ogóle jest to możliwe (ponieważ zależałoby to od konkretnej funkcji użytej w miejscu fmin
). Można to obejść za pomocą timeout
, gdy producent podnosi wyjątek, jeśli put
blokuje się zbyt długo:
q = Queue(maxsize=1)
# Producer
def my_callback(x):
q.put(x)
q.put("dummy",True,timeout) # Blocks until the first result is retrieved
q.join() # Blocks again until task_done is called
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
q.task_done() # (one "task_done" per "get")
if next_item is job_done:
break
yield next_item
q.get() # Retrieves the "dummy" object (must be after yield)
q.task_done() # Unblocks the producer, so a new iteration can start
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-05-30 07:20:37
Concept użyj kolejki blokującej z
maxsize=1
i modelem producent / konsument.
Wywołanie zwrotne produkuje, następnie następne połączenie do wywołania zwrotnego zablokuje się w pełnej kolejce.
Konsument następnie zwraca wartość z kolejki, próbuje uzyskać inną wartość i blokuje odczyt.
Producent ma prawo wcisnąć się do kolejki, spłukać i powtórzyć.Użycie:
def dummy(func, arg, callback=None):
for i in range(100):
callback(func(arg+i))
# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
print(i)
# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
print(i)
Może być stosowany zgodnie z oczekiwaniami dla iterator:
for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
print(i)
Klasa iteracyjna:
from thread import start_new_thread
from Queue import Queue
class Iteratorize:
"""
Transforms a function that takes a callback
into a lazy iterator (generator).
"""
def __init__(self, func, ifunc, arg, callback=None):
self.mfunc=func
self.ifunc=ifunc
self.c_callback=callback
self.q = Queue(maxsize=1)
self.stored_arg=arg
self.sentinel = object()
def _callback(val):
self.q.put(val)
def gentask():
ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
self.q.put(self.sentinel)
if self.c_callback:
self.c_callback(ret)
start_new_thread(gentask, ())
def __iter__(self):
return self
def next(self):
obj = self.q.get(True,None)
if obj is self.sentinel:
raise StopIteration
else:
return obj
Może prawdopodobnie zrobić z pewnym czyszczeniem, aby zaakceptować *args
i **kwargs
dla funkcji zawijanej i / lub wywołania zwrotnego wyniku końcowego.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-06-02 11:54:49
Generator jako koroutine (bez gwintowania)
Niech FakeFtp
z retrbinary
funkcją wywołującą wywołanie zwrotne przy każdym pomyślnym odczycie fragmentu danych:
class FakeFtp(object):
def __init__(self):
self.data = iter(["aaa", "bbb", "ccc", "ddd"])
def login(self, user, password):
self.user = user
self.password = password
def retrbinary(self, cmd, cb):
for chunk in self.data:
cb(chunk)
Używanie prostej funkcji callback ma wadę, że jest wywoływana wielokrotnie, a callback funkcja nie może łatwo zachować kontekstu między wywołaniami.
Poniższy kod definiuje generator process_chunks
, który będzie mógł odbierać fragmenty danych jeden
przez jednego i przetwarzając je. W przeciwieństwie do prostego wywołania zwrotnego, tutaj jesteśmy w stanie zachować wszystkie
przetwarzanie w ramach jednej funkcji bez utraty kontekstu.
from contextlib import closing
from itertools import count
def main():
processed = []
def process_chunks():
for i in count():
try:
# (repeatedly) get the chunk to process
chunk = yield
except GeneratorExit:
# finish_up
print("Finishing up.")
return
else:
# Here process the chunk as you like
print("inside coroutine, processing chunk:", i, chunk)
product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
processed.append(product)
with closing(process_chunks()) as coroutine:
# Get the coroutine to the first yield
coroutine.next()
ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`
print("processed result", processed)
print("DONE")
Aby zobaczyć kod w akcji, umieść klasę FakeFtp
, kod pokazany powyżej i następujący wiersz:
main()
Do jednego pliku i nazwij go:
$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE
Jak to działa
processed = []
jest tu tylko po to, aby pokazać, że generator process_chunks
nie będzie miał problemów z
współpraca z jego zewnętrznym kontekstem. Wszystko jest owinięte w def main():
, aby udowodnić, że nie ma potrzeby
użyj globalnego zmienne.
def process_chunks()
jest podstawą rozwiązania. Może mieć parametry wejściowe jednego strzału (nie
użyte tutaj), ale głównym punktem, w którym odbiera dane wejściowe jest każda linia yield
zwracająca to, co ktoś wysyła
poprzez {[14] } do instancji tego generatora. Można coroutine.send(chunk)
, ale w tym przykładzie odbywa się to poprzez wywołanie zwrotne odwołujące się do tej funkcji callback.send
.
Zauważ, że w realnym rozwiązaniu nie ma problemu, aby w kodzie było wiele yield
s, są to
przetwarzane jeden po drugim. Można to wykorzystać np. do odczytać (i zignorować) nagłówek pliku CSV, a następnie
Kontynuuj przetwarzanie rekordów z danymi.
coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()
ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`
# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()
Prawdziwy kod używa contextlib
closing
context manager, aby zapewnić, że coroutine.close()
jest
zawsze dzwoniłem.
Wnioski
To rozwiązanie nie zapewnia iteratora do zużywania danych w tradycyjnym stylu " z Na Zewnątrz". Z drugiej strony, jesteśmy w stanie:
- użyj generatora " z inside "
- zachowaj całe przetwarzanie iteracyjne w ramach jednej funkcji bez przerywania między wywołaniami zwrotnymi
- opcjonalnie użyj kontekstu zewnętrznego
- podaj użyteczne wyniki na zewnątrz
- wszystko to można zrobić bez użycia gwintowania
Credits : rozwiązanie jest mocno inspirowane przez SO answer Python FTP" chunk " iterator (bez ładowania całego pliku do pamięci) napisane przez user2357112
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:10:23
A może
data = []
scipy.optimize.fmin(func,x0,callback=data.append)
for line in data:
print line
Jeśli nie, to co dokładnie chcesz zrobić z danymi generatora?
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-04-01 22:18:17