I have a lengthy function called run, shown below, that appends to data in a few places.
from multiprocessing import Process

data = []

def run():
    global data
    ...
    data.append(trace)
    ...

if __name__ == '__main__':
    jobs = []
    gen_count = 0
    leaked_count = 0
    system_count = 0

    N = 100
    for i in range(N):
        p = Process(target=run)
        jobs.append(p)
        p.start()
However, with multiprocessing nothing is appended to data. In addition, run returns several values that need to be added to gen_count, leaked_count, and system_count, and I am not sure how to retrieve those values. I chose multiprocessing because running run in a for-loop is slow and each iteration is independent of the rest. I would also like to incorporate GPU acceleration into this code later, if anyone has ideas on that.
So my questions are the following:
- Should I even be using multiprocessing as opposed to threading?
- Why is trace not being appended to data?
- How can I retrieve the output of run within the multiprocessing block?
Edit:
from plotly.offline import init_notebook_mode
import plotly.graph_objs as go
import plotly as py
import time
import Cross_Section_Loading
from multiprocess import Process, Pool, Queue, Manager, cpu_count
from functools import partial
import numpy as np
init_notebook_mode(connected=True)
...
def particle_func(x, y, z):
    leaked = 0
    nu = 0

    # get initial direction
    theta = np.random.uniform(0, np.pi, 1)
    phi = np.random.uniform(0, 2 * np.pi, 1)

    # compute energy via rejection sampling
    expfiss = lambda e: 0.453 * np.exp(-1.036 * e / 1.0e6) * np.sinh(np.sqrt(2.29 * e / 1.0e6))
    min_eng = np.min(E)
    max_eng = np.max(E)
    max_prob = np.max(expfiss(E))
    rejected = 1
    while rejected:
        a = np.random.uniform(min_eng, max_eng, 1)
        b = np.random.uniform(0, max_prob, 1)
        rel_prob = expfiss(a)
        if b <= rel_prob:
            energy = a
            rejected = 0

    alive = 1

    # vectors to keep track of positions
    xvec = np.ones(1) * x
    yvec = np.ones(1) * y
    zvec = np.ones(1) * z

    while alive:
        # Get real/new cross-sections for corresponding energy
        index = energy_lookup(E, energy)

        interacted = 0
        total_distance = 0

        # Interacted may still be alive (scattering)
        while interacted == 0:
            ###################################################
            # Determine starting location for sample distance using sigma_total
            material_start = material_type(x, y)
            if material_start == 1:
                sig_tot = sigma_total_fuel(ENRICHMENT_1)[index]
            elif material_start == 2:
                sig_tot = sigma_total_fuel(ENRICHMENT_2)[index]
            elif material_start == 3:
                sig_tot = sigma_total_fuel(ENRICHMENT_3)[index]
            else:
                sig_tot = sigma_total_mod[index]

            ###################################################
            if material_start == 1 or material_start == 2 or material_start == 3:  # if in fuel pin
                # Get distance to edge of fuel rod (from fuel)
                d = distance_to_edge(x, y, phi)

                # get sample distance to collision
                s = -np.log(1.0 - np.random.random(1)) / sig_tot

                # Incidence on interface (denoted by code "no-interface")
                if d != 'no-interface':
                    # Sample distance is greater than interface distance (does not account for material change)
                    # Must convert between 2D and 3D
                    if s * np.sin(theta) > d:
                        total_distance += d / np.sin(theta)
                    # Sample distance is correct and interaction occurs
                    else:
                        total_distance += s
                        interacted = 1
                # Statement may be redundant but idk how to handle return from distance_to_rod
                else:
                    total_distance += s
                    interacted = 1

            else:  # if in moderator
                # get distance to edge of fuel rod (from moderator)
                d = distance_to_edge(x, y, phi)

                # get distance to collision
                s = -np.log(1.0 - np.random.random(1)) / sig_tot

                # Incidence on interface (denoted by code "no-interface")
                if d != 'no-interface':
                    # Sample distance is greater than interface distance (does not account for material change)
                    # Must convert between 2D and 3D
                    if s * np.sin(theta) > d:
                        total_distance += d / np.sin(theta)  # <- Right conversion?
                    # Sample distance is correct and interaction occurs
                    else:
                        total_distance += s
                        interacted = 1
                # Statement may be redundant but idk how to handle return from distance_to_rod
                else:
                    total_distance += s
                    interacted = 1

        # move particle
        z += total_distance * np.cos(theta)
        y += total_distance * np.sin(theta) * np.sin(phi)
        x += total_distance * np.sin(theta) * np.cos(phi)

        # material_end = material_type(x, y)
        #
        # if material_start != material_end:
        #     print("Neutron has crossed material interface(s)")

        # Trace/Track particle movement
        xvec = np.append(xvec, x)
        yvec = np.append(yvec, y)
        zvec = np.append(zvec, z)

        ###################################################
        # Leakage
        if x > X_BOUNDARY or x < -X_BOUNDARY:
            # Still need implementation
            leaked = 1
            alive = 0
        if y > Y_BOUNDARY or y < -Y_BOUNDARY:
            # Still need implementation
            leaked = 1
            alive = 0
        if z > HEIGHT or z < 0:
            # Still need implementation
            leaked = 1
            alive = 0

        ###################################################
        # Determine type of interaction based on energy and corresponding cross-sections
        # In fuel
        material = material_type(x, y)
        if material == 1:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_1)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_1)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_1)[index]
            nu_temp = nu_fuel(ENRICHMENT_1)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:
                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)
            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:
                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0
            else:
                # absorbed
                alive = 0

        #############################
        elif material == 2:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_2)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_2)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_2)[index]
            nu_temp = nu_fuel(ENRICHMENT_2)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:
                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)
            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:
                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0
            else:
                # absorbed
                alive = 0

        #############################
        elif material == 3:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_3)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_3)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_3)[index]
            nu_temp = nu_fuel(ENRICHMENT_3)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:
                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)
            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:
                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0
            else:
                # absorbed
                alive = 0

        #############################
        # In water
        else:
            mod_scat = sigma_scatter_mod[index]
            mod_tot = sigma_total_mod[index]

            # scatter or absorb
            if np.random.random(1) < mod_scat / mod_tot:
                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_mod * energy, energy, 1)
            else:
                # absorbed
                alive = 0

    ###################################################
    return xvec, yvec, zvec, nu, leaked
##################################################################
def run(data_test):
    ###################################################
    # Uniformly Dispersed Source (Cylinder)
    # x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
    # y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
    # z = np.random.uniform(-HEIGHT, HEIGHT, 1)

    # Uniformly Dispersed FUEL Source (Cylinder)
    rejected = 1
    while rejected:
        x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
        y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
        z = np.random.uniform(-HEIGHT, HEIGHT, 1)
        if material_type(x, y):
            rejected = 0

    ###################################################
    # Get normal particle info (trace)
    x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
    leaked_count = leaked
    gen_count = nu
    system_count = (1 + nu - leaked)

    # particle_trace = go.Scatter3d(
    #     x=x_vec,
    #     y=y_vec,
    #     z=z_vec,
    #     mode='lines',
    #     line=dict(color='rgb(173, 255, 47)')
    # )
    #
    # data_test.append(particle_trace)

    data_test.append((x_vec, y_vec, z_vec))

    ###################################################
    nu_vec = [nu]
    x_vecs = [x_vec]
    y_vecs = [y_vec]
    z_vecs = [z_vec]

    if nu > 0:
        print("{} neutrons generated for neutron {}".format(nu, i))
    else:
        print("No neutrons generated for neutron {}".format(i + 1))

    t = 0
    recent_nus = nu_vec
    while np.any(recent_nus) != 0:
        print(nu_vec[-t:])
        tracker = 0
        nu_vec_temp = []
        x_vecs_temp = []
        y_vecs_temp = []
        z_vecs_temp = []
        for a in range(len(nu_vec[-t:])):
            x = x_vecs[-(a + 1)][-1]
            y = y_vecs[-(a + 1)][-1]
            z = z_vecs[-(a + 1)][-1]

            for j in range(nu_vec[-(a + 1)]):
                x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
                leaked_count += leaked

                print("Particle {} starting coords:".format(j + 1), x_vec[0], y_vec[0], z_vec[0])
                print("Particle {} ending coords:".format(j + 1), x_vec[-1], y_vec[-1], z_vec[-1])
                print("Particle {} nu value".format(j + 1), nu)

                nu_vec_temp.append(nu)
                tracker += 1

                x_vecs_temp.append(x_vec)
                y_vecs_temp.append(y_vec)
                z_vecs_temp.append(z_vec)

                # time.sleep(1)

                # particle_trace = go.Scatter3d(
                #     x=x_vec,
                #     y=y_vec,
                #     z=z_vec,
                #     mode='lines',
                #     line=dict(color='rgb(255, 0, 0)')
                # )
                # data_test.append(particle_trace)

                data_test.append((x_vec, y_vec, z_vec))

            print()

        t = tracker
        nu_vec.extend(nu_vec_temp)
        x_vecs.extend(x_vecs_temp)
        y_vecs.extend(y_vecs_temp)
        z_vecs.extend(z_vecs_temp)
        recent_nus = nu_vec_temp
        print("Continuing fission:", (np.any(recent_nus) != 0))

    return leaked_count, gen_count, system_count
##################################################################
if __name__ == '__main__':
    jobs = []
    manager = Manager()
    list_ = manager.list()

    for _ in range(cpu_count() - 1):
        p = Process(target=run, args=(list_,))
        jobs.append(p)
        p.start()
        p.join()

    while True:  # stops main thread from completing execution
        time.sleep(5)  # wait 5 seconds before checking if processes are terminated
        if all([not x.is_alive() for x in jobs]):  # check if all processes terminated
            break  # breaks the loop

    # print("\nTotal number of neutrons in system:", system_count)
    # print("Total number of neutrons generated from {} neutron source: {}".format(N, gen_count))
    # print("System Multiplication factor:", gen_count/N)
    # print("Total number of leaked neutrons:", leaked_count)

    layout = go.Layout(
        title='Monte Carlo Assembly',
        autosize=True,
        showlegend=False,
        height=1000,
        width=1000,
        scene=dict(zaxis=dict(range=[-1, HEIGHT + 1]),
                   yaxis=dict(range=[-(Y_DIM * PITCH + 5), (Y_DIM * PITCH + 5)]),
                   xaxis=dict(range=[-(X_DIM * PITCH + 5), (X_DIM * PITCH + 5)])
                   ),
    )

    fig = go.Figure(data=data, layout=layout)
    py.offline.plot(fig, filename='file.html')
Output & Error Message:
/Users/sterlingbutters/anaconda/bin/python "/Users/sterlingbutters/PycharmProjects/Monte Carlo Simulation/MC Plotly (Cylindrical Assembly) Reflector.py"
For 17 x 17 assembly, 9 x 9 is needed. Your shape: (9, 9)
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
[(array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294]))]
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Process finished with exit code 0
Multiprocessing spawns each Process with its own copy of the parent's global variables. Changes made to a variable inside a child process are not reflected back in the parent process. You need shared memory between the processes; variables placed in shared memory can be exchanged.
You can use multiprocessing.Manager to create a shared object such as a list or dictionary, and manipulate that object from every process.
Processes are assigned to different cores/threads of your processor. If you have a 4-core/8-thread system, spawn at most 7 processes to maximize performance; any more than that and the processes interfere with each other and eat into the CPU time the OS needs, which can slow everything down or even make the system unstable. The rule of thumb is (CPU cores/threads - 1) processes, leaving at least one core free for the OS to handle other operations.
You can modify your code along these lines.
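What follows is only a minimal sketch of that change, not a drop-in replacement: run is reduced to a stub standing in for your simulation, and a second managed list (called results here, an illustrative name) is added so each child process can hand its counters back to the parent.

from multiprocessing import Process, Manager, cpu_count

def run(data_test, results):
    # Stub standing in for the real simulation: in your code this is where
    # particle_func() runs and the real trace and counters are produced.
    x_vec, y_vec, z_vec = [0.0], [0.0], [0.0]
    leaked_count, gen_count, system_count = 0, 0, 1

    data_test.append((x_vec, y_vec, z_vec))                   # shared trace storage
    results.append((leaked_count, gen_count, system_count))   # shared "return values"

if __name__ == '__main__':
    manager = Manager()
    data_test = manager.list()   # proxy objects living in the Manager's server process
    results = manager.list()

    jobs = []
    for _ in range(cpu_count() - 1):          # leave one core/thread for the OS
        p = Process(target=run, args=(data_test, results))
        jobs.append(p)
        p.start()

    for p in jobs:
        # join() after starting all of them; joining inside the start loop
        # would run the processes one at a time.
        p.join()

    # Aggregate the per-process counters in the parent
    leaked_count = sum(r[0] for r in results)
    gen_count = sum(r[1] for r in results)
    system_count = sum(r[2] for r in results)
    print(len(data_test), "traces;", gen_count, "generated;", leaked_count, "leaked")

A multiprocessing.Queue would work just as well as the results list if you prefer put()/get() semantics for the counters.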
The way multiprocessing works, each subtask runs in its own memory space and gets its own copy of any global variables. A common way around this limitation, so that you effectively have shared data, is to use a multiprocessing.Manager to coordinate concurrent access to it and transparently prevent any problems that simultaneous access might cause.
Below is an example of doing that based on your sample code. It also uses a multiprocessing.Pool(), which makes it easy to create a fixed-size collection of process objects, each of which can provide its subtask's result asynchronously (or you can wait until all of them are finished before retrieving the results, as is done here).
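A minimal sketch of that Pool-plus-Manager pattern, under the same caveat: run is a stub standing in for the simulation, the counter values it returns are dummies, and N (the number of tasks) is arbitrary.

from multiprocessing import Pool, Manager, cpu_count

def run(data_test, i):
    # Stub standing in for the simulation of one source neutron.
    x_vec, y_vec, z_vec = [float(i)], [0.0], [0.0]
    leaked, nu = 0, 2

    data_test.append((x_vec, y_vec, z_vec))   # trace goes into the shared list
    return leaked, nu, 1 + nu - leaked        # counters come back through the pool

if __name__ == '__main__':
    N = 100
    manager = Manager()
    data_test = manager.list()   # the list proxy is picklable, so it can be passed to workers

    with Pool(processes=max(1, cpu_count() - 1)) as pool:
        async_results = [pool.apply_async(run, (data_test, i)) for i in range(N)]
        returns = [r.get() for r in async_results]   # blocks until every task has finished

    leaked_count = sum(r[0] for r in returns)
    gen_count = sum(r[1] for r in returns)
    system_count = sum(r[2] for r in returns)

    print("Traces collected:", len(data_test))
    print("gen_count:", gen_count, "leaked_count:", leaked_count, "system_count:", system_count)

apply_async returns AsyncResult objects, so the parent only blocks at the .get() calls; that is where the per-task return values are collected and summed into gen_count, leaked_count, and system_count.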