Multiprocessing with “fork” context blocks on Linu

2019-04-03 09:48发布

Problem description
I adjusted the code from this answer a little bit (see below). However when running this script on Linux (so command line: python script_name.py) it will print jobs running: x for all the jobs but then just seems to stuck after that. However when I use the spawn method (mp.set_start_method('spawn')) it works out fine and immediately starts printing the value of the counter variable (see listener method).

Question

  • Why does it work only when spawning processes?
  • How can I adjust the code so it works with fork? (because it is probably faster)

Code

import io
import csv
import multiprocessing as mp

NEWLINE = '\n'

def file_searcher(file_path):
    parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t')

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count())

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for row in parsed_file:
        print('jobs running: ' + str(len(jobs) + 1))
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

  # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

def worker(genome_row, q):
    complete_data = []
    #data processing
    #ftp connection to retrieve data
    #etc.
    q.put(complete_data)
    return complete_data

def listener(q):
    '''listens for messages on the q, writes to file. '''
    f = io.open('output.txt', 'w', encoding='utf-8')
    counter = 0
    while 1:
        m = q.get()
        counter +=1
        print(counter)
        if m == 'kill':
            break
        for x in m:
            f.write(x + NEWLINE)
        f.flush()
    f.close()

if __name__ == "__main__":
   file_searcher('path_to_some_tab_del_file.txt')

Processor info

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                20
On-line CPU(s) list:   0-19
Thread(s) per core:    1
Core(s) per socket:    1
Socket(s):             20
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 45
Model name:            Intel(R) Xeon(R) CPU E5-2660 v3 @ 2.60GHz
Stepping:              2
CPU MHz:               2596.501
BogoMIPS:              5193.98
Hypervisor vendor:     VMware
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              25600K
NUMA node0 CPU(s):     0-19

Linux kernel version

3.10.0-514.26.2.el7.x86_64

Python version

Python 3.6.1 :: Continuum Analytics, Inc.

LOG
I added the code as suggested by @yacc, this will give the following log:

[server scripts]$ python main_v3.py
[INFO/SyncManager-1] child process calling self.run()
[INFO/SyncManager-1] created temp directory /tmp/pymp-2a9stjh6
[INFO/SyncManager-1] manager serving at '/tmp/pymp-2a9stjh6/listener-jxwseclw'
[DEBUG/MainProcess] requesting creation of a shared 'Queue' object
[DEBUG/SyncManager-1] 'Queue' callable returned object with id '7f0842da56a0'
[DEBUG/MainProcess] INCREF '7f0842da56a0'
[DEBUG/MainProcess] created semlock with handle 139673691570176
[DEBUG/MainProcess] created semlock with handle 139673691566080
[DEBUG/MainProcess] created semlock with handle 139673691561984
[DEBUG/MainProcess] created semlock with handle 139673691557888
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-6] child process calling self.run()
[INFO/ForkPoolWorker-5] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-7] child process calling self.run()
[INFO/ForkPoolWorker-8] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-12] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-13] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-14] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-15] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-16] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-17] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-18] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-19] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-20] child process calling self.run()
jobs running: 1
jobs running: 2
jobs running: 3
jobs running: 4
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-21] child process calling self.run()
jobs running: 5
jobs running: 6
jobs running: 7
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
jobs running: 8
written to file
jobs running: 9
jobs running: 10
[DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection
[DEBUG/ForkPoolWorker-2] making connection to manager
jobs running: 11
jobs running: 12
jobs running: 13
jobs running: 14
jobs running: 15
[DEBUG/SyncManager-1] starting server thread to service 'ForkPoolWorker-2'
jobs running: 16
jobs running: 17
jobs running: 18
jobs running: 19
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'

1条回答
我想做一个坏孩纸
2楼-- · 2019-04-03 09:57

As @jxh hinted, the differences between fork and spawn are important. The documentation on multiprocessing indicates in section 17.2.1.2 that the difference is: forking preserves the environment and things like stdin/out, whereas spawn just makes a fresh new process. I think you maybe have something in your environment which causes issues for the worker function, likely in the code behind your comments about other processing. Spawning gives you a clean slate and things are running fine under those conditions.

To determine what is going on, I would have each worker print diagnostic messages, probably written to a file unique to each worker. open/close that file each time you want to write a message, so the contents gets updated/flushed.

Fork should not be faster than spawn, because fork needs to copy the environment information to the new process. In any case, this is only the startup cost which is minimal, I assume, because the worker needs to do some computationally or I/O bound work that you want to parallelize.

查看更多
登录 后发表回答