multiprocessing/threading: data appending & output

2019-09-20 01:31发布

问题:

I have a lengthy function called run below that contains a few instances of appending data.

from multiprocessing import Process

data = []

def run():
    global data
    ...
    data.append(trace)
    ...

if __name__ == '__main__':
    jobs = []

    gen_count = 0
    leaked_count = 0
    system_count = 0

    N = 100

    for i in range(N):
        p = Process(target=run)
        jobs.append(p)
        p.start()

However, using multiprocessing no data is appended. In addition, the function run returns several values that need to be added to gen_count, leaked_count, and system_count and I am not sure how to retrieve those values. I chose multiprocessing because running run in a for-loop is slow and each iteration was independent of the rest. I would like to incorporate GPU acceleration in this code later for anyone that has any ideas on that.

So my questions are the following:

  1. Should I even be using multiprocessing as opposed to threading?
  2. Why is trace not being appended to data?
  3. How can I retrieve the output of run within the multiprocessing block?

Edit:

from plotly.offline import init_notebook_mode
import plotly.graph_objs as go
import plotly as py
import time
import Cross_Section_Loading
from multiprocess import Process, Pool, Queue, Manager, cpu_count
from functools import partial
import numpy as np
init_notebook_mode(connected=True)

...

def particle_func(x, y, z):

    leaked = 0
    nu = 0

    # get initial direction
    theta = np.random.uniform(0, np.pi, 1)
    phi = np.random.uniform(0, 2 * np.pi, 1)

    # compute energy via rejection sampling
    expfiss = lambda e: 0.453 * np.exp(-1.036 * e / 1.0e6) * np.sinh(np.sqrt(2.29 * e / 1.0e6))

    min_eng = np.min(E)
    max_eng = np.max(E)
    max_prob = np.max(expfiss(E))

    rejected = 1
    while rejected:
        a = np.random.uniform(min_eng, max_eng, 1)
        b = np.random.uniform(0, max_prob, 1)
        rel_prob = expfiss(a)
        if b <= rel_prob:
            energy = a
            rejected = 0

    alive = 1

    # vector to keep track of positions
    xvec = np.ones(1) * x
    yvec = np.ones(1) * y
    zvec = np.ones(1) * z

    while alive:
        # Get real/new cross-sections for corresponding energy
        index = energy_lookup(E, energy)

        interacted = 0
        total_distance = 0
        # Interacted may still be alive (scattering)
        while interacted == 0:

            ###################################################
            # Determine starting location for sample distance using sigma_total
            material_start = material_type(x, y)

            if material_start == 1:
                sig_tot = sigma_total_fuel(ENRICHMENT_1)[index]
            elif material_start == 2:
                sig_tot = sigma_total_fuel(ENRICHMENT_2)[index]
            elif material_start == 3:
                sig_tot = sigma_total_fuel(ENRICHMENT_3)[index]
            else:
                sig_tot = sigma_total_mod[index]

            ###################################################

            if material_start == 1 or material_start == 2 or material_start == 3:  # if in fuel pin

                # Get distance to edge of fuel rod (from fuel)
                d = distance_to_edge(x, y, phi)

                # get sample distance to collision
                s = -np.log(1.0 - np.random.random(1)) / sig_tot

                # Incidence on interface (denoted by code "no-interface")
                if d != 'no-interface':

                    # Sample distance is greater than interface distance (does not account for material change)
                    # Must convert between 2D and 3D
                    if s * np.sin(theta) > d:
                        total_distance += d / np.sin(theta)

                    # Sample distance is correct and interaction occurs
                    else:
                        total_distance += s
                        interacted = 1

                # Statement may be redundant but idk how to handle return from distance_to_rod
                else:
                    total_distance += s
                    interacted = 1

            else:               # if in moderator
                # get distance to edge of fuel rod (from moderator)
                d = distance_to_edge(x, y, phi)

                # get distance to collision
                s = -np.log(1.0 - np.random.random(1)) / sig_tot

                # Incidence on interface (denoted by code "no-interface")
                if d != 'no-interface':

                    # Sample distance is greater than interface distance (does not account for material change)
                    # Must convert between 2D and 3D
                    if s * np.sin(theta) > d:
                        total_distance += d / np.sin(theta)  # <- Right conversion?

                    # Sample distance is correct and interaction occurs
                    else:
                        total_distance += s
                        interacted = 1

                # Statement may be redundant but idk how to handle return from distance_to_rod
                else:
                    total_distance += s
                    interacted = 1

            # move particle
            z += total_distance * np.cos(theta)
            y += total_distance * np.sin(theta) * np.sin(phi)
            x += total_distance * np.sin(theta) * np.cos(phi)

        # material_end = material_type(x, y)
        #
        # if material_start != material_end:
        #     print("Neutron has crossed material interface(s)")

        # Trace/Track particle movement
        xvec = np.append(xvec, x)
        yvec = np.append(yvec, y)
        zvec = np.append(zvec, z)

        ###################################################

        # Leakage
        if x > X_BOUNDARY or x < -X_BOUNDARY:
            # Still need implementation
            leaked = 1
            alive = 0

        if y > Y_BOUNDARY or y < -Y_BOUNDARY:
            # Still need implementation
            leaked = 1
            alive = 0

        if z > HEIGHT or z < 0:
            # Still need implementation
            leaked = 1
            alive = 0

        ###################################################

        # Determine Type of interaction based on energy and corresponding cross-sections
        # In fuel
        material = material_type(x, y)
        if material == 1:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_1)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_1)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_1)[index]
            nu_temp = nu_fuel(ENRICHMENT_1)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        elif material == 2:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_2)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_2)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_2)[index]
            nu_temp = nu_fuel(ENRICHMENT_2)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        if material == 3:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_3)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_3)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_3)[index]
            nu_temp = nu_fuel(ENRICHMENT_3)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        # In water
        else:
            mod_scat = sigma_scatter_mod[index]
            mod_tot = sigma_total_mod[index]

            # scatter or absorb
            if np.random.random(1) < mod_scat / mod_tot:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_mod * energy, energy, 1)

            else:
                # absorbed
                alive = 0

        ###################################################

    return xvec, yvec, zvec, nu, leaked


