Python Multiprocessing and Serializing Data

Posted 2019-02-26 14:13

Question:

I am running a script on a school computer using the multiprocessing module. I am serializing the data frequently. It can be summarized by the code below:

import multiprocessing as mp
import time, pickle

def simulation(j):
    data = []
    for k in range(10):
        data.append(k)
        time.sleep(1)
        file = open('data%d.pkl'%j, 'wb')
        pickle.dump(data, file)
        file.close()
if __name__ == '__main__':
    processes = []
    processes.append(mp.Process(target = simulation, args = (1,) ))
    processes.append(mp.Process(target = simulation, args = (2,) ))
    for process in processes:
        process.start()
    for process in processes:
        process.join()

When I actually run my code with many more simulations, and with what I imagine are more intensive and varied tasks, I get the following error: IOError: [Errno 5] Input/output error, usually raised at the file = open(...) or file.close() line.

My questions:

  • How do I fix this error in my script?
  • What does this error mean for a Python newcomer? References are appreciated.

Some more notes about my procedure:

  • Instead of setting the Process attribute daemon to True, I run the script inside screen and then detach. This also lets me disconnect without worrying about the script stopping.
  • This seemed to be a related question about printing with the subprocess module. As I said, I did not explicitly use daemon, so I am not sure whether it will help.
  • This usually happens after running for about a day, and it occurs in different processes at different times.
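
To narrow down exactly which call fails, I am thinking of wrapping the writes in something like the rough sketch below (dump_with_logging is just a name I made up), so that at least the failing file name and errno get recorded before the exception propagates:

import errno, pickle

def dump_with_logging(obj, path):
    # rough sketch: dump obj to path, and report which file and errno if the write fails
    try:
        with open(path, 'wb') as f:
            pickle.dump(obj, f)
    except IOError as e:
        # Errno 5 (EIO) comes from the operating system / filesystem, not from pickle itself
        print('write to %s failed: errno=%s (%s)' % (path, e.errno, errno.errorcode.get(e.errno, '?')))
        raise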

Answer 1:

Your program looks pretty good. In this case IOError just means "bad things happened." Most likely the entire set of simulated data became too large for the Python process, so it exited with this mysterious message.

A couple of improvements in the following version:

  • Once some data has been produced, append it to a data file, then zap it from memory. The program should have roughly the same RAM use over time, rather than using up more and more, then crashing.

  • Conveniently, if a file is a concatenation of pickled objects, we can easily print each one out later for further examination. Example code is shown below.

Have fun!

source

import multiprocessing as mp
import glob, time, pickle, sys

def simulation(j):
    for k in range(10):
        datum = {'result': k}
        time.sleep(1)
        # append each new record to the file instead of rewriting the whole list
        with open('data%d.pkl' % j, 'ab') as dataf:
            pickle.dump(datum, dataf)

def show():
    for datname in glob.glob('data*.pkl'):
        try:
            print('*' * 8, datname)
            with open(datname, 'rb') as datf:
                # the file is a concatenation of pickles; read until EOFError
                while True:
                    print(pickle.load(datf))
        except EOFError:
            pass

def do_sim():
    processes = []
    processes.append(mp.Process(target=simulation, args=(1,)))
    processes.append(mp.Process(target=simulation, args=(2,)))
    for process in processes:
        process.start()
    for process in processes:
        process.join()

if __name__ == '__main__':
    if '--show' in sys.argv:
        show()
    else:
        do_sim()

output of "python ./msim.py --show"

******** data2.pkl
{'result': 0}
{'result': 1}
{'result': 2}
{'result': 3}
{'result': 4}
{'result': 5}
{'result': 6}
{'result': 7}
{'result': 8}
{'result': 9}
******** data1.pkl
{'result': 0}
{'result': 1}
{'result': 2}
{'result': 3}
{'result': 4}
{'result': 5}
{'result': 6}
{'result': 7}
{'result': 8}
{'result': 9}
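
If you later want to do more than just print the records, a small generator in the same spirit (only a sketch; load_records is a name I made up here) lets you iterate over everything stored in one of the files:

import pickle

def load_records(path):
    # yield each record from a file that is a concatenation of pickled objects
    with open(path, 'rb') as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return

# for example, collect every 'result' value from data1.pkl into a list
results = [rec['result'] for rec in load_records('data1.pkl')]
print(results)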