Multiprocessing - Cancel Remaining Jobs In A Pool Without Destroying The Pool
Solution 1:
It's time for the fundamental theorem of software engineering: while multiprocessing.Pool doesn't supply cancellation as a feature, we can add it by having a Pool read from a carefully crafted iterable. It's not enough, however, to have a generator that yields values from a list but stops short on some signal, because the Pool eagerly drains any generator given to it. So we need a very carefully crafted iterable.
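For instance, a naive flag-based generator like the one below (a made-up sketch, only to show the failure mode) accomplishes nothing: the Pool's feeder thread consumes the whole generator long before any results come back, so by the time the flag is set every task has already been submitted.

stop = False                         # naive cancellation flag (does not work)

def naive_jobs(items):
    for x in items:
        if stop:                     # checked far too early: the Pool drains
            return                   # this generator eagerly on its own thread
        yield x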
A lazy Pool
The generic tool we need is a way to construct tasks for a Pool only when a worker becomes available (or at most one task ahead, in case constructing them takes significant time). The basic idea is to slow down the thread collecting work for the Pool with a semaphore upped only when a task is finished. (We know such a thread exists from the observable behavior of imap_unordered.)
import multiprocessing
from threading import Semaphore

size = multiprocessing.cpu_count()   # or whatever Pool size to use
# How many workers are waiting for work?  Add one to buffer one task.
work = Semaphore(size)

def feed0(it):
    it = iter(it)
    try:
        while True:
            # Don't ask the iterable until we have a customer, in case better
            # instructions become available:
            work.acquire()
            yield next(it)
    except StopIteration:
        pass
    work.release()

def feed(p, f, it):
    import sys, traceback
    iu = p.imap_unordered(f, feed0(it))
    while True:
        try:
            x = next(iu)
        except StopIteration:
            return
        except Exception:
            # A child task failed: report it, release the semaphore so the
            # feeder hands out another task, and move on without yielding.
            traceback.print_exception(*sys.exc_info())
            work.release()
            continue
        work.release()
        yield x
The try in feed prevents failures in the children from breaking the semaphore's count, but note that it does not protect against failures in the parent.
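If the consuming loop itself can raise, one way to cover that case (a sketch that is not part of the original recipe; work_function, inputs, and consume are made-up names) is to drain the iterator in a finally block, so feed keeps releasing the semaphore instead of leaving the Pool's feeder thread blocked in work.acquire():

results = feed(pool, work_function, inputs)
try:
    for x in results:
        consume(x)                   # parent-side work; may raise
finally:
    # Finish iterating so the semaphore bookkeeping in feed() stays balanced.
    # This is only cheap if the remaining input is small or can be cut short
    # (see the cancelable iterator below).
    for _ in results:
        pass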
A cancelable iterator
Now that we have real-time control over the Pool's input, implementing whatever scheduling policy we want is straightforward. For example, here's something like itertools.chain but with the ability to asynchronously discard any remaining elements from one of the input sequences:
import collections, queue

class Cancel:
    closed = False
    cur = collections.deque()        # current batch being drained (starts empty)
    def __init__(self):
        self.data = queue.Queue()    # a queue of deques, one per batch
    def add(self, d):
        d = collections.deque(d)
        self.data.put(d)
        return d
    def __iter__(self):
        while True:
            try:
                yield self.cur.popleft()
            except IndexError:       # current batch exhausted (or cancelled)
                self.cur = self.data.get()
                if self.cur is None:
                    break
    @staticmethod
    def cancel(d):
        d.clear()
    def close(self):
        self.data.put(None)          # sentinel: no more batches
This is thread-safe (in CPython, at least) despite the lack of locking, because operations like deque.clear are atomic with respect to Python inspection (and we don't separately check whether self.cur is empty).
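If you'd rather not lean on that atomicity (say, on a Python implementation other than CPython), the same idea works with an explicit lock; the class below is a sketch of mine, not part of the original answer:

import collections, queue, threading

_EMPTY = object()                    # sentinel: current batch exhausted

class LockedCancel:
    def __init__(self):
        self.data = queue.Queue()    # a queue of deques, one per batch
        self.cur = collections.deque()
        self.lock = threading.Lock()
    def add(self, d):
        d = collections.deque(d)
        self.data.put(d)
        return d
    def __iter__(self):
        while True:
            with self.lock:
                x = self.cur.popleft() if self.cur else _EMPTY
            if x is not _EMPTY:
                yield x
                continue
            self.cur = self.data.get()   # may block, so done outside the lock
            if self.cur is None:
                break
    def cancel(self, d):
        with self.lock:
            d.clear()
    def close(self):
        self.data.put(None)          # sentinel: no more batches

The lock only guards popleft and clear; the blocking data.get() stays outside it, so a cancel can never be held up waiting on an empty queue.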
Usage
Making one of these looks like
pool = multiprocessing.Pool(size)
can = Cancel()
many = can.add(range(1000))
few = can.add(["some", "words"])
can.close()
for x in feed(pool, assess_happiness, can):
    if happy_with(x):
        can.cancel(many)             # straight onto few, then out
where of course the add and close calls could themselves be inside the loop, as sketched below.
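For example, a variant along these lines (a sketch of mine; initial_guesses, promising, and refinements_of are made-up names) schedules follow-up batches based on results as they arrive and shuts everything down once a good-enough answer shows up:

pool = multiprocessing.Pool(size)
can = Cancel()
batches = [can.add(initial_guesses())]
for x in feed(pool, assess_happiness, can):
    if happy_with(x):
        for b in batches:
            can.cancel(b)                            # drop everything still queued
        can.close()                                  # accept no more batches
    elif promising(x):
        batches.append(can.add(refinements_of(x)))   # follow-up work on the fly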
Solution 2:
The multiprocessing module doesn't seem to have the concept of cancellation. You can use the concurrent.futures.ProcessPoolExecutor wrapper and cancel the pending futures when you have enough results.
Here's an example that picks out 10 JPEGs from a set of paths, and cancels pending futures while leaving the process pool usable afterwards:
import concurrent.futures

def interesting_path(path):
    """Gives path if it is a JPEG, else ``None``."""
    with open(path, 'rb') as f:
        if f.read(3) == b'\xff\xd8\xff':
            return path
    return None

def find_interesting(paths, count=10):
    """Yields up to count paths which are 'interesting', checked in multiprocess tasks."""
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = {pool.submit(interesting_path, p) for p in paths}
        print('Started {}'.format(len(futures)))
        for future in concurrent.futures.as_completed(futures):
            res = future.result()
            futures.remove(future)
            if res is not None:
                yield res
                count -= 1
                if count == 0:
                    break
        cancelled = 0
        for future in futures:
            cancelled += future.cancel()
        print('Cancelled {}'.format(cancelled))
        concurrent.futures.wait(futures)
        # Can still use pool here for more processing as needed
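A driver for it might look like the following (the glob pattern and the __main__ guard are my additions, not part of the answer; the guard matters because ProcessPoolExecutor re-imports the module in its worker processes on spawn-based platforms):

import glob

if __name__ == '__main__':
    for jpeg in find_interesting(glob.glob('/some/photos/*')):
        print('found', jpeg)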
Note that picking how to break up work into futures is still tricky: a bigger set means more overhead, but can also mean less wasted work. This can also be adapted to Python 3.6 async syntax easily enough.
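As a rough illustration of that last remark (my sketch, not code from the answer; it returns a list instead of yielding, to keep things short), the same take-the-first-count-and-cancel-the-rest pattern with run_in_executor looks roughly like:

import asyncio
import concurrent.futures

async def find_interesting_async(paths, count=10):
    loop = asyncio.get_event_loop()
    found = []
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = [loop.run_in_executor(pool, interesting_path, p) for p in paths]
        for next_done in asyncio.as_completed(futures):
            res = await next_done
            if res is not None:
                found.append(res)
                if len(found) == count:
                    break
        for fut in futures:
            fut.cancel()             # jobs that haven't started yet are dropped
    return found

# e.g. asyncio.get_event_loop().run_until_complete(find_interesting_async(paths))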