Iterable Multiprocessing Queue Not Exiting
Solution 1:
The problem is that you're treating 'Done' as a special-case item in the Queue, which indicates that the iteration should stop. So, if you iterate over the Queue using a for loop with your example, the only item returned is 1. However, you're claiming that the length of the Queue is 2. That breaks the map code, which relies on the length to accurately represent the number of items in the iterable in order to know when all the results have come back from the workers:
class MapResult(ApplyResult):
    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        ...
        # _number_left is used to know when the MapResult is done
        self._number_left = length//chunksize + bool(length % chunksize)
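To see why an overstated length leaves map waiting, the arithmetic from that line can be reproduced in isolation. This is a minimal Python 3 sketch, assuming a chunksize of 1; the helper name expected_chunks is hypothetical and is not part of multiprocessing.

```python
def expected_chunks(length, chunksize):
    """Mirror the _number_left arithmetic: ceiling of length / chunksize."""
    return length // chunksize + bool(length % chunksize)


# The Queue claims a length of 2, but iterating it only ever yields one item.
claimed_length = 2
items_actually_yielded = 1
chunksize = 1  # assumed for this illustration

waiting_for = expected_chunks(claimed_length, chunksize)
delivered = expected_chunks(items_actually_yielded, chunksize)

# map() is done only when the outstanding count reaches zero.
still_outstanding = waiting_for - delivered
print(still_outstanding)
```

Because one chunk is still counted as outstanding and no more items will ever arrive, the result never becomes ready.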
So, you need to make the length accurate. You can do that a few ways, but I would recommend not requiring a sentinel to be loaded into the Queue at all, and using get_nowait instead:
import multiprocessing.queues as queues
import multiprocessing
from Queue import Empty

class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0

    ... <snip>

    def next(self):
        try:
            item = self.get_nowait()
        except Empty:
            raise StopIteration
        return item

def thisworker(item):
    print 'got this item: %s' % item
    return item

q=I()
q.put(1)
the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)
Also, note that this approach isn't process-safe. The length attribute will only be correct if you only put into the Queue from a single process, and then never put again after sending the Queue to a worker process. It also won't work in Python 3 without adjusting the imports and implementation, because the constructor for multiprocessing.queues.Queue has changed.
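For reference, here is a minimal sketch of what the constructor change means in Python 3: multiprocessing.queues.Queue takes a required keyword-only ctx argument, so a subclass has to supply a context itself. The class name CountingQueue is hypothetical, and the sketch shows only the constructor, not the length tracking from the snipped code.

```python
import multiprocessing
import multiprocessing.queues


class CountingQueue(multiprocessing.queues.Queue):
    """Hypothetical Python 3 subclass; only the constructor is shown."""

    def __init__(self, maxsize=0):
        # Python 3 requires an explicit multiprocessing context here.
        super().__init__(maxsize, ctx=multiprocessing.get_context())


if __name__ == "__main__":
    q = CountingQueue()
    q.put(1)
    print(q.get())
```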
Instead of subclassing multiprocessing.queues.Queue, I would recommend using the iter built-in to iterate over the Queue:
q = multiprocessing.Queue()
q.put(1)
q.put(2)
q.put(None) # None is our sentinel, you could use 'Done', if you wanted
the_pool.map(thisworker, iter(q.get, None)) # This will call q.get() until None is returned
This will work on all versions of Python, is much less code, and is process-safe.
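As a quick illustration of the two-argument form of iter, the Python 3 sketch below drains a Queue in a single process, without a Pool. The helper name drain is hypothetical.

```python
import multiprocessing


def drain(q, sentinel=None):
    """Collect items from q until the sentinel is read (the sentinel is dropped)."""
    # iter(callable, sentinel) calls q.get() repeatedly and stops as soon
    # as a call returns a value equal to the sentinel.
    return list(iter(q.get, sentinel))


if __name__ == "__main__":
    q = multiprocessing.Queue()
    for item in (1, 2, None):
        q.put(item)
    print(drain(q))
```

Note that q.get() blocks, so if the sentinel is never put into the Queue, the iteration waits indefinitely.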
Edit:
Based on the requirements you mentioned in the comment to my answer, I think you're better off using imap instead of map, so that you don't need to know the length of the Queue at all. The reality is that you can't accurately determine the length, and it may even grow while you're iterating. If you use imap exclusively, then doing something similar to your original approach will work fine:
import multiprocessing

class I(object):
    def __init__(self, maxsize=0):
        self.q = multiprocessing.Queue(maxsize)

    def __getattr__(self, attr):
        if hasattr(self.q, attr):
            return getattr(self.q, attr)

    def __iter__(self):
        return self

    def next(self):
        item = self.q.get()
        if item == 'Done':
            raise StopIteration
        return item

def thisworker(item):
    if item == 1:
        q.put(3)
    if item == 2:
        q.put('Done')
    print 'got this item: %s' % item
    return item

q=I()
q.put(1)
q.put(2)
q.put(5)
the_pool = multiprocessing.Pool(2) # 2 workers
print list(the_pool.imap(thisworker, q))
Output:
got this item: 1
got this item: 5
got this item: 3
got this item: 2
[1, 2, 5, 3]
I got rid of the code that worried about the length, and used delegation instead of inheritance, for better Python 3.x compatibility.
Note that my original suggestion, to use iter(q.get, <sentinel>), still works here, too, as long as you use imap instead of map.
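The examples above are written for Python 2. A rough Python 3 port of the delegating wrapper might look like the sketch below; the names IterableQueue and square are hypothetical. Unlike the example above, the Queue is fully loaded (including the sentinel) before the Pool starts, so workers never put items back.

```python
import multiprocessing


class IterableQueue(object):
    """Hypothetical Python 3 port of the delegating wrapper."""

    def __init__(self, maxsize=0, sentinel="Done"):
        self._queue = multiprocessing.Queue(maxsize)
        self._sentinel = sentinel

    def __getattr__(self, name):
        # Only called when normal lookup fails; guard against recursion
        # in case _queue itself has not been set yet.
        if name == "_queue":
            raise AttributeError(name)
        return getattr(self._queue, name)

    def __iter__(self):
        return self

    def __next__(self):  # Python 3 spelling of next()
        item = self._queue.get()
        if item == self._sentinel:
            raise StopIteration
        return item


def square(item):
    return item * item


if __name__ == "__main__":
    q = IterableQueue()
    for item in (1, 2, 5, "Done"):
        q.put(item)
    with multiprocessing.Pool(2) as pool:
        # imap consumes the iterable lazily and never asks for its length.
        print(list(pool.imap(square, q)))
```

Delegating through __getattr__ keeps the wrapper independent of the Queue constructor, which is the part that differs between Python versions.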