Nieblokujący odczyt podprocesu.Rura w Pythonie

Używam modułu podprocesu aby uruchomić podproces i połączyć się z jego strumieniem wyjściowym (standardowe wyjście). Chcę móc wykonywać odczyty bez blokowania na standardowym wyjściu. Jest jakiś sposób .readline non-blocking lub aby sprawdzić, czy w strumieniu są dane przed wywołaniem .readline? Chciałbym, żeby to było przenośne lub przynajmniej działało pod Windowsem i Linuksem.

Oto Jak to na razie robię (blokuje się na .readline jeśli brak danych jest dostępne):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Author: Peter Mortensen, 2008-12-17

29 answers

fcntl, select, asyncproc to nie pomoże w tej sprawie.

Niezawodnym sposobem odczytu strumienia bez blokowania niezależnie od systemu operacyjnego jest użycie Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line
 427
Author: jfs,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-06 15:02:06

Często miałem podobny problem; programy Pythona, które często piszę, muszą mieć możliwość wykonywania niektórych podstawowych funkcji, jednocześnie akceptując dane wejściowe użytkownika z wiersza poleceń (stdin). Po prostu umieszczenie funkcji obsługi danych wejściowych użytkownika w innym wątku nie rozwiązuje problemu, ponieważ readline() blokuje i nie ma limitu czasu. Jeśli podstawowa funkcjonalność jest kompletna i nie ma już potrzeby oczekiwania na dalsze wejście użytkownika, zazwyczaj chcę, aby mój program zakończył działanie, ale nie może, ponieważ readline() nadal blokuje w innym wątku czekając na linię. Rozwiązaniem, które znalazłem na ten problem, jest uczynienie stdin nieblokującym plikiem za pomocą modułu fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

Moim zdaniem jest to nieco czystsze niż użycie modułów select lub signal do rozwiązania tego problemu, ale z drugiej strony działa tylko na Unixie...

 81
Author: Jesse,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-11-20 02:41:51

Python 3.4 wprowadza nowe tymczasowe API dla asynchronicznego IO -- asyncio Moduł .

Podejście jest podobne do twisted-na podstawie odpowiedzi @Bryan Ward -- define a protocol and its methods are called as soon as data is ready:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

Zobacz "podproces" w dokumentach .

Istnieje interfejs wysokiego poziomu asyncio.create_subprocess_exec(), który zwraca Process obiekty, które pozwalają na odczyt linii asynchronicznej za pomocą StreamReader.readline() coroutine (z async/await Python 3.5 + składnia):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() wykonuje następujące zadania:

  • Uruchom podproces, przekieruj jego stdout do rury
  • odczyt linii z podprocesu ' stdout asynchronicznie
  • Zabij podproces
  • poczekaj aż wyjdzie

Każdy krok może być ograniczony czasem sekund, jeśli to konieczne.

 45
Author: jfs,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:10:44

Wypróbuj moduł asyncproc . Na przykład:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Moduł zajmuje się wszystkimi wątkami zgodnie z sugestią S. Lotta.

 19
Author: Noah,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-03-09 19:31:53

Można to zrobić naprawdę łatwo w Twisted . W zależności od istniejącej bazy kodu, może to nie być takie łatwe w użyciu, ale jeśli budujesz pokręconą aplikację, to rzeczy takie jak ta stają się prawie trywialne. Tworzysz klasę ProcessProtocol i nadpisujesz metodę outReceived(). Twisted (w zależności od zastosowanego reaktora) to zwykle duża pętla select() z zainstalowanymi wywołaniami zwrotnymi do obsługi danych z różnych deskryptorów plików (często gniazd sieciowych). Tak więc metoda outReceived() jest po prostu instalacją oddzwanianie do obsługi danych pochodzących z STDOUT. Prosty przykład demonstrujący to zachowanie jest następujący:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Twisted documentation ma na ten temat kilka dobrych informacji.

Jeśli zbudujesz całą aplikację wokół Twisted, sprawi to, że asynchroniczna komunikacja z innymi procesami, lokalnymi lub zdalnymi, będzie naprawdę elegancka. Z drugiej strony, jeśli twój program nie jest zbudowany na Twisted, nie będzie to tak naprawdę pomocne. Mam nadzieję, że to może być pomocne dla innych czytelników, nawet jeśli nie ma zastosowania do konkretnej aplikacji.

 17
