Python Queues memory leaks when called inside thre

2019-04-11 14:04发布

I have python TCP client and need to send media(.mpg) file in a loop to a 'C' TCP server.

I have following code, where in separate thread I am reading the 10K blocks of file and sending it and doing it all over again in loop, I think it is because of my implementation of thread module, or tcp send. I am using Queues to print the logs on my GUI ( Tkinter ) but after some times it goes out of memory..

UPDATE 1 - Added more code as requested

Thread class "Sendmpgthread" used to create thread to send data

.
. 
def __init__ ( self, otherparams,MainGUI):
    .
    .
    self.MainGUI = MainGUI
    self.lock = threading.Lock()
    Thread.__init__(self)

#This is the one causing leak, this is called inside loop
def pushlog(self,msg):
    self.MainGUI.queuelog.put(msg)

def send(self, mysocket, block):
    size = len(block)
    pos = 0;
    while size > 0:
        try:
            curpos = mysocket.send(block[pos:])
        except socket.timeout, msg:
            if self.over:
                 self.pushlog(Exit Send)
                return False
        except socket.error, msg:
            print 'Exception'     
            return False  
        pos = pos + curpos
        size = size - curpos
    return True

def run(self):
    media_file = None
    mysocket = None 

    try:
        mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        mysocket.connect((self.ip, string.atoi(self.port)))
        media_file = open(self.file, 'rb') 

        while not self.over:
            chunk = media_file.read(10000)
            if not chunk:   # EOF Reset it
                print 'resetting stream'
                media_file.seek(0, 0)
                continue
            if not self.send(mysocket, chunk): # If some error or thread is killed 
                break;

            #disabling this solves the issue
            self.pushlog('print how much data sent')       

    except socket.error, msg:
        print 'print exception'
    except Exception, msg:
        print 'print exception'

    try:
        if media_file is not None:
            media_file.close()
            media_file = None            
        if mysocket is not None:
            mysocket.close()
            mysocket = None
    finally:
            print 'some cleaning'   

def kill(self):
    self.over = True

I figured out that it is because of wrong implementation of Queue as commenting that piece resolves the issue

UPDATE 2 - MainGUI class which is called from above Thread class

class MainGUI(Frame):
    def __init__(self, other args):
       #some code
       .
       .
        #from the above thread class used to send data
        self.send_mpg_status = Sendmpgthread(params)
        self.send_mpg_status.start()     
        self.after(100, self.updatelog)
        self.queuelog = Queue.Queue()

    def updatelog(self):
       try:
           msg = self.queuelog.get_nowait() 

           while msg is not None:
               self.printlog(msg)
               msg = self.queuelog.get_nowait() 
        except Queue.Empty:
           pass

        if self.send_mpg_status: # only continue when sending   
            self.after(100, self.updatelog)

    def printlog(self,msg):
        #print in GUI

4条回答
一纸荒年 Trace。
2楼-- · 2019-04-11 14:17

I can't see anything obviously wrong with your code snippet.

To reduce memory usage a bit under Python 2.7, I'd use buffer(block, pos) instead of block[pos:]. Also I'd use mysocket.sendall(block) instead of your send method.

If the ideas above don't solve your problem, then the bug is most probably elsewhere in your code. Could you please post the shortest possible version of the full Python script which still grows out-of-memory (http://sscce.org/)? That increases your change of getting useful help.

查看更多
疯言疯语
3楼-- · 2019-04-11 14:30

Out of memory errors are indicative of data being generated but not consumed or released. Looking through your code I would guess these two areas:

  • Messages are being pushed onto a Queue.Queue() instance in the pushlog method. Are they being consumed?
  • The MainGui printlog method may be writing text somewhere. eg. Is it continually writing to some kind of GUI widget without any pruning of messages?

From the code you've posted, here's what I would try:

  1. Put a print statement in updatelog. If this is not being continually called for some reason such as a failed after() call, then the queuelog will continue to grow without bound.
  2. If updatelog is continually being called, then turn your focus to printlog. Comment the contents of this function to see if out of memory errors still occur. If they don't, then something in printlog may be holding on to the logged data, you'll need to dig deeper to find out what.

Apart from this, the code could be cleaned up a bit. self.queuelog is not created until after the thread is started which gives rise to a race condition where the thread may try to write into the queue before it has been created. Creation of queuelog should be moved to somewhere before the thread is started.

updatelog could also be refactored to remove redundancy:

