Python/Django polling of database has memory leak

Posted 2020-05-21 05:04

Question:

I've got a Python script that uses Django for database access and memcache, but it runs as a standalone daemon (i.e. not responding to webserver requests). The daemon checks a Django model Requisition for objects with status=STATUS_NEW, then marks them STATUS_WORKING and puts them into a queue.

A number of processes (created using the multiprocessing package) pull items out of the Queue and do work on the Requisition with the pr.id that was passed to the Queue. I believe the memory leak is probably in the following code. It could be in the 'Worker' code on the other side of the Queue, but this is unlikely because the memory size grows even when no Requisitions are coming up, i.e. when the workers are all blocking on Queue.get().

import time
from multiprocessing import Queue

from django.conf import settings
from requisitions.models import Requisition # our Django model

queue = Queue()  # shared with the worker processes

while True:
    # Wait for "N"ew requisitions, then pop them into the queue.
    for pr in Requisition.objects.filter(status=Requisition.STATUS_NEW):
        pr.set_status(pr.STATUS_WORKING)
        pr.save()
        queue.put(pr.id)

    time.sleep(settings.DAEMON_POLL_WAIT)

Where settings.DAEMON_POLL_WAIT=0.01.

If I leave this running for a while (i.e. a couple of days), the Python process grows without bound and the system eventually runs out of memory.

What's going on here (or how can I find out), and more importantly - how can you run a daemon that does this?
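On the "how can I find out" part, one option is the standard library's tracemalloc module, which can compare two heap snapshots and report which source lines allocated the memory that was not released in between. The sketch below is only an illustration: simulated_poll_cycle and top_growth are hypothetical names, and the growing list stands in for whatever is accumulating in the real daemon.

```python
import tracemalloc


def top_growth(before, after, limit=5):
    """Return up to `limit` source lines whose allocated size grew."""
    stats = after.compare_to(before, "lineno")
    return [stat for stat in stats if stat.size_diff > 0][:limit]


def simulated_poll_cycle(log):
    # Hypothetical stand-in for one pass of the polling loop:
    # each pass leaves one more string behind in a long-lived list.
    log.append(f"SELECT ... FROM requisition WHERE pass = {len(log)}")


if __name__ == "__main__":
    tracemalloc.start()
    leaky_log = []
    before = tracemalloc.take_snapshot()
    for _ in range(10_000):
        simulated_poll_cycle(leaky_log)
    after = tracemalloc.take_snapshot()
    # Each entry names a file and line number plus the size difference.
    for stat in top_growth(before, after):
        print(stat)
```

In the real daemon the two snapshots would be taken some minutes or hours apart, and the lines at the top of the report are the first places to look.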

My first thought is to change the dynamic of the function, notably by putting the check for new Requisition objects into a django.core.cache cache, i.e.

from django.core.cache import cache

def process_new_requisitions(queue):
    for pr in Requisition.objects.filter(status=Requisition.STATUS_NEW):
        pr.set_status(pr.STATUS_WORKING)
        pr.save()
        queue.put(pr.id)

while True:
    time.sleep(settings.DAEMON_POLL_WAIT)
    if cache.get('new_requisitions'):
        # Possible race condition
        # Delete only the flag; cache.clear() would wipe every key in the cache.
        cache.delete('new_requisitions')
        process_new_requisitions(queue)

The process that's creating Requisitions with status=STATUS_NEW can do a cache.set('new_requisitions', 1) (or alternatively we could catch a signal or Requisition.save() event where a new Requisition is being created and then set the flag in the cache from there).

However, I'm not sure that the solution I've proposed here addresses the memory issue (which is probably related to garbage collection, so the scoping provided by the process_new_requisitions function may solve the problem).

I'm grateful for any thoughts and feedback.

Answer 1:

You need to regularly reset a list of queries that Django keeps for debugging purposes. Normally it is cleared after every request, but since your application is not request based, you need to do this manually:

from django import db

db.reset_queries()

See also:

  • "Debugging Django memory leak with TrackRefs and Guppy" by Mikko Ohtamaa:

    Django keeps track of all queries for debugging purposes (connection.queries). This list is reseted at the end of HTTP request. But in standalone mode, there are no requests. So you need to manually reset to queries list after each working cycle

  • "Why is Django leaking memory?" in the Django FAQ: it covers both setting DEBUG to False, which is always important, and clearing the list of queries using db.reset_queries(), which matters in applications like yours.
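To show where the reset could sit in a polling daemon, here is a minimal sketch that runs a cleanup hook after every cycle, even when the cycle raises. The names poll_cycle, run_daemon and max_cycles are hypothetical; the hooks are passed in as plain callables so the structure can be tried without a Django project.

```python
import time


def poll_cycle(fetch_new_ids, enqueue, cleanup):
    """Run one polling pass and always call the cleanup hook afterwards."""
    try:
        count = 0
        for item_id in fetch_new_ids():
            enqueue(item_id)
            count += 1
        return count
    finally:
        # In the Django daemon this hook would be django.db.reset_queries.
        cleanup()


def run_daemon(fetch_new_ids, enqueue, cleanup, wait_seconds, max_cycles=None):
    """Poll repeatedly; max_cycles=None loops forever, as a daemon would."""
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        poll_cycle(fetch_new_ids, enqueue, cleanup)
        cycles += 1
        time.sleep(wait_seconds)
    return cycles
```

Assuming the question's setup, the call might look like run_daemon(fetch_new_ids, queue.put, db.reset_queries, settings.DAEMON_POLL_WAIT), where fetch_new_ids is a function you would write to mark new Requisitions as working and return their ids.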



Answer 2:

Does the settings.py file for the daemon process have DEBUG = True? If so, Django keeps in memory a record of all the SQL it has run so far, which can lead to a memory leak.



Answer 3:

I had a lot of data crunching to do, so, my solution to this issue was using multiprocessing, and using pools to counteract whatever memory bloat was happening.

To keep it simple, I just defined some "global" (top-level, whatever the term is in Python) functions instead of trying to make things pickle-able.

Here it is in abstract form:

import multiprocessing as mp

WORKERS = 16 # I had 7 cores, allocated 16 because processing was I/O bound

# this is a global function
def worker(params):
  # do stuff
  return something_for_the_callback_to_analyze

# this is a global function
def worker_callback(worker_return_value):
  # report stuff, or pass
  pass

# My multiprocess_launch was inside of a class
def multiprocess_launcher(params):
  # somehow define a collection
  while collection:
    # Take a slice of at most WORKERS items
    pool_sub_batch = []
    for _ in range(WORKERS):
      if collection: # as long as there's still something in the collection
        pool_sub_batch.append(collection.pop())
    # Start a fresh pool sized to the slice; its processes exit after join(),
    # which hands their memory back to the operating system
    pool = mp.Pool(processes=len(pool_sub_batch))
    for sub_batch in pool_sub_batch:
      # args must be a tuple, hence the trailing comma
      pool.apply_async(worker, args=(sub_batch,), callback=worker_callback)
    pool.close()
    pool.join()
    # Loop, more slices
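A related option, not the approach used above, is to let a single pool recycle its own workers: multiprocessing.Pool accepts a maxtasksperchild argument, and a worker process is replaced with a fresh one after it has completed that many tasks. The sketch below is a minimal illustration; run_with_recycled_workers and the doubling worker are hypothetical placeholders.

```python
import multiprocessing as mp
import os


def worker(item):
    # Placeholder work; the pid is returned so the caller can see
    # that more than `workers` distinct processes handled the items.
    return item * 2, os.getpid()


def run_with_recycled_workers(items, workers=4, tasks_per_child=10):
    """Map `worker` over items, replacing each process after a few tasks."""
    with mp.Pool(processes=workers, maxtasksperchild=tasks_per_child) as pool:
        # chunksize=1 makes every item count as one task for recycling.
        return pool.map(worker, items, chunksize=1)


if __name__ == "__main__":
    results = run_with_recycled_workers(range(100))
    pids = {pid for _, pid in results}
    print(f"{len(results)} results from {len(pids)} worker processes")
```

This keeps one long-lived pool instead of creating a pool per slice, while still bounding how much memory any single worker can accumulate.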


Answer 4:

Apart from the db.reset_queries() and DEBUG = False tricks, here is another approach: spawn another process that performs the Django query and feeds the queue. That process works in its own memory space, and when it exits after finishing its task, its memory is returned to the system.

I believe that sometimes (if not always) you inevitably have to manage memory issues yourself in a long-running process that performs heavy Django transactions.
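A minimal sketch of that idea, using multiprocessing.Process, could look like the following. The names feed_queue, run_cycle_in_child and fake_fetch_new_ids are hypothetical, and the fetch function stands in for the Django query. In a real Django setup the child should also work with its own database connection and not one inherited from the parent.

```python
import multiprocessing as mp


def feed_queue(queue, fetch_new_ids):
    """Runs in the child: fetch the new ids and put them on the queue."""
    for item_id in fetch_new_ids():
        queue.put(item_id)


def run_cycle_in_child(queue, fetch_new_ids, timeout=30):
    """Run one polling pass in a short-lived process and return its exit code."""
    child = mp.Process(target=feed_queue, args=(queue, fetch_new_ids))
    child.start()
    # Workers are assumed to be draining the queue meanwhile; a child that
    # has queued a lot of unread data cannot finish exiting until it is read.
    child.join(timeout)
    if child.is_alive():
        child.terminate()
        child.join()
    return child.exitcode


def fake_fetch_new_ids():
    # Hypothetical stand-in for the Requisition query.
    return [1, 2, 3]


if __name__ == "__main__":
    queue = mp.Queue()
    exit_code = run_cycle_in_child(queue, fake_fetch_new_ids)
    print(exit_code, [queue.get(timeout=5) for _ in range(3)])
```

Starting a process per cycle has a cost, so with a poll interval as short as 0.01 seconds it would make more sense to let each child run many cycles before it is replaced.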