Author: Bryan Ward,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-05-14 01:26:17

Użyj select & read (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Dla readline () - jak:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a
 14
Author: Andy Jackson,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-01-26 01:02:08

Jednym z rozwiązań jest zrobienie innego procesu, aby wykonać odczyt procesu lub zrobić wątek procesu z limitem czasu.

Oto gwintowana wersja funkcji timeout:

Http://code.activestate.com/recipes/473878/

Jednak, czy trzeba czytać stdout jak to nadchodzi? Innym rozwiązaniem może być zrzut danych wyjściowych do pliku i oczekiwanie na zakończenie procesu za pomocą p. wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()
 8
Author: monkut,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2008-12-17 18:16:48

Zastrzeżenie: to działa tylko dla tornado

Możesz to zrobić, ustawiając FD na nieblokujący, a następnie użyj ioloop do rejestracji wywołań zwrotnych. Zapakowałem to w jajko o nazwie tornado_subprocess i można go zainstalować przez PyPI:

easy_install tornado_subprocess

Teraz możesz zrobić coś takiego:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

Możesz go również użyć z RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
 7
Author: Vukasin Toroman,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-07-06 12:42:26

Oto Mój kod, używany do przechwytywania każdego wyjścia z podprocesu, w tym częściowych linii. Pompuje w tym samym czasie i stdout i stderr w prawie prawidłowej kolejności.

Przetestowano i poprawnie działa na Pythonie 2.7 linux & windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()
 7
Author: datacompboy,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-06-11 19:09:11

Dodaję ten problem, aby przeczytać jakiś podproces.Popen stdout. Oto moje rozwiązanie nie blokujące odczytu:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
 6
Author: Sebastien Claeys,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-04-21 20:51:57

Istniejące rozwiązania nie działały dla mnie (szczegóły poniżej). Ostatecznie udało się zaimplementować readline używając read(1) (na podstawie tej odpowiedzi ). Ten ostatni nie blokuje:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Dlaczego istniejące rozwiązania nie działały:

  1. rozwiązania wymagające readline (w tym rozwiązania oparte na kolejkach) zawsze blokują się. Jest to trudne(niemożliwe?), aby zabić wątek, który wykonuje readline. Ginie tylko wtedy, gdy zakończy się proces, który go stworzył, ale nie wtedy, gdy wyjście-proces produkcji jest zabity.
  2. mieszanie niskopoziomowych wywołań fcntl z wysokopoziomowymi wywołaniami readline może nie działać poprawnie, jak zauważył anonnn.
  3. użycie select.poll () jest schludne, ale nie działa na Windows zgodnie z pythonowymi dokumentami.
  4. używanie bibliotek innych firm wydaje się przesadą dla tego zadania i dodaje dodatkowe zależności.
 6
Author: Vikram Pudi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:02:56

Na systemach uniksopodobnych i Pythonie 3.5 + jest os.set_blocking co robi dokładnie to, co mówi.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

To wyjście:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

Z os.set_blocking skomentował (a):

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
 5
Author: saaj,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-12-11 17:51:31

Ta wersja nieblokującego odczytu nie wymaga specjalnych modułów i będzie działać po wyjęciu z pudełka na większości dystrybucji Linuksa.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
 4
Author: Tom Lime,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-05-24 15:55:16

Oto proste rozwiązanie oparte na wątkach, które:

  • działa zarówno na Linuksie, jak i Windows(nie polegając na select).
  • odczytuje zarówno stdout jak i stderr asynchronicznie.
  • nie polega na aktywnej ankiecie z dowolnym czasem oczekiwania (przyjazny dla procesora).
  • nie używa asyncio (co może kolidować z innymi bibliotekami).
  • uruchamia się, dopóki proces potomny nie zakończy się.

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()
 4
Author: Olivier Michel,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-03-20 15:41:53

We współczesnym Pythonie jest o wiele lepiej.

Oto prosty program dla dzieci, "hello.py": {]}

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

I program do interakcji z nim:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

To drukuje:

b'hello bob\n'
b'hello alice\n'

