Python's multiprocessing returns more results than tasks were given


I'm trying to use multiprocessing for a simulation run, to evaluate different input values at the same time.

Therefore, I googled a lot in the last weeks and got something that is not pretty but (somehow) works. The problem now is that it returns more output than I have given tasks, and I don't understand why.

Sometimes each simulation run returns one value, as expected. In the example below I would expect the result of, e.g., simulation run 5 to be a single value, not [23, 89]. It differs from run to run, but sometimes a simulation run produces more output than expected. When I increase the number of periods to, e.g., 2, some runs generate 4 output values, and I cannot figure out why that is.

Could you please give me a hint on how to change that? I cannot find an answer and I'm getting quite frustrated :( Any suggestions on how to improve the code are appreciated. I'm quite new to Python and love it so far :)

This is the simplified code I use:

    import numpy as np
    from multiprocessing import Process, Queue
    import multiprocessing
    from itertools import repeat

    class simulation(Process):
        nr = 1
        mean = 5
        stddev = 3
        periods = 10
        result = []

        def generate_value(self):
            generatedvalue = max(int(round(np.random.normal(self.mean, self.stddev), 0)), 0)
            return generatedvalue

        def runsimulation(self):
            for i in range(self.periods):
                self.result.append(self.generate_value())
            return self.result

    def worker(mean, stddev, periods, nr, queue):
        sim = simulation()
        sim.nr = nr
        sim.periods = periods
        sim.mean = mean
        sim.stddev = stddev
        results = sim.runsimulation()
        queue.put(results)
        print("simulation run " + str(nr) + " done result of " + str(results)
              + " (input: mean: " + str(mean) + ", std. dev.: " + str(stddev) + ")")

    if __name__ == '__main__':
        m = multiprocessing.Manager()
        queue = m.Queue()
        cpus = multiprocessing.cpu_count() # cpus = 8
        workers = multiprocessing.Pool(processes=cpus)

        mean = [50, 60, 70, 80, 90]
        stddev = [10, 10, 10, 10, 10]
        periods = 1
        nr = list(range(1, len(mean) + 1))

        workers.starmap(worker, zip(mean, stddev, repeat(periods), nr, repeat(queue)))
        workers.close()
        workers.join()

        finalsimulationresults = []
        for i in range(len(mean)):
            finalsimulationresults.append(queue.get())
        print(finalsimulationresults)

Which results in, e.g., this:

    simulation run 1 done result of [23] (input: mean: 50, std. dev.: 10)
    simulation run 2 done result of [55] (input: mean: 60, std. dev.: 10)
    simulation run 3 done result of [64] (input: mean: 70, std. dev.: 10)
    simulation run 5 done result of [23, 89] (input: mean: 90, std. dev.: 10)
    simulation run 4 done result of [78] (input: mean: 80, std. dev.: 10)
    [[23], [55], [64], [23, 89], [78]]

It works :). It is not as fast as expected (only about 2 times faster with 8 cores), but in case someone has the same problem, here's the working code:

    import time
    import numpy as np
    from multiprocessing import Process, Queue
    import multiprocessing
    from itertools import repeat

    class simulation():
        def __init__(self, nr, mean, std_dev, periods):
            # Instance attributes: every simulation object gets its own result list.
            self.result = []
            self.nr = nr
            self.mean = mean
            self.stddev = std_dev
            self.periods = periods

        def generate_value(self):
            generatedvalue = max(int(round(np.random.normal(self.mean, self.stddev), 0)), 0)
            return generatedvalue

        def runsimulation(self):
            for i in range(self.periods):
                self.result.append(self.generate_value())
            return self.result

    def worker(mean, stddev, periods, nr, queue):
        sim = simulation(nr=nr, mean=mean, std_dev=stddev, periods=periods)
        results = sim.runsimulation()
        queue.put(results)
        print("simulation run " + str(nr) + " done result of " + str(results)
              + " (input: mean: " + str(mean) + ", std. dev.: " + str(stddev) + ")")

    if __name__ == '__main__':
        start = time.time()  # start time, for measuring the total run time
        m = multiprocessing.Manager()
        queue = m.Queue()
        cpus = multiprocessing.cpu_count()
        workers = multiprocessing.Pool(processes=cpus)

        mean = [50, 60, 70, 80, 90]
        stddev = [10, 10, 10, 10, 10]
        periods = 100
        nr = list(range(1, len(mean) + 1))

        workers.starmap(worker, zip(mean, stddev, repeat(periods), nr, repeat(queue)))
        workers.close()
        workers.join()

        finalsimulationresults = []
        for i in range(len(mean)):
            finalsimulationresults.append(queue.get())

        print(finalsimulationresults)

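The way you assign the attributes in your class makes them class attributes. That way they are shared between every instance of the class. In your case this doesn't show up immediately, because every process usually has only one instance of the class and the class object is not shared between processes. But if a worker finishes early enough that it can take another task, the class object is reused in that process and the class attributes work "as expected": the new instance appends to the same result list that already holds the values of the previous run. That is where the 23 in [23, 89] comes from.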
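To see the effect without any multiprocessing, here is a minimal sketch. The class name SharedResult and the fixed values are made up for illustration. Two instances created one after another in the same process append to the same class-level list, which mirrors a pool worker picking up a second task:

```python
class SharedResult:
    # Class attribute: a single list object, created once when the class is defined.
    result = []

    def run(self, value):
        # self.result resolves to the class-level list, so this mutates shared state.
        self.result.append(value)
        return self.result


first = SharedResult().run(23)   # first task handled by this process
second = SharedResult().run(89)  # second task handled by the same process

print(first is second)  # True: both names refer to the same list object
print(second)           # [23, 89]: the second run also contains the first value
```

Attributes that are rebound on the instance (such as sim.mean = mean) do not leak this way; only objects that are mutated in place, like the list, do.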

To circumvent this you should assign instance attributes (i.e. attributes that should differ from instance to instance) in the __init__ method:

    from multiprocessing import Process

    import numpy as np

    class simulation(Process):

        def __init__(self, nr, mean, std_dev, periods):
            super().__init__()  # let Process set up its own state first
            self.nr = nr
            self.mean = mean
            self.stddev = std_dev  # stored under the name generate_value reads
            self.periods = periods
            self.result = []  # a new list for every instance

        def generate_value(self):
            generatedvalue = max(int(round(np.random.normal(self.mean, self.stddev), 0)), 0)
            return generatedvalue

        def runsimulation(self):
            for i in range(self.periods):
                self.result.append(self.generate_value())
            return self.result

For further information see the documentation.
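As an alternative way to get per-instance lists, the standard library's dataclasses can generate the __init__ for you. This is only a sketch of that option, not what the code above uses; the class name SimulationConfig is hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class SimulationConfig:
    nr: int
    mean: float
    stddev: float
    periods: int
    # default_factory is called once per instance, so each one gets a new list.
    result: list = field(default_factory=list)


a = SimulationConfig(nr=1, mean=50, stddev=10, periods=1)
b = SimulationConfig(nr=2, mean=60, stddev=10, periods=1)
a.result.append(23)

print(a.result)  # [23]
print(b.result)  # []
```

A dataclass also rejects a plain `result: list = []` default with a ValueError, which catches the shared-list mistake at class definition time.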

That said, I don't think you should use the Process class the way you are using it. The Pool automatically handles process creation, and you only need to tell it what to do. Rewriting your code:

    import multiprocessing
    from itertools import repeat

    import numpy as np

    def task(mean, stddev, periods, nr, queue):
        # Parameter order matches the zip(...) passed to starmap below.
        results = []  # local list, so nothing is shared between tasks
        for i in range(periods):
            results.append(max(int(round(np.random.normal(mean, stddev), 0)), 0))
        queue.put(results)
        return results

    if __name__ == '__main__':
        m = multiprocessing.Manager()
        queue = m.Queue()
        cpus = multiprocessing.cpu_count() # cpus = 8
        pool = multiprocessing.Pool(processes=cpus)

        mean = [50, 60, 70, 80, 90]
        stddev = [10, 10, 10, 10, 10]
        periods = 1
        nr = list(range(1, len(mean) + 1))

        pool.starmap(task, zip(mean, stddev, repeat(periods), nr, repeat(queue)))
        pool.close()
        pool.join()

This should work (not tested).
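Since Pool.starmap already returns each task's return value in input order, the Manager queue may not be needed at all. Here is a minimal sketch under that assumption. The function name simulate, the seed parameter and the use of np.random.default_rng are illustrative additions, not part of the original code. Giving each task its own seeded generator also means the workers do not depend on a global random state that may have been copied from the parent process.

```python
import multiprocessing
from itertools import repeat

import numpy as np


def simulate(mean, stddev, periods, seed):
    # A fresh generator and a fresh list per call: nothing is shared between tasks.
    rng = np.random.default_rng(seed)
    return [max(int(round(rng.normal(mean, stddev))), 0) for _ in range(periods)]


if __name__ == "__main__":
    mean = [50, 60, 70, 80, 90]
    stddev = [10, 10, 10, 10, 10]
    periods = 2
    seeds = range(len(mean))  # hypothetical per-task seeds

    with multiprocessing.Pool() as pool:
        # starmap blocks until all tasks finish and returns results in input order.
        results = pool.starmap(simulate, zip(mean, stddev, repeat(periods), seeds))

    print(results)  # one list of `periods` values per input row
```

With this layout each entry of results lines up with the corresponding mean and stddev, so no run number has to be passed along to match outputs to inputs.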

