Read Subprocess Stdout And Stderr Concurrently
Solution 1:
I did that once… here is some old code I wrote:
class Process_Communicator():
    """Collect and aggregate a subprocess's stdout/stderr on background
    threads so callers can poll for output without blocking on
    subprocess.communicate().

    NOTE(review): this is Python 2-era code (`Queue.Queue`, str/bytes
    mixing). Under Python 3, run with ``import queue as Queue`` and be
    aware the reader threads enqueue ``bytes`` lines — TODO confirm the
    intended text/binary mode of the pipes.
    """

    def join(self):
        """Wait for the reader threads, stop the aggregator, then wait
        for the aggregator and stdin-writer threads to finish."""
        self.te.join()
        self.to.join()
        self.running = False
        self.aggregator.join()
        self.ti.join()

    def enqueue_in(self):
        """Worker: drain stdin_queue into the child's stdin while running.

        NOTE(review): busy-waits; also ``update()`` drains the same queue,
        so the two paths race for queued input — confirm which is intended.
        """
        while self.running and self.p.stdin is not None:
            while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s) + '\n\r')

    def enqueue_output(self):
        """Worker: push every line of the child's stdout onto self.qo."""
        if not self.p.stdout or self.p.stdout.closed:
            return
        out = self.p.stdout
        for line in iter(out.readline, b''):
            self.qo.put(line)

    def enqueue_err(self):
        """Worker: push every line of the child's stderr onto self.qe."""
        if not self.p.stderr or self.p.stderr.closed:
            return
        err = self.p.stderr
        for line in iter(err.readline, b''):
            self.qe.put(line)

    def aggregate(self):
        """Worker: repeatedly fold queued output into the string buffers."""
        while self.running:
            self.update()
        # One final pass so output arriving just before shutdown is kept.
        self.update()

    def update(self):
        """Drain both output queues into the aggregate buffers, then
        flush any pending stdin writes."""
        # BUG FIX: the original looped on `self.qe.not_empty`, which is a
        # threading.Condition object and therefore always truthy; the loop
        # only ever exited via the Empty exception. Make that explicit.
        try:
            while True:
                self.unbblocked_err += self.qe.get_nowait()
        except Queue.Empty:
            pass
        try:
            while True:
                self.unbblocked_out += self.qo.get_nowait()
        except Queue.Empty:
            pass
        while not self.stdin_queue.empty():
            s = self.stdin_queue.get()
            self.p.stdin.write(str(s))

    def get_stdout(self, clear=True):
        """Return accumulated stdout; reset the buffer unless clear=False."""
        ret = self.unbblocked_out
        if clear:
            self.unbblocked_out = ""
        return ret

    def has_stdout(self):
        """Return accumulated stdout without clearing it, or None if empty."""
        ret = self.get_stdout(False)
        if ret == '':
            return None
        else:
            return ret

    def get_stderr(self, clear=True):
        """Return accumulated stderr; reset the buffer unless clear=False."""
        # BUG FIX: the original read and cleared unbblocked_out here,
        # so get_stderr() returned stdout and never released stderr.
        ret = self.unbblocked_err
        if clear:
            self.unbblocked_err = ""
        return ret

    def has_stderr(self):
        """Return accumulated stderr without clearing it, or None if empty."""
        # BUG FIX: the original delegated to get_stdout(False).
        ret = self.get_stderr(False)
        if ret == '':
            return None
        else:
            return ret

    def __init__(self, subp):
        '''This is a simple class that collects and aggregates the
        output from a subprocess so that you can more reliably use
        the class without having to block for subprocess.communicate.

        :param subp: an already-started subprocess.Popen whose
            stdout/stderr (and optionally stdin) are pipes.
        '''
        self.p = subp
        self.unbblocked_out = ""
        self.unbblocked_err = ""
        self.running = True
        self.qo = Queue.Queue()
        self.to = threading.Thread(name="out_read",
                                   target=self.enqueue_output,
                                   args=())
        self.to.daemon = True  # thread dies with the program
        self.to.start()
        self.qe = Queue.Queue()
        self.te = threading.Thread(name="err_read",
                                   target=self.enqueue_err,
                                   args=())
        self.te.daemon = True  # thread dies with the program
        self.te.start()
        self.stdin_queue = Queue.Queue()
        # BUG FIX: join() waits on self.ti, but the original never
        # created the stdin-writer thread, so join() raised
        # AttributeError. Start it here.
        self.ti = threading.Thread(name="in_write",
                                   target=self.enqueue_in,
                                   args=())
        self.ti.daemon = True  # thread dies with the program
        self.ti.start()
        self.aggregator = threading.Thread(name="aggregate",
                                           target=self.aggregate,
                                           args=())
        self.aggregator.daemon = True  # thread dies with the program
        self.aggregator.start()
You may not need the whole example, but feel free to cut, copy, and paste what you need. It's also important to show how I did the threading.
Solution 2:
The code looks more complicated than the task requires. I don't see why you need to call process.poll() or queue.get_nowait() here. To deliver a subprocess's stdout/stderr to several sinks, you could start with teed_call(), which accepts arbitrary file-like objects: you could pass log files and special file-like objects that accumulate errors in their .write() methods.
To fix your code with minimal changes, you should call .join() on the reader threads. Even if process.poll() is not None — i.e., the subprocess has exited — there could still be some pending output; joining the reader threads ensures that all output is read.
Post a Comment for "Read Subprocess Stdout And Stderr Concurrently"