
Iterable Multiprocessing Queue Not Exiting

import multiprocessing.queues as queues
import multiprocessing

class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0
        ...

Solution 1:

The problem is that you're treating 'Done' as a special-case item in the Queue, one that signals the iteration should stop. So if you iterate over the Queue from your example with a for loop, all that's ever returned is 1, yet you're claiming the Queue's length is 2. That mismatch breaks the map code, which relies on the length accurately representing the number of items in the iterable, so that it knows when all the results have come back from the workers:

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        ...
        # _number_left is used to know when the MapResult is done
        self._number_left = length//chunksize + bool(length % chunksize)
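
With your example, length is 2, and the chunksize Pool computes for an iterable that small works out to 1, so:

_number_left = 2 // 1 + bool(2 % 1)  # == 2, yet only one result ever arrives

map therefore waits forever for a second result that never comes.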

So you need to make the length accurate. You can do that a few ways, but I would recommend not requiring a sentinel to be loaded into the Queue at all, and using get_nowait instead:

import multiprocessing.queues as queues
import multiprocessing
from Queue import Empty

class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0 

    ... <snip>  # your __iter__, __len__, and length-tracking put overrides go here

    def next(self):
        try:
            # Non-blocking get: an empty queue ends the iteration
            item = self.get_nowait()
        except Empty:
            raise StopIteration
        return item


def thisworker(item):
    print 'got this item: %s' % item
    return item

q = I()
q.put(1)

the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)

Also, note that this approach isn't process-safe: the length attribute will only be correct if you put into the Queue from a single process, and never put again after sending the Queue to a worker process. It also won't work on Python 3 without adjusting the imports and the implementation, because the constructor of multiprocessing.queues.Queue has changed.
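
For reference, a rough Python 3 equivalent of the subclass might look like this (just a sketch; on 3.x the base class requires an explicit ctx keyword, the Queue module is renamed queue, and next becomes __next__):

import multiprocessing
import multiprocessing.queues as queues
from queue import Empty  # the Queue module was renamed in Python 3

class I(queues.Queue):
    def __init__(self, maxsize=0):
        # Python 3 requires a context to be passed explicitly
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        self.length = 0

    def __iter__(self):
        return self

    def __next__(self):  # Python 3 spells next() as __next__()
        try:
            return self.get_nowait()
        except Empty:
            raise StopIteration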

Instead of subclassing multiprocessing.queues.Queue, I would recommend using the two-argument form of the iter built-in to iterate over the Queue:

q = multiprocessing.Queue()
q.put(1)
q.put(2)
q.put(None)  # None is our sentinel, you could use 'Done', if you wanted
the_pool.map(thisworker, iter(q.get, None)) # This will call q.get() until None is returned

This will work on all versions of Python, is much less code, and is process-safe.

Edit:

Based on the requirements you mentioned in the comment on my answer, I think you're better off using imap instead of map, so that you don't need to know the length of the Queue at all. The reality is that you can't determine it accurately, and in fact the length may keep growing as you iterate. If you use imap exclusively, then something close to your original approach will work fine:

import multiprocessing

class I(object):
    def __init__(self, maxsize=0):
        self.q = multiprocessing.Queue(maxsize)

    def __getattr__(self, attr):
        # Delegate everything else (put, get, etc.) to the wrapped Queue
        if hasattr(self.q, attr):
            return getattr(self.q, attr)
        raise AttributeError(attr)

    def __iter__(self):
        return self

    def next(self):
        item = self.q.get()
        if item == 'Done':  # sentinel placed by a worker
            raise StopIteration
        return item


def thisworker(item):
    if item == 1:
        q.put(3)        # workers can keep feeding the queue
    if item == 2:
        q.put('Done')   # and eventually send the sentinel themselves
    print 'got this item: %s' % item
    return item

q = I()

q.put(1)
q.put(2)
q.put(5)

the_pool = multiprocessing.Pool(2)  # 2 workers
print list(the_pool.imap(thisworker, q))

Output:

got this item: 1
got this item: 5
got this item: 3
got this item: 2
[1, 2, 5, 3]

I got rid of the code that worried about the length, and used delegation instead of inheritance for better Python 3.x compatibility (on 3.x you'd also need to rename next to __next__).

Note that my original suggestion, to use iter(q.get, <sentinel>), still works here, too, as long as you use imap instead of map.
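
For example, a minimal self-contained sketch of that combination (toy values, not your exact setup):

import multiprocessing

def thisworker(item):
    print 'got this item: %s' % item
    return item

q = multiprocessing.Queue()
q.put(1)
q.put(2)
q.put('Done')  # sentinel ends the iteration

the_pool = multiprocessing.Pool(2)
# imap pulls from iter(q.get, 'Done') lazily, so no length is needed
print list(the_pool.imap(thisworker, iter(q.get, 'Done')))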

