Przerwania klawiatury z pulą wieloprocesorową Pythona

Jak mogę obsługiwać zdarzenia typu KeyboardInterrupt z pulami wieloprocesorowymi Pythona? Oto prosty przykład:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Podczas uruchamiania powyższego kodu, {[1] } jest podnoszony po naciśnięciu ^C, ale proces po prostu zawiesza się w tym momencie i muszę go zabić zewnętrznie.

Chcę być w stanie nacisnąć ^C w dowolnym momencie i spowodować, że wszystkie procesy zakończą się z wdziękiem.

Author: Fragsworth, 2009-09-10

10 answers

To jest błąd Pythona. Podczas oczekiwania na warunek w gwintowaniu.Warunek.wait (), Keyboard interrupt nigdy nie jest wysyłany. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

Wyjątek KeyboardInterrupt nie zostanie dostarczony, dopóki funkcja wait() nie powróci i nigdy nie powróci, więc przerwanie nigdy się nie wydarzy. KeyboardInterrupt powinien prawie na pewno przerwać stan oczekiwania.

Zauważ, że nie dzieje się tak, jeśli podany jest limit czasu; cond.wait (1) natychmiast odbierze przerwanie. Obejściem jest więc określ limit czasu. Aby to zrobić, zastąp

    results = pool.map(slowly_square, range(40))

Z

    results = pool.map_async(slowly_square, range(40)).get(9999999)

Lub podobne.

 138
Author: Glenn Maynard,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-06-07 20:45:37

Z tego, co ostatnio odkryłem, najlepszym rozwiązaniem jest skonfigurowanie procesów roboczych tak, aby całkowicie ignorowały SIGINT i ograniczyły cały kod czyszczenia do procesu nadrzędnego. Rozwiązuje to problem zarówno dla bezczynnych, jak i zajętych procesów roboczych i nie wymaga kodu obsługi błędów w procesach potomnych.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Wyjaśnienie i pełny przykładowy kod można znaleźć na stronie http://noswap.com/blog/python-multiprocessing-keyboardinterrupt / i http://github.com/jreese/multiprocessing-keyboardinterrupt odpowiednio.

 60
Author: John Reese,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-24 03:19:22

Z pewnych powodów, tylko wyjątki dziedziczone z podstawowej klasy Exception są obsługiwane normalnie. Jako obejście możesz ponownie podnieść swoją KeyboardInterrupt jako instancję Exception:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalnie otrzymałbyś następujące wyjście:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Więc jeśli trafisz ^C, otrzymasz:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
 30
Author: Andrey Vlasovskikh,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2010-04-01 21:22:58

Głosowana odpowiedź nie rozwiązuje podstawowej kwestii, ale podobny efekt uboczny.

Jesse Noller, autor biblioteki wieloprocesorowej, wyjaśnia, jak poprawnie postępować z CTRL+C podczas używania multiprocessing.Pool W starym poście na blogu .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
 10
Author: noxdafox,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-09-07 03:10:12

Zwykle ta prosta struktura działa dla Ctrl-C {[3] } Na Basenie:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Jak napisano w kilku podobnych postach:

Capture keyboard in Python without try-except

 9
Author: igco,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:03:05

Wydaje się, że są dwa problemy, które sprawiają, że wyjątki podczas przetwarzania irytujące. Pierwsza (zauważona przez Glenna) polega na tym, że musisz użyć map_async z timeoutem zamiast map, aby uzyskać natychmiastową odpowiedź (tzn. nie kończ przetwarzania całej listy). Drugi (odnotowany przez Andreya)jest taki, że multiprocessing nie wyłapuje wyjątków, które nie dziedziczą z Exception (np. SystemExit). Więc oto moje rozwiązanie, które dotyczy obu z nich:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
 5
Author: Paul Price,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-05-15 15:23:24

Stwierdziłem, że na razie najlepszym rozwiązaniem jest nie używać multiprocessingu.funkcja basen, ale raczej rolki własną funkcjonalność basen. Podałem przykład demonstrujący błąd z apply_async, a także przykład pokazujący, jak całkowicie uniknąć korzystania z funkcji puli.

Http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

 4
Author: bboe,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2010-08-26 17:16:12

Możesz spróbować użyć metody apply_async obiektu Pool, jak to:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Wyjście:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Zaletą tej metody jest to, że wyniki przetworzone przed przerwaniem zostaną zwrócone w słowniku wyników:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
 3
Author: bparker856,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-14 06:37:12

Jestem nowicjuszem w Pythonie. Szukałem wszędzie odpowiedzi i natknąłem się na to i kilka innych blogów i filmów na youtube. Próbowałem skopiować powyższy kod autora i odtworzyć go na moim Pythonie 2.7.13 w windows 7 64-bit. Jest blisko tego, co chcę osiągnąć.

Sprawiłem, że moje procesy potomne ignorują ControlC i powodują zakończenie procesu rodzica. Wygląda na to, że ominięcie procesu potomnego unika tego problemu.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

Część zaczynająca się na pool.terminate() nigdy wygląda na egzekucję.

 1
Author: Linux Cli Aik,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-06-20 22:39:15

/ Align = "left" / Spodziewałem się, że to zadziała tak, jak napisano... spróbuj zmienić slowly_square na:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0
To powinno zadziałać tak jak oczekiwałeś.
 -5
Author: D.Shawley,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-09-11 00:26:22