Zauważ, że rzeczywisty wzorzec, który jest również przez prawie wszystkie poprzednie odpowiedzi, zarówno tutaj, jak i w powiązanych pytaniach, polega na ustawieniu deskryptora pliku stdout na nieblokujący, a następnie przeszukiwaniu go w jakiejś pętli select. W dzisiejszych czasach taką pętlę zapewnia oczywiście asyncio.

 3
Author: user240515,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-05-09 02:12:08

Dodanie tej odpowiedzi tutaj, ponieważ zapewnia możliwość ustawienia nieblokujących rur w systemach Windows i Unix.

Wszystkie szczegóły ctypes są dzięki @techtonik ' s answer .

Istnieje nieco zmodyfikowana wersja, która może być używana zarówno w systemach Unix, jak i Windows.

  • Python3 compatible (potrzebna tylko drobna zmiana) .
  • zawiera wersję posix i definiuje wyjątek, którego można użyć dla każdego z nich.

W ten sposób możesz użyć tej samej funkcji i wyjątek dla kodu Unix i Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Aby uniknąć czytania niekompletnych danych, napisałem własny generator readline (który zwraca łańcuch bajtów dla każdej linii).

To generator, więc możesz na przykład...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)
 2
Author: ideasman42,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 11:54:58

Mam oryginalny problem pytającego, ale nie chciałem wywoływać wątków. Zmieszałem rozwiązanie Jesse ' ego z bezpośrednim read() z rury, i mój własny bufor-handler dla odczytów linii (jednak mój pod-proces-ping - zawsze pisał pełne linie

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

The watcher is

def watch(f, *other):
    print 'reading',f.read()
    return True

I główny program ustawia ping i następnie wywołuje pętlę poczty gobject.

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

Każda inna praca jest dołączona do wywołań zwrotnych w gobject.

 2
Author: Dave Kitchen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2021-01-08 13:27:33

Moduł select pomaga określić, gdzie znajduje się następne przydatne dane wejściowe.

Jednak prawie zawsze jesteś szczęśliwszy z oddzielnymi wątkami. Jeden robi blokowanie odczytać stdin, inny robi gdziekolwiek to jest nie chcesz zablokowany.

 1
Author: S.Lott,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2008-12-17 18:19:25

Dlaczego przeszkadza wątek & Kolejka? w przeciwieństwie do readline(), BufferedReader.read1() nie blokuje czekania na \r \ n, zwraca ASAP, jeśli pojawi się jakieś wyjście.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()
 1
Author: mfmain,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-01-20 01:41:15

W moim przypadku potrzebowałem modułu rejestrującego, który wychwytuje dane wyjściowe z aplikacji w tle i je rozszerza (Dodawanie znaczników czasu, kolorów itp.).

Skończyłem z wątkiem tła, który wykonuje rzeczywiste I / O. poniższy kod jest tylko dla platform POSIX. Rozebrałem zbędne części.

Jeśli ktoś ma zamiar używać tej bestii na długie przebiegi rozważ zarządzanie otwartymi deskryptorami. W moim przypadku nie był to duży problem.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)
 1
Author: Dmytro,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-02-01 16:32:43

Mój problem jest nieco inny, ponieważ chciałem zebrać zarówno stdout i stderr z uruchomionego procesu, ale ostatecznie tak samo, ponieważ chciałem renderować wyjście w widżecie jako jego wygenerowany.

Nie chciałem uciekać się do wielu proponowanych obejść za pomocą kolejek lub dodatkowych wątków, ponieważ nie powinny być one konieczne do wykonania tak powszechnego zadania, jak uruchomienie innego skryptu i pobranie jego wyników.

Po zapoznaniu się z proponowanymi rozwiązaniami i pythonowymi dokumentami rozwiązałem problem z realizacją poniżej. Tak, działa tylko dla POSIX, ponieważ używam funkcji select.

Zgadzam się, że dokumenty są mylące, a implementacja jest niewygodna dla tak popularnego zadania skryptowego. Uważam, że starsze wersje Pythona mają różne wartości domyślne dla Popen i różne wyjaśnienia, co wywołało wiele zamieszania. Wydaje się, że działa to dobrze zarówno dla Pythona 2.7.12, jak i 3.5.2.

Kluczem było ustawienie bufsize=1 dla buforowania linii, a następnie universal_newlines=True przetworzyć jako plik tekstowy zamiast pliku binarnego, który wydaje się być domyślnym przy ustawieniu bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG i VERBOSE to po prostu makra, które drukują wyjście do terminala.

