Skip to content Skip to sidebar Skip to footer

Two Functions In Parallel With Multiple Arguments And Return Values

I've got two separate functions. Each of them takes quite a long time to execute. def function1(arg): do_some_stuff_here return result1 def function2(arg1, arg2, arg3):

Solution 1:

First of all, Process, Pool and Queue all have different use cases.

Process is used to spawn a process by creating the Process object.

from multiprocessing import Process


def method1():
    """Toy worker: prints a message so we can see which process ran it."""
    print("in method1")


def method2():
    """Second toy worker, identical in shape to method1."""
    print("in method2")


if __name__ == '__main__':
    # The __main__ guard is required so child processes (which re-import
    # this module under the 'spawn' start method) don't re-run this code.
    p1 = Process(target=method1)  # create a process object p1
    p1.start()                    # starts the process p1
    p2 = Process(target=method2)
    p2.start()
    # Wait for both children so the parent doesn't exit before they finish.
    p1.join()
    p2.join()

Pool is used to parallelize execution of function across multiple input values.

from multiprocessing import Pool


def method1(x):
    """Print x and its square, then return the square."""
    print(x)
    print(x ** 2)
    return x ** 2


if __name__ == '__main__':
    # Context manager closes the pool cleanly; 3 worker processes.
    with Pool(3) as p:
        result = p.map(method1, [1, 4, 9])
    print(result)  # prints [1, 16, 81]

Queue is used to communicate between processes.

from multiprocessing import Process, Queue


def method1(x, l1):
    """Square x, push the result onto queue l1, and return x."""
    print("in method1")
    l1.put(x ** 2)
    return x


def method2(x, l2):
    """Cube x, push the result onto queue l2, and return x."""
    print("in method2")
    l2.put(x ** 3)
    return x


if __name__ == '__main__':
    # One queue per worker so each result can be read back independently.
    l1 = Queue()
    p1 = Process(target=method1, args=(4, l1))
    l2 = Queue()
    p2 = Process(target=method2, args=(2, l2))
    p1.start()
    p2.start()
    # get() blocks until the child has put its result.
    print(l1.get())  # prints 16
    print(l2.get())  # prints 8
    p1.join()
    p2.join()

Now, for your case you can use Process & Queue (the 3rd method above), or you can manipulate the pool method to work (below):

import sys
from multiprocessing import Pool


def method1(x):
    """Print x and its square, then return the square."""
    print(x)
    print(x ** 2)
    return x ** 2


def method2(x):
    """Print x and its cube, then return the cube."""
    print(x)
    print(x ** 3)
    return x ** 3


def unzip_func(a, b):
    # Simply splits the (option, arg) pair back into its two parts.
    return a, b


def distributor(option_args):
    """Dispatch to method<option> with the paired argument.

    option_args is an (option, arg) tuple: option selects which
    methodN to call and arg is the single argument passed to it.
    """
    option, args = unzip_func(*option_args)  # unzip option and args

    # Build the target function name from the option, e.g. 1 -> "method1".
    attr_name = "method" + str(option)

    # Look the function up by name on this module and call it with args.
    value = getattr(sys.modules[__name__], attr_name)(args)
    return value


if __name__ == '__main__':
    option_list = [1, 2]  # for selecting the method number
    args_list = [4, 2]    # arg for the corresponding method (4 is for method1)

    with Pool(3) as p:    # pool of 3 worker processes
        # zip pairs them as (1, 4), (2, 2); the builtin zip replaces
        # itertools.izip, which was removed in Python 3.
        result = p.map(distributor, zip(option_list, args_list))
    print(result)         # prints [16, 8]

Hope this helps.

Solution 2:

Here is an example of:

  1. Run a single function with multiple inputs in parallel using a Pool (the square function). Interesting side note: the mangled line of output such as "5 981 25" shows the workers' prints interleaving.
  2. Run multiple functions with different inputs (Both args and kwargs) and collect their results using a Pool (pf1, pf2, pf3 functions)
import datetime
import multiprocessing
import random
import time

from multiprocessing import Pool


