Skip to content Skip to sidebar Skip to footer

Starmap Combined With Tqdm?

I am doing some parallel processing, as follows: with mp.Pool(8) as tmpPool: results = tmpPool.starmap(my_function, inputs) where inputs look like: [(1,0.2312),(5,0.52

Solution 1:

It's not possible with starmap(), but it's possible with a patch adding Pool.istarmap(). It's based on the code for imap(). All you have to do, is create the istarmap.py-file and import the module to apply the patch before you make your regular multiprocessing-imports.

Python <3.8

# istarmap.py for Python <3.8import multiprocessing.pool as mpp


defistarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """if self._state != mpp.RUN:
        raise ValueError("Pool not running")

    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self._cache)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

Python 3.8+

# istarmap.py for Python 3.8+import multiprocessing.pool as mpp


defistarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    self._check_running()
    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

Then in your script:

import istarmap  # import to apply patchfrom multiprocessing import Pool
import tqdm    


deffoo(a, b):
    for _ inrange(int(50e6)):
        passreturn a, b    


if __name__ == '__main__':

    with Pool(4) as pool:
        iterable = [(i, 'x') for i inrange(10)]
        for _ in tqdm.tqdm(pool.istarmap(foo, iterable),
                           total=len(iterable)):
            pass

Solution 2:

The simplest way would probably be to apply tqdm() around the inputs, rather than the mapping function. For example:

inputs = zip(param1, param2, param3)
with mp.Pool(8) as pool:
    results = pool.starmap(my_function, tqdm.tqdm(inputs, total=len(inputs)))

Solution 3:

As Darkonaut mentioned, as of this writing there's no istarmap natively available. If you want to avoid patching, you can add a simple *_star function as a workaround. (This solution inspired by this tutorial.)

import tqdm
import multiprocessing

defmy_function(arg1, arg2, arg3):
  return arg1 + arg2 + arg3

defmy_function_star(args):
    return my_function(*args)

jobs = 4with multiprocessing.Pool(jobs) as pool:
    args = [(i, i, i) for i inrange(10000)]
    results = list(tqdm.tqdm(pool.imap(my_function_star, args), total=len(args))

Some notes:

I also really like corey's answer. It's cleaner, though the progress bar does not appear to update as smoothly as my answer. Note that corey's answer is several orders of magnitude faster with the code I posted above with chunksize=1 (default). I'm guessing this is due to multiprocessing serialization, because increasing chunksize (or having a more expensive my_function) makes their runtime comparable.

I went with my answer for my application since my serialization/function cost ratio was very low.

Solution 4:

The temporary solution: rewriting the method to-be-parallelized with imap.

Post a Comment for "Starmap Combined With Tqdm?"