Nieblokujący odczyt podprocesu.Rura w Pythonie
Używam modułu podprocesu aby uruchomić podproces i połączyć się z jego strumieniem wyjściowym (standardowe wyjście). Chcę móc wykonywać odczyty bez blokowania na standardowym wyjściu. Jest jakiś sposób .readline non-blocking lub aby sprawdzić, czy w strumieniu są dane przed wywołaniem .readline
? Chciałbym, żeby to było przenośne lub przynajmniej działało pod Windowsem i Linuksem.
Oto Jak to na razie robię (blokuje się na .readline
jeśli brak danych jest dostępne):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
29 answers
fcntl
, select
, asyncproc
to nie pomoże w tej sprawie.
Niezawodnym sposobem odczytu strumienia bez blokowania niezależnie od systemu operacyjnego jest użycie Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-06 15:02:06
Często miałem podobny problem; programy Pythona, które często piszę, muszą mieć możliwość wykonywania niektórych podstawowych funkcji, jednocześnie akceptując dane wejściowe użytkownika z wiersza poleceń (stdin). Po prostu umieszczenie funkcji obsługi danych wejściowych użytkownika w innym wątku nie rozwiązuje problemu, ponieważ readline()
blokuje i nie ma limitu czasu. Jeśli podstawowa funkcjonalność jest kompletna i nie ma już potrzeby oczekiwania na dalsze wejście użytkownika, zazwyczaj chcę, aby mój program zakończył działanie, ale nie może, ponieważ readline()
nadal blokuje w innym wątku czekając na linię. Rozwiązaniem, które znalazłem na ten problem, jest uczynienie stdin nieblokującym plikiem za pomocą modułu fcntl:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
Moim zdaniem jest to nieco czystsze niż użycie modułów select lub signal do rozwiązania tego problemu, ale z drugiej strony działa tylko na Unixie...
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-11-20 02:41:51
Python 3.4 wprowadza nowe tymczasowe API dla asynchronicznego IO -- asyncio
Moduł .
Podejście jest podobne do twisted
-na podstawie odpowiedzi @Bryan Ward -- define a protocol and its methods are called as soon as data is ready:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
Zobacz "podproces" w dokumentach .
Istnieje interfejs wysokiego poziomu asyncio.create_subprocess_exec()
, który zwraca Process
obiekty, które pozwalają na odczyt linii asynchronicznej za pomocą StreamReader.readline()
coroutine
(z async
/await
Python 3.5 + składnia):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
wykonuje następujące zadania:
- Uruchom podproces, przekieruj jego stdout do rury
- odczyt linii z podprocesu ' stdout asynchronicznie
- Zabij podproces
- poczekaj aż wyjdzie
Każdy krok może być ograniczony czasem sekund, jeśli to konieczne.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:10:44
Wypróbuj moduł asyncproc . Na przykład:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
Moduł zajmuje się wszystkimi wątkami zgodnie z sugestią S. Lotta.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-03-09 19:31:53
Można to zrobić naprawdę łatwo w Twisted . W zależności od istniejącej bazy kodu, może to nie być takie łatwe w użyciu, ale jeśli budujesz pokręconą aplikację, to rzeczy takie jak ta stają się prawie trywialne. Tworzysz klasę ProcessProtocol
i nadpisujesz metodę outReceived()
. Twisted (w zależności od zastosowanego reaktora) to zwykle duża pętla select()
z zainstalowanymi wywołaniami zwrotnymi do obsługi danych z różnych deskryptorów plików (często gniazd sieciowych). Tak więc metoda outReceived()
jest po prostu instalacją oddzwanianie do obsługi danych pochodzących z STDOUT
. Prosty przykład demonstrujący to zachowanie jest następujący:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
Twisted documentation ma na ten temat kilka dobrych informacji.
Jeśli zbudujesz całą aplikację wokół Twisted, sprawi to, że asynchroniczna komunikacja z innymi procesami, lokalnymi lub zdalnymi, będzie naprawdę elegancka. Z drugiej strony, jeśli twój program nie jest zbudowany na Twisted, nie będzie to tak naprawdę pomocne. Mam nadzieję, że to może być pomocne dla innych czytelników, nawet jeśli nie ma zastosowania do konkretnej aplikacji.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-05-14 01:26:17
Użyj select & read (1).
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
Dla readline () - jak:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-01-26 01:02:08
Jednym z rozwiązań jest zrobienie innego procesu, aby wykonać odczyt procesu lub zrobić wątek procesu z limitem czasu.
Oto gwintowana wersja funkcji timeout:
Http://code.activestate.com/recipes/473878/
Jednak, czy trzeba czytać stdout jak to nadchodzi? Innym rozwiązaniem może być zrzut danych wyjściowych do pliku i oczekiwanie na zakończenie procesu za pomocą p. wait () .
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2008-12-17 18:16:48
Zastrzeżenie: to działa tylko dla tornado
Możesz to zrobić, ustawiając FD na nieblokujący, a następnie użyj ioloop do rejestracji wywołań zwrotnych. Zapakowałem to w jajko o nazwie tornado_subprocess i można go zainstalować przez PyPI:
easy_install tornado_subprocess
Teraz możesz zrobić coś takiego:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
Możesz go również użyć z RequestHandler
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-07-06 12:42:26
Oto Mój kod, używany do przechwytywania każdego wyjścia z podprocesu, w tym częściowych linii. Pompuje w tym samym czasie i stdout i stderr w prawie prawidłowej kolejności.
Przetestowano i poprawnie działa na Pythonie 2.7 linux & windows.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-06-11 19:09:11
Dodaję ten problem, aby przeczytać jakiś podproces.Popen stdout. Oto moje rozwiązanie nie blokujące odczytu:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-04-21 20:51:57
Istniejące rozwiązania nie działały dla mnie (szczegóły poniżej). Ostatecznie udało się zaimplementować readline używając read(1) (na podstawie tej odpowiedzi ). Ten ostatni nie blokuje:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
Dlaczego istniejące rozwiązania nie działały:
- rozwiązania wymagające readline (w tym rozwiązania oparte na kolejkach) zawsze blokują się. Jest to trudne(niemożliwe?), aby zabić wątek, który wykonuje readline. Ginie tylko wtedy, gdy zakończy się proces, który go stworzył, ale nie wtedy, gdy wyjście-proces produkcji jest zabity.
- mieszanie niskopoziomowych wywołań fcntl z wysokopoziomowymi wywołaniami readline może nie działać poprawnie, jak zauważył anonnn.
- użycie select.poll () jest schludne, ale nie działa na Windows zgodnie z pythonowymi dokumentami.
- używanie bibliotek innych firm wydaje się przesadą dla tego zadania i dodaje dodatkowe zależności.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:02:56
Na systemach uniksopodobnych i Pythonie 3.5 + jest os.set_blocking
co robi dokładnie to, co mówi.
import os
import time
import subprocess
cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
# first iteration always produces empty byte string in non-blocking mode
for i in range(2):
line = p.stdout.readline()
print(i, line)
time.sleep(0.5)
if time.time() > start + 5:
break
p.terminate()
To wyjście:
1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'
Z os.set_blocking
skomentował (a):
0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-12-11 17:51:31
Ta wersja nieblokującego odczytu nie wymaga specjalnych modułów i będzie działać po wyjęciu z pudełka na większości dystrybucji Linuksa.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-05-24 15:55:16
Oto proste rozwiązanie oparte na wątkach, które:
- działa zarówno na Linuksie, jak i Windows(nie polegając na
select
). - odczytuje zarówno
stdout
jak istderr
asynchronicznie. - nie polega na aktywnej ankiecie z dowolnym czasem oczekiwania (przyjazny dla procesora).
- nie używa
asyncio
(co może kolidować z innymi bibliotekami). - uruchamia się, dopóki proces potomny nie zakończy się.
printer.py
import time
import sys
sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)
reader.py
import queue
import subprocess
import sys
import threading
def enqueue_stream(stream, queue, type):
for line in iter(stream.readline, b''):
queue.put(str(type) + line.decode('utf-8'))
stream.close()
def enqueue_process(process, queue):
process.wait()
queue.put('x')
p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()
while True:
line = q.get()
if line[0] == 'x':
break
if line[0] == '2': # stderr
sys.stdout.write("\033[0;31m") # ANSI red color
sys.stdout.write(line[1:])
if line[0] == '2':
sys.stdout.write("\033[0m") # reset ANSI code
sys.stdout.flush()
tp.join()
to.join()
te.join()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-03-20 15:41:53
We współczesnym Pythonie jest o wiele lepiej.
Oto prosty program dla dzieci, "hello.py": {]}
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
I program do interakcji z nim:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
To drukuje:
b'hello bob\n'
b'hello alice\n'
Zauważ, że rzeczywisty wzorzec, który jest również przez prawie wszystkie poprzednie odpowiedzi, zarówno tutaj, jak i w powiązanych pytaniach, polega na ustawieniu deskryptora pliku stdout na nieblokujący, a następnie przeszukiwaniu go w jakiejś pętli select. W dzisiejszych czasach taką pętlę zapewnia oczywiście asyncio.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-05-09 02:12:08
Dodanie tej odpowiedzi tutaj, ponieważ zapewnia możliwość ustawienia nieblokujących rur w systemach Windows i Unix.
Wszystkie szczegóły ctypes
są dzięki @techtonik ' s answer .
Istnieje nieco zmodyfikowana wersja, która może być używana zarówno w systemach Unix, jak i Windows.
- Python3 compatible (potrzebna tylko drobna zmiana) .
- zawiera wersję posix i definiuje wyjątek, którego można użyć dla każdego z nich.
W ten sposób możesz użyć tej samej funkcji i wyjątek dla kodu Unix i Windows.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
Aby uniknąć czytania niekompletnych danych, napisałem własny generator readline (który zwraca łańcuch bajtów dla każdej linii).
To generator, więc możesz na przykład...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 11:54:58
Mam oryginalny problem pytającego, ale nie chciałem wywoływać wątków. Zmieszałem rozwiązanie Jesse ' ego z bezpośrednim read()
z rury, i mój własny bufor-handler dla odczytów linii (jednak mój pod-proces-ping - zawsze pisał pełne linie
def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
The watcher is
def watch(f, *other):
print 'reading',f.read()
return True
I główny program ustawia ping i następnie wywołuje pętlę poczty gobject.
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
Każda inna praca jest dołączona do wywołań zwrotnych w gobject.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2021-01-08 13:27:33
Moduł select pomaga określić, gdzie znajduje się następne przydatne dane wejściowe.
Jednak prawie zawsze jesteś szczęśliwszy z oddzielnymi wątkami. Jeden robi blokowanie odczytać stdin, inny robi gdziekolwiek to jest nie chcesz zablokowany.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2008-12-17 18:19:25
Dlaczego przeszkadza wątek & Kolejka? w przeciwieństwie do readline(), BufferedReader.read1() nie blokuje czekania na \r \ n, zwraca ASAP, jeśli pojawi się jakieś wyjście.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-01-20 01:41:15
W moim przypadku potrzebowałem modułu rejestrującego, który wychwytuje dane wyjściowe z aplikacji w tle i je rozszerza (Dodawanie znaczników czasu, kolorów itp.).
Skończyłem z wątkiem tła, który wykonuje rzeczywiste I / O. poniższy kod jest tylko dla platform POSIX. Rozebrałem zbędne części.
Jeśli ktoś ma zamiar używać tej bestii na długie przebiegi rozważ zarządzanie otwartymi deskryptorami. W moim przypadku nie był to duży problem.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-02-01 16:32:43
Mój problem jest nieco inny, ponieważ chciałem zebrać zarówno stdout i stderr z uruchomionego procesu, ale ostatecznie tak samo, ponieważ chciałem renderować wyjście w widżecie jako jego wygenerowany.
Nie chciałem uciekać się do wielu proponowanych obejść za pomocą kolejek lub dodatkowych wątków, ponieważ nie powinny być one konieczne do wykonania tak powszechnego zadania, jak uruchomienie innego skryptu i pobranie jego wyników.
Po zapoznaniu się z proponowanymi rozwiązaniami i pythonowymi dokumentami rozwiązałem problem z realizacją poniżej. Tak, działa tylko dla POSIX, ponieważ używam funkcji select
.
Zgadzam się, że dokumenty są mylące, a implementacja jest niewygodna dla tak popularnego zadania skryptowego. Uważam, że starsze wersje Pythona mają różne wartości domyślne dla Popen
i różne wyjaśnienia, co wywołało wiele zamieszania. Wydaje się, że działa to dobrze zarówno dla Pythona 2.7.12, jak i 3.5.2.
Kluczem było ustawienie bufsize=1
dla buforowania linii, a następnie universal_newlines=True
przetworzyć jako plik tekstowy zamiast pliku binarnego, który wydaje się być domyślnym przy ustawieniu bufsize=1
.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, DEBUG i VERBOSE to po prostu makra, które drukują wyjście do terminala.
To rozwiązanie jest IMHO 99,99% skuteczne, ponieważ nadal używa funkcji blokowania readline
, więc zakładamy, że proces podrzędny jest ładny i wyprowadza kompletne linie.
Z zadowoleniem przyjmuję opinie, aby ulepszyć rozwiązanie, ponieważ wciąż jestem nowy w Pythonie.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-06-27 22:01:20
Stworzyłem bibliotekę opartą na rozwiązaniu J. F. Sebastiana. Możesz go użyć.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:34:50
Bazując na odpowiedzi J. F. Sebastiana i kilku innych źródłach, stworzyłem prosty menedżer podprocesów. Zapewnia nieblokujący odczyt żądania, a także jednoczesne uruchamianie kilku procesów. Nie używa żadnego wywołania specyficznego dla systemu operacyjnego (o którym wiem) i dlatego powinien działać wszędzie.
Jest dostępny w pypi, więc po prostu pip install shelljob
. Przykłady i pełne dokumenty można znaleźć na stronie projektu .
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-10-31 10:09:25
EDIT: Ta implementacja nadal blokuje. Użyj zamiast tego odpowiedzi J. F. Sebastiana .
próbowałem odpowiedzi top , ale dodatkowe ryzyko i utrzymanie kodu wątku było niepokojące.
Przeglądając Moduł io (i będąc ograniczonym do 2.6), znalazłem BufferedReader. To moje bezgwintowe, nieblokujące rozwiązanie.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:02:56
Jest to przykład do uruchomienia poleceń interaktywnych w podprocesie, a stdout jest interaktywny za pomocą pseudo terminala. Możesz odnieść się do: https://stackoverflow.com/a/43012138/3555925
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:02:56
To rozwiązanie wykorzystuje moduł select
do "odczytu wszelkich dostępnych danych" ze strumienia IO. Ta funkcja blokuje początkowo dopóki dane nie będą dostępne, ale następnie odczytuje tylko dane, które są dostępne i nie blokuje dalej.
Biorąc pod uwagę fakt, że używa modułu select
, działa to tylko na Uniksie.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-02 20:55:47
Spotkałem się również z problemem opisanym przez Jesse i rozwiązałem go używając "select" jako Bradley, Andy i inni zrobili to, ale w trybie blokowania, aby uniknąć zajętej pętli. Używa atrapy rury jako fałszywego stdin. Wybierz bloki i poczekaj, aż stdin lub rura będzie gotowa. Po naciśnięciu klawisza stdin odblokowuje select, a wartość klawisza może zostać pobrana za pomocą read(1). Gdy inny wątek pisze do rury, rura odblokowuje select i może być traktowana jako wskazanie, że zapotrzebowanie na stdin jest skończone. Oto kod referencyjny:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-07 08:09:33
Spróbuj wexpect , który jest alternatywą dla systemu windows dla pexpect .
import wexpect
p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.') // regex pattern of any character
output_str = p.after()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-12-07 14:13:37
Oto moduł, który obsługuje nieblokujące odczyty i zapisy w tle w Pythonie:
Https://pypi.python.org/pypi/python-nonblock
Dostarcza funkcję,
Nonblock_read, który odczyta dane ze strumienia, jeśli jest dostępny, w przeciwnym razie zwróci pusty łańcuch (lub żaden, jeśli strumień jest zamknięty po drugiej stronie i wszystkie możliwe dane zostały odczytane)
Możesz również rozważyć podproces2 Pythona moduł,
Https://pypi.python.org/pypi/python-subprocess2
Który dodaje do modułu podprocesowego. Tak więc na obiekcie zwróconym z " podprocesu.Popen " dodano dodatkową metodę, runInBackground. Spowoduje to uruchomienie wątku i zwrócenie obiektu, który zostanie automatycznie wypełniony, gdy materiał zostanie zapisany na stdout/stderr, bez blokowania głównego wątku.
Enjoy!
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-01-16 21:47:15