High Memory Usage Using Python Multiprocessing

Posted 2019-01-13 04:21

Question:

I have seen a couple of posts on memory usage with Python's multiprocessing module. However, those questions don't seem to cover the problem I have here. I am posting my analysis in the hope that someone can help me.

Issue

I am using multiprocessing to perform tasks in parallel, and I noticed that the memory consumption of the worker processes grows indefinitely. I have a small standalone example that should replicate what I see.

import multiprocessing as mp
import time

def calculate(num):
    l = [i*i for i in range(num)]   # squares of 0..num-1
    s = sum(l)
    del l       # delete the list explicitly (optional)
    return s

if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    time.sleep(5)
    print "launching calculation"
    num_tasks = 1000
    tasks =  [pool.apply_async(calculate,(i,)) for i in range(num_tasks)]
    for f in tasks:    
        print f.get(5)
    print "calculation finished"
    time.sleep(10)
    print "closing  pool"
    pool.close()
    print "closed pool"
    print "joining pool"
    pool.join()
    print "joined pool"
    time.sleep(5)

System

I am running Windows and I use the task manager to monitor the memory usage. I am running Python 2.7.6.

Observation

I have summarized the memory consumption by the 2 worker processes below.

+---------------+----------------------+----------------------+
|  num_tasks    |  memory with del     | memory without del   |
|               | proc_1   | proc_2    | proc_1   | proc_2    |
+---------------+----------------------+----------------------+
| 1000          | 4884     | 4694      | 4892     | 4952      |
| 5000          | 5588     | 5596      | 6140     | 6268      |
| 10000         | 6528     | 6580      | 6640     | 6644      |
+---------------+----------------------+----------------------+

In the table above, I varied the number of tasks and recorded the memory consumed by each worker after all calculations had finished but before joining the pool. The 'with del' and 'without del' columns correspond to leaving the del l line inside the calculate(num) function un-commented or commented out, respectively. Before the calculation starts, the memory consumption is around 4400.

  1. It looks like manually deleting the lists results in lower memory usage for the worker processes. I thought the garbage collector would have taken care of this. Is there a way to force garbage collection?
  2. It is puzzling that memory usage keeps growing with the number of tasks in both cases. Is there a way to limit the memory usage?

I have a process that is based on this example and is meant to run long term. I observe that the worker processes hog a lot of memory (~4GB) after an overnight run. Doing a join to release the memory is not an option, and I am trying to find a way to do it without joining.

This seems a little mysterious. Has anyone encountered something similar? How can I fix this issue?

Answer 1:

I did a lot of research and couldn't find a solution that fixes the problem per se. But there is a decent workaround that prevents the memory blowout at a small cost, which is especially worthwhile for long-running server-side code.

The solution was essentially to restart individual worker processes after a fixed number of tasks. The Pool class in Python takes maxtasksperchild as an argument. You can specify maxtasksperchild=1000 so that each child process runs at most 1000 tasks. Once a child reaches that number, the pool replaces it with a fresh process, and the memory held by the old one is released when it exits. By choosing a prudent value for the maximum number of tasks, one can balance the peak memory consumed against the start-up cost of restarting the worker processes. The pool is constructed as:

pool = mp.Pool(processes=2,maxtasksperchild=1000)
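
One way to check that workers really are being replaced is to have each task report the PID of the process it ran in. The sketch below is a minimal illustration and not part of my original solution; worker_pid and distinct_worker_pids are hypothetical helper names, and the task count of 40 and the limit of 5 are arbitrary small values chosen so the run is quick. It is written to run on Python 3 as well as 2.7.

```python
import multiprocessing as mp
import os


def worker_pid(_):
    """Return the PID of the worker process that ran this task."""
    return os.getpid()


def distinct_worker_pids(num_tasks, maxtasksperchild=None):
    """Run num_tasks trivial tasks on 2 workers and collect the PIDs used."""
    pool = mp.Pool(processes=2, maxtasksperchild=maxtasksperchild)
    try:
        results = [pool.apply_async(worker_pid, (i,)) for i in range(num_tasks)]
        return set(r.get(30) for r in results)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    # No limit: the same workers serve every task, so at most 2 distinct PIDs.
    print(len(distinct_worker_pids(40)))
    # Limit of 5 tasks per worker: workers are replaced along the way,
    # so more than 2 distinct PIDs are expected.
    print(len(distinct_worker_pids(40, maxtasksperchild=5)))
```

Each apply_async call counts as one task toward maxtasksperchild, so with a limit of 5 and 40 tasks at least 8 worker processes are needed over the run.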

I am putting my full solution here so it can be of use to others!

import multiprocessing as mp
import time

def calculate(num):
    l = [i*i for i in range(num)]   # squares of 0..num-1
    s = sum(l)
    del l       # delete the list explicitly (optional)
    return s

if __name__ == "__main__":

    # fix is in the following line #
    pool = mp.Pool(processes=2,maxtasksperchild=1000)

    time.sleep(5)
    print "launching calculation"
    num_tasks = 1000
    tasks =  [pool.apply_async(calculate,(i,)) for i in range(num_tasks)]
    for f in tasks:    
        print f.get(5)
    print "calculation finished"
    time.sleep(10)
    print "closing  pool"
    pool.close()
    print "closed pool"
    print "joining pool"
    pool.join()
    print "joined pool"
    time.sleep(5)
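
On the first point in the question (forcing garbage collection): the standard gc module has gc.collect(), which runs a full collection on demand. The sketch below is only an illustration under stated assumptions; Node, make_cycle and collect_cycles are hypothetical names. As far as I understand CPython, this only matters for objects caught in reference cycles. The list in calculate is released by reference counting as soon as the function returns, with or without del, so calling gc.collect() in the workers may well make no difference to the numbers in the table.

```python
import gc


class Node(object):
    """Tiny object that can point at another Node (hypothetical helper)."""

    def __init__(self):
        self.other = None


def make_cycle():
    # Two objects referencing each other: reference counting alone
    # cannot free them once the local names go away.
    a, b = Node(), Node()
    a.other, b.other = b, a


def collect_cycles(num_cycles):
    """Create num_cycles reference cycles, then force a collection.

    Returns the number of unreachable objects that gc.collect() reports.
    """
    was_enabled = gc.isenabled()
    gc.disable()          # keep the automatic collector out of the way
    try:
        gc.collect()      # clear out any garbage that already exists
        for _ in range(num_cycles):
            make_cycle()
        return gc.collect()
    finally:
        if was_enabled:
            gc.enable()


if __name__ == "__main__":
    # A positive count means the forced collection found cyclic garbage.
    print(collect_cycles(100) > 0)
```

Even when gc.collect() does free objects, the interpreter does not necessarily hand that memory back to the operating system, which is another reason the maxtasksperchild restart is the more dependable option here.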