How To Use Multiprocessing To Parallelize Two Calls To The Same Function, With Different Arguments, In A For Loop?
Solution 1:
Since I built this answer in patches, scroll down for the best solution to this problem.
You need to specify exactly how you want things to run. As far as I can tell, you want at most two processes running, but also at least two. Also, you do not want the heavy call to hold up the fast ones. One simple, non-optimal way to run it is:
from multiprocessing import Process

def func(counter, somearg):
    j = 0
    for i in range(counter):
        j += i
    print(somearg)

def loop(counter, arglist):
    for i in range(10):
        func(counter, arglist[i])

if __name__ == '__main__':
    heavy = Process(target=loop, args=[1000000, ['heavy' + str(i) for i in range(10)]])
    light = Process(target=loop, args=[500000, ['light' + str(i) for i in range(10)]])
    heavy.start()
    light.start()
    heavy.join()
    light.join()
The output here is (for one example run):
light0
heavy0
light1
light2
heavy1
light3
light4
heavy2
light5
light6
heavy3
light7
light8
heavy4
light9
heavy5
heavy6
heavy7
heavy8
heavy9
You can see the last part is sub-optimal, since you end up with a sequence of heavy runs - which means only one process is running instead of two.
An easy way to optimize this is to estimate how much longer the heavy process takes. If it's twice as slow, as here, just run 7 iterations of heavy first, join the light process, and have it run the additional 3.
Another way is to run the heavy work in pairs of processes, so at first you have 3 processes until the fast process ends, and then continue with 2 (a sketch of this variant follows).
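For illustration, here is a minimal sketch of that pairing variant - the 5/5 split point and the loop-over-the-list helper are my own choices, not from the original:

from multiprocessing import Process

def func(counter, somearg):
    j = 0
    for i in range(counter):
        j += i
    print(somearg)

def loop(counter, arglist):
    # iterate over whatever list it is given, so uneven splits work too
    for arg in arglist:
        func(counter, arg)

if __name__ == '__main__':
    # split the 10 heavy calls across two processes that run alongside
    # the light one: three processes until light finishes, then two
    heavy_a = Process(target=loop, args=[1000000, ['heavy' + str(i) for i in range(5)]])
    heavy_b = Process(target=loop, args=[1000000, ['heavy' + str(i) for i in range(5, 10)]])
    light = Process(target=loop, args=[500000, ['light' + str(i) for i in range(10)]])
    for p in (heavy_a, heavy_b, light):
        p.start()
    for p in (heavy_a, heavy_b, light):
        p.join()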
The main point is separating the heavy and light calls into entirely different processes - so while the fast calls complete one after the other quickly, you can work on your slow stuff. Once the fast one ends, it's up to you how elaborately you want to continue, but I think for now estimating how to break up the heavy calls is good enough. This is it for my example:
from multiprocessing import Process

def func(counter, somearg):
    j = 0
    for i in range(counter):
        j += i
    print(somearg)

def loop(counter, amount, arglist):
    for i in range(amount):
        func(counter, arglist[i])

if __name__ == '__main__':
    heavy1 = Process(target=loop, args=[1000000, 7, ['heavy1' + str(i) for i in range(7)]])
    light = Process(target=loop, args=[500000, 10, ['light' + str(i) for i in range(10)]])
    heavy2 = Process(target=loop, args=[1000000, 3, ['heavy2' + str(i) for i in range(7, 10)]])
    heavy1.start()
    light.start()
    light.join()
    heavy2.start()
    heavy1.join()
    heavy2.join()
with output:
light0
heavy10
light1
light2
heavy11
light3
light4
heavy12
light5
light6
heavy13
light7
light8
heavy14
light9
heavy15
heavy27
heavy16
heavy28
heavy29
Much better utilization. You can of course make this more advanced by sharing a queue of the slow runs, so when the fast calls are done the processes can join as workers on the slow queue - but for only two different calls this may be overkill (though it isn't much harder using the queue). The best solution:
from multiprocessing import Queue, Process
import queue

def func(index, counter, somearg):
    j = 0
    for i in range(counter):
        j += i
    print("Worker", index, ':', somearg)

def worker(index, q):
    # pull tasks until the queue is drained; all tasks are enqueued
    # before the workers start, so a non-blocking get is safe here
    try:
        while True:
            f, args = q.get(block=False)
            f(index, *args)
    except queue.Empty:
        pass

if __name__ == '__main__':
    q = Queue()
    for i in range(10):
        q.put((func, (500000, 'light' + str(i))))
        q.put((func, (1000000, 'heavy' + str(i))))
    nworkers = 2
    workers = []
    for i in range(nworkers):
        # pass the queue explicitly so this also works with the
        # 'spawn' start method, where globals are not inherited
        workers.append(Process(target=worker, args=(i, q)))
        workers[-1].start()
    q.close()
    for w in workers:
        w.join()
This is the best and most scalable solution for what you want. Output:
Worker 0 : light0
Worker 0 : light1
Worker 1 : heavy0
Worker 1 : light2
Worker 0 : heavy1
Worker 0 : light3
Worker 1 : heavy2
Worker 1 : light4
Worker 0 : heavy3
Worker 0 : light5
Worker 1 : heavy4
Worker 1 : light6
Worker 0 : heavy5
Worker 0 : light7
Worker 1 : heavy6
Worker 1 : light8
Worker 0 : heavy7
Worker 0 : light9
Worker 1 : heavy8
Worker 0 : heavy9
Solution 2:
You might want to use a multiprocessing.Pool of processes and map your myFunc into it, like so:
from multiprocessing import Pool
import time

def myFunc(arg1, arg2):
    if arg1:
        print('do something that does not take too long')
        time.sleep(0.01)
    else:
        print('do something that takes long')
        time.sleep(1)

def wrap(args):
    return myFunc(*args)

if __name__ == "__main__":
    p = Pool()
    argStorage = [(True, False), (False, True)] * 12
    p.map(wrap, argStorage)
I added a wrap function, since the function passed to p.map must accept a single argument. You could just as well adapt myFunc to accept a tuple, if that's possible in your case.
My sample argStorage consists of 24 items, where 12 of them will take 1 second to process, and 12 will be done in 10 ms. In total, this script runs in 3-4 seconds (I have 4 cores).
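As an aside, if you are on Python 3.3 or newer, Pool.starmap unpacks each argument tuple for you, so the wrap helper isn't needed. A minimal sketch under the same setup:

from multiprocessing import Pool
import time

def myFunc(arg1, arg2):
    # same shape as above: a fast branch and a slow branch
    time.sleep(0.01 if arg1 else 1)

if __name__ == "__main__":
    argStorage = [(True, False), (False, True)] * 12
    with Pool() as p:
        # starmap calls myFunc(*args) for each tuple in argStorage
        p.starmap(myFunc, argStorage)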
Solution 3:
One possible implementation could be as follows:
import concurrent.futures

list_of_args = [arg1, arg2]  # arg1, arg2 as defined in your own code

def my_func(arg):
    ...
    print('do something that takes long')

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for arg, result in zip(list_of_args, executor.map(my_func, list_of_args)):
            print('my_func({0}) => {1}'.format(arg, result))

if __name__ == '__main__':
    main()
executor.map is like the built-in map function: it makes multiple calls to the provided function, passing it each of the items in the iterable, and distributes those calls across the pool's worker processes.
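For the literal case in the question title - two calls to the same function with different arguments - executor.submit plus as_completed also works, and lets you consume whichever result finishes first. A minimal sketch with made-up fast/slow arguments:

import concurrent.futures
import time

def my_func(arg):
    # hypothetical stand-in: 'slow' takes ~1 s, anything else ~10 ms
    time.sleep(1 if arg == 'slow' else 0.01)
    return arg

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(my_func, arg) for arg in ('fast', 'slow')]
        # as_completed yields futures in completion order,
        # so the fast call's result is available first
        for fut in concurrent.futures.as_completed(futures):
            print('done:', fut.result())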