Python threading with queue: how to avoid to use j

Posted 2020-05-07 09:34

I have a scenario with 2 threads:

  1. a thread that waits for messages from a socket (the socket is embedded in a C library; the blocking call is "Barra.ricevi") and then puts an element on a queue

  2. a thread that waits to get an element from the queue and then does something with it

Sample code

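import Barra
import Queue
import threading

# Unbounded queue shared by the producer thread and the main thread
posQu = Queue.Queue(maxsize=0)

def threadCAN():
    # Producer: block on the C library call, then enqueue each valid message
    while True:
        canMsg = Barra.ricevi("can0")
        if canMsg[0] == 'ERR':
            print (canMsg)
        else:
            print ("Enqueued message"), canMsg
            posQu.put(canMsg)

thCan = threading.Thread(target=threadCAN)
thCan.daemon = True
thCan.start()

# Consumer: the main thread blocks here until an item is available
while True:
    posMsg = posQu.get()
    print ("Messagge from the queue"), posMsg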

The result is that every time a new message arrives from the socket, a new element is added to the queue, BUT the main thread that should get items from the queue is never woken up.

The output is as follows:

Enqueued message

Enqueued message

Enqueued message

Enqueued message

I expected to have:

Enqueued message

Messagge from the queue

Enqueued message

Messagge from the queue

The only way to solve this issue seems to be to add the line:

posQu.join()

at the end of the thread waiting for messages from the socket, and the line:

posQu.task_done()

at the end of the main thread.

In this case, after a new message has been received from the socket, the thread blocks, waiting for the main thread to process the enqueued item.

Unfortunately, this isn't the desired behavior, since I would like a thread that is always ready to get messages from the socket and does not wait for another thread to complete a job.
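The blocking described here is how join() and task_done() are meant to interact. The following is only a minimal sketch of that interaction, written for Python 3 (module queue rather than Queue) and using a single hard-coded item in place of a real message from Barra.ricevi; the function name run_join_demo is hypothetical.

```python
import queue
import threading

def run_join_demo():
    """Show that Queue.join() returns only after task_done() is called."""
    q = queue.Queue()
    events = []

    def consumer():
        item = q.get()
        events.append("consumer got " + item)
        q.task_done()  # marks the item as fully processed

    worker = threading.Thread(target=consumer, daemon=True)
    q.put("msg")  # stand-in for a message received from the socket
    worker.start()

    q.join()  # blocks until every item put so far has had task_done() called
    events.append("producer resumed")
    worker.join(timeout=2)
    return events

print(run_join_demo())
```

Because the producer cannot get past join() until the consumer has finished the item, it cannot go back to listening on the socket in the meantime, which matches the behavior described above.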

What am I doing wrong? Thanks

Andrew (Italy)

1 answer
祖国的老花朵
Reply #2 · 2020-05-07 10:00

This is likely because Barra does not release the global interpreter lock (GIL) while Barra.ricevi is blocking. You may want to check this, though.

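The GIL ensures that only one thread can execute Python bytecode at any one time (limiting the usefulness of threads on a multi-processor system). In Python 2, which this code targets (it imports Queue), the interpreter considers switching threads every 100 "ticks", a tick loosely mapping to a bytecode instruction. Python 3.2 and later use a time-based switch interval (5 ms by default) instead.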

In your producer thread, not much happens outside of the C-library call. This means the producer thread will get to call Barra.ricevi a great many times before the GIL switches to another thread.
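The switching granularity mentioned above can be inspected from Python itself. The helper below is only a sketch (the name describe_switch_setting is made up for illustration): it reports the time-based interval on Python 3.2+ and falls back to the tick count on Python 2.

```python
import sys

def describe_switch_setting():
    """Return (unit, value) for the interpreter's thread-switch setting."""
    if hasattr(sys, "getswitchinterval"):
        # Python 3.2+: time-based interval, expressed in seconds
        return ("seconds", sys.getswitchinterval())
    # Python 2: number of bytecode "ticks" between switch checks
    return ("ticks", sys.getcheckinterval())

unit, value = describe_switch_setting()
print("Thread switch setting: {0} {1}".format(value, unit))
```

Neither setting helps while a C function holds the GIL for the whole duration of a blocking call, since the interpreter only gets a chance to switch between bytecode instructions.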

Possible solutions, in order of increasing complexity:

  • Call time.sleep(0) after adding an item to the queue. This yields the thread so that another thread can run.
  • Use sys.setcheckinterval() to lower the number of "ticks" executed before switching threads (Python 2; Python 3.2+ provides sys.setswitchinterval() instead). This comes at the cost of making the program much more computationally expensive.
  • Use multiprocessing rather than threading. This includes using multiprocessing.Queue instead of Queue.Queue.
  • Modify Barra so that it releases the GIL when its functions are called. In a C extension this is typically done by wrapping the blocking call in the Py_BEGIN_ALLOW_THREADS / Py_END_ALLOW_THREADS macros.
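As a rough sketch of the first option, the producer below calls time.sleep(0) right after each put(). It is written for Python 3 (module queue), and fake_ricevi is a hypothetical stand-in for Barra.ricevi so that the example runs without the C library; the loop is also bounded by a count, which the original code does not do.

```python
import queue
import threading
import time

def fake_ricevi(channel):
    # Hypothetical stand-in for Barra.ricevi: returns a tuple whose
    # first element is a status string, as the original code assumes.
    return ("OK", channel, time.time())

def producer(q, receive, count):
    for _ in range(count):
        msg = receive("can0")
        if msg[0] == "ERR":
            print(msg)
        else:
            q.put(msg)
            time.sleep(0)  # release the GIL so the consumer can run

def run_sleep_demo(count=5):
    q = queue.Queue()
    worker = threading.Thread(target=producer,
                              args=(q, fake_ricevi, count),
                              daemon=True)
    worker.start()
    # Consumer side: collect exactly `count` messages from the queue
    received = [q.get(timeout=2) for _ in range(count)]
    worker.join(timeout=2)
    return received

for message in run_sleep_demo(3):
    print("Message from the queue", message)
```

Whether this is enough for the real program depends on how long Barra.ricevi holds the GIL; if the call blocks for long periods without releasing it, the consumer will still be starved during each call.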

Here is an example using multiprocessing. Be aware that when using multiprocessing, your processes no longer have an implied shared state. See the multiprocessing documentation for how to pass information between processes.

import Barra
import multiprocessing

def threadCAN(posQu):
    # Producer: runs in its own process, so it has its own GIL
    while True:
        canMsg = Barra.ricevi("can0")
        if canMsg[0] == 'ERR':
            print(canMsg)
        else:
            print("Enqueued message", canMsg)
            posQu.put(canMsg)

if __name__ == "__main__":
    # The queue is passed to the child explicitly; nothing is shared implicitly
    posQu = multiprocessing.Queue(maxsize=0)
    procCan = multiprocessing.Process(target=threadCAN, args=(posQu,))
    procCan.daemon = True
    procCan.start()

    # Consumer: the main process blocks until an item is available
    while True:
        posMsg = posQu.get()
        print("Messagge from the queue", posMsg)