To rozwiązanie jest IMHO 99,99% skuteczne, ponieważ nadal używa funkcji blokowania readline, więc zakładamy, że proces podrzędny jest ładny i wyprowadza kompletne linie.

Z zadowoleniem przyjmuję opinie, aby ulepszyć rozwiązanie, ponieważ wciąż jestem nowy w Pythonie.

 1
Author: Brooke Wallace,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-06-27 22:01:20

Stworzyłem bibliotekę opartą na rozwiązaniu J. F. Sebastiana. Możesz go użyć.

Https://github.com/cenkalti/what

 0
Author: Cenk Alti,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:34:50

Bazując na odpowiedzi J. F. Sebastiana i kilku innych źródłach, stworzyłem prosty menedżer podprocesów. Zapewnia nieblokujący odczyt żądania, a także jednoczesne uruchamianie kilku procesów. Nie używa żadnego wywołania specyficznego dla systemu operacyjnego (o którym wiem) i dlatego powinien działać wszędzie.

Jest dostępny w pypi, więc po prostu pip install shelljob. Przykłady i pełne dokumenty można znaleźć na stronie projektu .

 0
Author: edA-qa mort-ora-y,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-10-31 10:09:25

EDIT: Ta implementacja nadal blokuje. Użyj zamiast tego odpowiedzi J. F. Sebastiana .

próbowałem odpowiedzi top , ale dodatkowe ryzyko i utrzymanie kodu wątku było niepokojące.

Przeglądając Moduł io (i będąc ograniczonym do 2.6), znalazłem BufferedReader. To moje bezgwintowe, nieblokujące rozwiązanie.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line
 0
Author: romc,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:02:56

Jest to przykład do uruchomienia poleceń interaktywnych w podprocesie, a stdout jest interaktywny za pomocą pseudo terminala. Możesz odnieść się do: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
 0
Author: Paco,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:02:56

To rozwiązanie wykorzystuje moduł select do "odczytu wszelkich dostępnych danych" ze strumienia IO. Ta funkcja blokuje początkowo dopóki dane nie będą dostępne, ale następnie odczytuje tylko dane, które są dostępne i nie blokuje dalej.

Biorąc pod uwagę fakt, że używa modułu select, działa to tylko na Uniksie.

Kod jest w pełni zgodny z PEP8.
import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
 0
Author: Bradley Odell,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-02 20:55:47

Spotkałem się również z problemem opisanym przez Jesse i rozwiązałem go używając "select" jako Bradley, Andy i inni zrobili to, ale w trybie blokowania, aby uniknąć zajętej pętli. Używa atrapy rury jako fałszywego stdin. Wybierz bloki i poczekaj, aż stdin lub rura będzie gotowa. Po naciśnięciu klawisza stdin odblokowuje select, a wartość klawisza może zostać pobrana za pomocą read(1). Gdy inny wątek pisze do rury, rura odblokowuje select i może być traktowana jako wskazanie, że zapotrzebowanie na stdin jest skończone. Oto kod referencyjny:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
 0
Author: gonzaedu61,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-07 08:09:33

Spróbuj wexpect , który jest alternatywą dla systemu windows dla pexpect .

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()
 0
Author: betontalpfa,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-12-07 14:13:37

Oto moduł, który obsługuje nieblokujące odczyty i zapisy w tle w Pythonie:

Https://pypi.python.org/pypi/python-nonblock

Dostarcza funkcję,

Nonblock_read, który odczyta dane ze strumienia, jeśli jest dostępny, w przeciwnym razie zwróci pusty łańcuch (lub żaden, jeśli strumień jest zamknięty po drugiej stronie i wszystkie możliwe dane zostały odczytane)

Możesz również rozważyć podproces2 Pythona moduł,

Https://pypi.python.org/pypi/python-subprocess2

Który dodaje do modułu podprocesowego. Tak więc na obiekcie zwróconym z " podprocesu.Popen " dodano dodatkową metodę, runInBackground. Spowoduje to uruchomienie wątku i zwrócenie obiektu, który zostanie automatycznie wypełniony, gdy materiał zostanie zapisany na stdout/stderr, bez blokowania głównego wątku.

Enjoy!

 -2
Author: Tim Savannah,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-01-16 21:47:15