##################################################################


def run(data_test):

    ###################################################

    # Uniformly Dispersed Source (Cylinder)
    # x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
    # y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
    # z = np.random.uniform(-HEIGHT, HEIGHT, 1)

    # Uniformly Dispersed FUEL Source (Cylinder)
    rejected = 1
    while rejected:
        x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
        y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
        z = np.random.uniform(-HEIGHT, HEIGHT, 1)
        if material_type(x, y):
            rejected = 0

    ###################################################

    # Get normal particle info (trace)
    x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
    leaked_count = leaked
    gen_count = nu
    system_count = (1 + nu - leaked)

    # particle_trace = go.Scatter3d(
    #     x=x_vec,
    #     y=y_vec,
    #     z=z_vec,
    #     mode='lines',
    #     line=dict(color='rgb(173, 255, 47)')
    # )
    #
    # data_test.append(particle_trace)

    data_test.append((x_vec, y_vec, z_vec))

    ###################################################

    nu_vec = [nu]
    x_vecs = [x_vec]
    y_vecs = [y_vec]
    z_vecs = [z_vec]

    if nu > 0:
        print("{} neutrons generated for neutron {}".format(nu, i))

    else:
        print("No neutrons generated for neutron {}".format(i + 1))

    t = 0
    recent_nus = nu_vec
    while np.any(recent_nus) != 0:

        print(nu_vec[-t:])

        tracker = 0

        nu_vec_temp = []

        x_vecs_temp = []
        y_vecs_temp = []
        z_vecs_temp = []

        for a in range(len(nu_vec[-t:])):

            x = x_vecs[-(a + 1)][-1]
            y = y_vecs[-(a + 1)][-1]
            z = z_vecs[-(a + 1)][-1]

            for j in range(nu_vec[-(a + 1)]):
                x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
                leaked_count += leaked

                print("Particle {} starting coords:".format(j + 1), x_vec[0], y_vec[0], z_vec[0])
                print("Particle {} ending coords:".format(j + 1), x_vec[-1], y_vec[-1], z_vec[-1])
                print("Particle {} nu value".format(j + 1), nu)

                nu_vec_temp.append(nu)
                tracker += 1

                x_vecs_temp.append(x_vec)
                y_vecs_temp.append(y_vec)
                z_vecs_temp.append(z_vec)

                # time.sleep(1)

                # particle_trace = go.Scatter3d(
                #     x=x_vec,
                #     y=y_vec,
                #     z=z_vec,
                #     mode='lines',
                #     line=dict(color='rgb(255, 0, 0)')
                # )

                # data_test.append(particle_trace)
                data_test.append((x_vec, y_vec, z_vec))

            print()
            t = tracker

        nu_vec.extend(nu_vec_temp)
        x_vecs.extend(x_vecs_temp)
        y_vecs.extend(y_vecs_temp)
        z_vecs.extend(z_vecs_temp)

        recent_nus = nu_vec_temp

        print("Continuing fission:", (np.any(recent_nus) != 0))

    return leaked_count, gen_count, system_count

