Python's multiprocessing returns more results than tasks were given
I'm trying to use multiprocessing for my simulation runs, to evaluate different input values at the same time.
I therefore googled a lot over the last weeks and got something that is not pretty but (somehow) works. The problem now is that it returns more output than I have given tasks, and I don't understand why.
Sometimes each simulation run returns one value as expected, but sometimes it differs and a run produces more output than expected - in the example below, simulation run 5 returns [23, 89] instead of a single value. When I increase the number of periods to e.g. 2, some runs generate 4 output values, and I cannot figure out why that is.
Could you please give me a hint on how to change that? I cannot find an answer and I'm getting quite frustrated :( Any suggestions on how to improve the code are also appreciated - I'm quite new to Python and love it so far :)
This is the simplified code I use:
import numpy as np
from multiprocessing import Process, Queue
import multiprocessing
from itertools import repeat


class Simulation(Process):
    nr = 1
    mean = 5
    stddev = 3
    periods = 10
    result = []

    def generate_value(self):
        generatedvalue = max(int(round(np.random.normal(self.mean, self.stddev), 0)), 0)
        return generatedvalue

    def runsimulation(self):
        for i in range(self.periods):
            self.result.append(self.generate_value())
        return self.result


def worker(mean, stddev, periods, nr, queue):
    sim = Simulation()
    sim.nr = nr
    sim.periods = periods
    sim.mean = mean
    sim.stddev = stddev
    results = sim.runsimulation()
    queue.put(results)
    print("Simulation run " + str(nr) + " done with result of " + str(results) + " (input: mean: " + str(mean) + ", std. dev.: " + str(stddev) + ")")


if __name__ == '__main__':
    m = multiprocessing.Manager()
    queue = m.Queue()
    cpus = multiprocessing.cpu_count()  # cpus = 8
    workers = multiprocessing.Pool(processes=cpus)

    mean = [50, 60, 70, 80, 90]
    stddev = [10, 10, 10, 10, 10]
    periods = 1
    nr = list(range(1, len(mean) + 1))

    workers.starmap(worker, zip(mean, stddev, repeat(periods), nr, repeat(queue)))
    workers.close()
    workers.join()

    finalsimulationresults = []
    for i in range(len(mean)):
        finalsimulationresults.append(queue.get())
    print(finalsimulationresults)
Which results in e.g. this:
Simulation run 1 done with result of [23] (input: mean: 50, std. dev.: 10)
Simulation run 2 done with result of [55] (input: mean: 60, std. dev.: 10)
Simulation run 3 done with result of [64] (input: mean: 70, std. dev.: 10)
Simulation run 5 done with result of [23, 89] (input: mean: 90, std. dev.: 10)
Simulation run 4 done with result of [78] (input: mean: 80, std. dev.: 10)
[[23], [55], [64], [23, 89], [78]]
It works :). It's not as fast as expected (only 2 times faster with 8 cores), which might have the same cause, but here's the working code:
import time
import numpy as np
from multiprocessing import Process, Queue
import multiprocessing
from itertools import repeat


class Simulation():
    def __init__(self, nr, mean, std_dev, periods):
        self.result = []
        self.nr = nr
        self.mean = mean
        self.stddev = std_dev
        self.periods = periods

    def generate_value(self):
        generatedvalue = max(int(round(np.random.normal(self.mean, self.stddev), 0)), 0)
        return generatedvalue

    def runsimulation(self):
        for i in range(self.periods):
            self.result.append(self.generate_value())
        return self.result


def worker(mean, stddev, periods, nr, queue):
    sim = Simulation(nr=nr, mean=mean, std_dev=stddev, periods=periods)
    results = sim.runsimulation()
    queue.put(results)
    print("Simulation run " + str(nr) + " done with result of " + str(results) + " (input: mean: " + str(mean) + ", std. dev.: " + str(stddev) + ")")


if __name__ == '__main__':
    start = time.time()
    m = multiprocessing.Manager()
    queue = m.Queue()
    cpus = multiprocessing.cpu_count()
    workers = multiprocessing.Pool(processes=cpus)

    mean = [50, 60, 70, 80, 90]
    stddev = [10, 10, 10, 10, 10]
    periods = 100
    nr = list(range(1, len(mean) + 1))

    workers.starmap(worker, zip(mean, stddev, repeat(periods), nr, repeat(queue)))
    workers.close()
    workers.join()

    finalsimulationresults = []
    for i in range(len(mean)):
        finalsimulationresults.append(queue.get())
    print(finalsimulationresults)
The way you assign the attributes to the class makes them class attributes. That way they are shared between every instance of the class. In your case this doesn't show up immediately, because every process has only one instance of the class and the class object is not shared between processes. But if a worker finishes early enough to be given another task, the class object gets reused and the class attributes behave "as expected" for class attributes, i.e. they keep the state from the previous run.
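Here is a minimal sketch of that pitfall, using a made-up Counter class (not from your code): every instance appends to the same list object.

class Counter:
    values = []              # class attribute: one list shared by all instances

a = Counter()
b = Counter()
a.values.append(1)
print(b.values)                                # [1] -- b sees what a appended
print(Counter.values is a.values is b.values)  # True: it is the same list object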
To circumvent this, you should assign instance attributes (i.e. attributes that should differ from instance to instance) inside the __init__ function:
class Simulation(Process):
    def __init__(self, nr, mean, std_dev, periods):
        super().__init__()  # initialize the Process base class
        self.nr = nr
        self.mean = mean
        self.stddev = std_dev
        self.periods = periods
        self.result = []

    def generate_value(self):
        generatedvalue = max(int(round(np.random.normal(self.mean, self.stddev), 0)), 0)
        return generatedvalue

    def runsimulation(self):
        for i in range(self.periods):
            self.result.append(self.generate_value())
        return self.result
For further information see the documentation on class and instance variables.
That said, I don't think you should use the Process class in the way you are using it. A Pool automatically handles the process creation; you only need to tell it what to do. Rewriting your code:
import numpy as np
import multiprocessing
from itertools import repeat


def task(mean, std_dev, periods, nr, queue):
    # build the result list locally and hand it back via the queue
    results = []
    for i in range(periods):
        results.append(max(int(round(np.random.normal(mean, std_dev), 0)), 0))
    queue.put(results)
    return results


if __name__ == '__main__':
    m = multiprocessing.Manager()
    queue = m.Queue()
    cpus = multiprocessing.cpu_count()  # cpus = 8
    pool = multiprocessing.Pool(processes=cpus)

    mean = [50, 60, 70, 80, 90]
    stddev = [10, 10, 10, 10, 10]
    periods = 1
    nr = list(range(1, len(mean) + 1))

    pool.starmap(task, zip(mean, stddev, repeat(periods), nr, repeat(queue)))
    pool.close()
    pool.join()
This should work (not tested).
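As a side note (my own suggestion, also untested against your setup): since task already returns its result list, you could drop the Manager queue entirely and just collect the return values of starmap, which come back in the order of the inputs. A minimal sketch:

import numpy as np
import multiprocessing
from itertools import repeat


def task(mean, std_dev, periods):
    # one simulation run: draw 'periods' values and clamp them at zero
    return [max(int(round(np.random.normal(mean, std_dev), 0)), 0)
            for _ in range(periods)]


if __name__ == '__main__':
    mean = [50, 60, 70, 80, 90]
    stddev = [10, 10, 10, 10, 10]
    periods = 1

    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        # starmap blocks until all tasks are done and returns their results
        finalsimulationresults = pool.starmap(task, zip(mean, stddev, repeat(periods)))
    print(finalsimulationresults)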