def updatelog(self):
       try:
           while True:
               msg = self.queuelog.get_nowait() 
               self.printlog(msg)
        except Queue.Empty:
           pass

And I assume the the kill function is called from the GUI thread. To avoid thread race conditions, the self.over should be a thread safe variable such as a threading.Event object.

def __init__(...):
    self.over = threading.Event()

def kill(self):
    self.over.set()
查看更多
走好不送
4楼-- · 2019-04-11 14:35

There is no data piling up in your TCP sending loop.

Memory error is probably caused by logging queue, as you have not posted complete code try using following class for logging:

from threading import Thread, Event, Lock
from time import sleep, time as now


class LogRecord(object):
    __slots__ = ["txt", "params"]
    def __init__(self, txt, params):
        self.txt, self.params = txt, params

class AsyncLog(Thread):
    DEBUGGING_EMULATE_SLOW_IO = True

    def __init__(self, queue_max_size=15, queue_min_size=5):
        Thread.__init__(self)
        self.queue_max_size, self.queue_min_size = queue_max_size, queue_min_size
        self._queuelock = Lock()
        self._queue = []            # protected by _queuelock
        self._discarded_count = 0   # protected by _queuelock
        self._pushed_event = Event()
        self.setDaemon(True)
        self.start()

    def log(self, message, **params):
        with self._queuelock:
            self._queue.append(LogRecord(message, params))
            if len(self._queue) > self.queue_max_size:
                # empty the queue:
                self._discarded_count += len(self._queue) - self.queue_min_size
                del self._queue[self.queue_min_size:] # empty the queue instead of creating new list (= [])
            self._pushed_event.set()

    def run(self):
        while 1: # no reason for exit condition here
            logs, discarded_count = None, 0
            with self._queuelock:
                if len(self._queue) > 0:
                    # select buffered messages for printing, releasing lock ASAP
                    logs = self._queue[:]
                    del self._queue[:]
                    self._pushed_event.clear()
                    discarded_count = self._discarded_count
                    self._discarded_count = 0
            if not logs:
                self._pushed_event.wait()
                self._pushed_event.clear()
                continue
            else:
                # print logs
                if discarded_count:
                    print ".. {0} log records missing ..".format(discarded_count)
                for log_record in logs:
                    self.write_line(log_record)
                if self.DEBUGGING_EMULATE_SLOW_IO:
                    sleep(0.5)

    def write_line(self, log_record):
        print log_record.txt, " ".join(["{0}={1}".format(name, value) for name, value in log_record.params.items()])



if __name__ == "__main__":
    class MainGUI:
        def __init__(self):
            self._async_log = AsyncLog()
            self.log = self._async_log.log # stored as bound method

        def do_this_test(self):
            print "I am about to log 100 times per sec, while text output frequency is 2Hz (twice per second)"

            def log_100_records_in_one_second(itteration_index):
                for i in xrange(100):
                    self.log("something happened", timestamp=now(), session=3.1415, itteration=itteration_index)
                    sleep(0.01)

            for iter_index in range(3):
                log_100_records_in_one_second(iter_index)

    test = MainGUI()
    test.do_this_test()

I have noticed that you do not sleep() anywhere in the sending loop, this means data is read as fast as it can and is sent as fast as it can. Note that this is not desirable behavior when playing media files - container time-stamps are there to dictate data-rate.

查看更多
啃猪蹄的小仙女
5楼-- · 2019-04-11 14:38

Since printlog is adding to a tkinter text control, the memory occupied by that control will grow with each message (it has to store all the log messages in order to display them).

Unless storing all the logs is critical, a common solution is to limit the maximum number of log lines displayed.

A naive implementation is to eliminate extra lines from the begining after the control reaches a maximum number of messages. Add a function to get the number of lines in the control and then, in printlog something similar to:

while getnumlines(self.edit) > self.maxloglines:
    self.edit.delete('1.0', '1.end')

(above code not tested)

update: some general guidelines

Keep in mind that what might look like a memory leak does not always mean that a function is wrong, or that the memory is no longer accessible. Many times there is missing cleanup code for a container that is accumulating elements.

A basic general approach for this kind of problems:

  • form an opinion on what part of the code might be causing the problem
  • check it by commenting that code out (or keep commenting code until you find a candidate)
  • look for containers in the responsible code, add code to print their size
  • decide what elements can be safely removed from that container, and when to do it
  • test the result
查看更多
登录 后发表回答