##################################################################

if __name__ == '__main__':
    jobs = []

    manager = Manager()
    list_ = manager.list()
    for _ in range(cpu_count() - 1):
        p = Process(target=run, args=(list_,))
        jobs.append(p)
        p.start()
        p.join()
    while True:  # stops main thread from completing execution
        time.sleep(5)  # wait 5 second before checking if processes are terminated
        if all([not x.is_alive() for x in jobs]):  # check if all processes terminated
            break  # breaks the loop

# print("\nTotal number of neutrons in system:", system_count)
# print("Total number of neutrons generated from {} neutron source: {}".format(N, gen_count))
# print("System Multiplication factor:", gen_count/N)
# print("Total number of leaked neutrons:", leaked_count)


layout = go.Layout(
    title='Monte Carlo Assembly',
    autosize=True,
    showlegend=False,
    height=1000,
    width=1000,
    scene=dict(zaxis=dict(range=[-1, HEIGHT + 1]),
               yaxis=dict(range=[-(Y_DIM * PITCH + 5), (Y_DIM * PITCH + 5)]),
               xaxis=dict(range=[-(X_DIM * PITCH + 5), (X_DIM * PITCH + 5)])
               ),
)

fig = go.Figure(data=data, layout=layout)
py.offline.plot(fig, filename='file.html')

Output & Error Message:

/Users/sterlingbutters/anaconda/bin/python "/Users/sterlingbutters/PycharmProjects/Monte Carlo Simulation/MC Plotly (Cylindrical Assembly) Reflector.py"
For 17 x 17 assembly, 9 x 9 is needed. Your shape: (9, 9)
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
[(array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294]))]
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable

Process finished with exit code 0

回答1:

Multiprocessing spawns a different Process with it's own global variables copies from current environment. All the changes in variable made in that process does not reflect in parent process. You need to share memory between the process and variables in shared memory can be exchanged.

You can use multiprocessing.Manager to create a shared object like list or dictionary, and manipulate that object.

Processes are assigned to different cores/thread of your processor. If you have a 4 core/8 thread system, spawn a maximum of 7 processes to maximize performance, any more than that some processes will interfere with other processes and can slow down/reduce the cpu time allotted to your os which can crash your system. It's always the cpu cores/cpu threads -1 processes for stable processing leaving atleast one core to os to handle other operations.

You can modify your code like this

from multiprocessing import Process, Manager
import time

def run(list_):
    list_.append(trace)

if __name__ == "__main__":
    jobs = []
    gen_count = 0
    leaked_count = 0
    system_count = 0

    with Manager() as manager:
        list_ = manager.list()
        for _ in range(multiprocessing.cpu_count()-1):
            p = Process(target=run,args=(list_))
            jobs.append(p)
            p.start()
        while True: #stops main thread from completing execution
            time.sleep(5) #wait 5 second before checking if processes are terminated
            if all([not x.is_alive() for x in jobs]): #check if all processes terminated
                break #breaks the loop 


回答2:

The way multiprocessing works, each subtask runs in its own memory space and gets its own copy of any global variables. A common way around this limitation to effectively have shared data is to use a multiprocessing.Manager to coordinate concurrent access to it and transparently prevent any problems simultaneous access to it might cause.

Below is an example of doing that based on your sample code. It also uses a multiprocessing.Pool() which makes it easy to create a fixed-size collection of process objects that can each provide asynchronous results each subtask (or wait until all of them are finished before retrieving them, as is being done here).

from functools import partial
import multiprocessing

def run(data, i):
    data.append('trace%d' % i)
    return 1, 2, 3  # values to add to gen_count, leaked_count, and system_count

if __name__ == '__main__':
    N = 10
    manager = multiprocessing.Manager()  # create SyncManager
    data = manager.list()  # create a shared list
    pool = multiprocessing.Pool()

    async_result = pool.map_async(partial(run, data), range(N))
    values = tuple(zip(*async_result.get()))
    gen_count = sum(values[0])
    leaked_count = sum(values[1])
    system_count = sum(values[2])

    print(data)
    print('Totals:  gen_count {}, leaked_count {}, system_count {}'.format(
            gen_count, leaked_count, system_count))

Output:

['trace0', 'trace1', 'trace2', 'trace4', 'trace3', 'trace5', 'trace8', 'trace6', 'trace7', 'trace9']
Totals:  gen_count 10, leaked_count 20, system_count 30