def square(x):
    # calculate the square of the value of x
    print(x, x * x)
    return x * x


def pf1(*args, **kwargs):
    """Sleep 3-6 s, then return (sum of the first positional arg, kwargs)."""
    sleep_time = random.randint(3, 6)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n"
          % (multiprocessing.current_process().name, "pf1", args, sleep_time,
             datetime.datetime.now()))
    print("Keyword Args from pf1: %s" % kwargs)
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name,
          "\tpf1 done at %s\n" % datetime.datetime.now())
    return (sum(*args), kwargs)


def pf2(*args):
    """Sleep 7-10 s, then return the sum of the first positional arg."""
    sleep_time = random.randint(7, 10)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n"
          % (multiprocessing.current_process().name, "pf2", args, sleep_time,
             datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name,
          "\tpf2 done at %s\n" % datetime.datetime.now())
    return sum(*args)


def pf3(*args):
    """Sleep 0-3 s, then return the sum of the first positional arg."""
    sleep_time = random.randint(0, 3)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n"
          % (multiprocessing.current_process().name, "pf3", args, sleep_time,
             datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name,
          "\tpf3 done at %s\n" % datetime.datetime.now())
    return sum(*args)


def smap(f, *arg):
    """Adapter so Pool.starmap can dispatch to different functions.

    Called as smap(f, args_list) or smap(f, args_list, kwargs_dict);
    returns whatever f returns.
    """
    if len(arg) == 2:
        args, kwargs = arg
        return f(list(args), **kwargs)
    elif len(arg) == 1:
        args = arg
        return f(*args)


if __name__ == '__main__':

    # Define the dataset
    dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

    # Output the dataset
    print('Dataset: ' + str(dataset))

    # Run this with a pool of 5 agents having a chunksize of 3 until finished
    agents = 5
    chunksize = 3
    with Pool(processes=agents) as pool:
        # chunksize is actually passed to map now (the original defined it
        # but never used it).
        result = pool.map(square, dataset, chunksize)
    print("Result of Squares : %s\n\n" % result)

    with Pool(processes=3) as pool:
        result = pool.starmap(smap, [(pf1, [1, 2, 3], {'a': 123, 'b': 456}),
                                     (pf2, [11, 22, 33]),
                                     (pf3, [111, 222, 333])])

    # Output the result
    print('Result: %s ' % result)


Output:
*******

Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
1124394166367498645981251010011121121441316914196
Result of Squares : [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]


Process : ForkPoolWorker-6  Function : pf1  Args: ([1, 2, 3],)  sleeping for3  Time : 2020-07-2000:51:56.477299

Keyword Args from pf1: {'a': 123, 'b': 456}
Process : ForkPoolWorker-7  Function : pf2  Args: ([11, 22, 33],)   sleeping for8  Time : 2020-07-2000:51:56.477371

Process : ForkPoolWorker-8  Function : pf3  Args: ([111, 222, 333],)    sleeping for1  Time : 2020-07-2000:51:56.477918

ForkPoolWorker-8    pf3 done at 2020-07-2000:51:57.478808

ForkPoolWorker-6    pf1 done at 2020-07-2000:51:59.478877

ForkPoolWorker-7    pf2 done at 2020-07-2000:52:04.478016

Result: [(6, {'a': 123, 'b': 456}), 66, 666] 

Process finished with exit code 0

Solution 3:

This is another example I just found, hope it helps, nice and easy ;)

from multiprocessing import Pool


def square(x):
    """Return x squared."""
    return x * x


def cube(y):
    """Return y cubed."""
    return y * y * y


if __name__ == '__main__':
    with Pool(processes=20) as pool:
        # map_async returns immediately; both jobs run concurrently.
        result_squares = pool.map_async(square, range(10))
        result_cubes = pool.map_async(cube, range(10))

        # get() must happen inside the with-block: leaving it terminates
        # the pool, which would kill any still-pending tasks.
        print(result_squares.get(timeout=3))
        print(result_cubes.get(timeout=3))

Post a Comment for "Two Functions In Parallel With Multiple Arguments And Return Values"