python socket.connect timeout error in multithreading

Posted 2019-03-27 10:37

Question:

As shown below, I'd like to communicate with many PCs in a specific IP range.

My PC ---+------> Client A PC
         +------> Client B PC
         +------> Client C PC
         .................
         +------> Client Z PC

Because there are too many clients to contact one by one, I tried multi-threading, but socket.connect() continuously produces a time-out error. If I do the same thing in a single thread, there's no problem.

I googled and found the question below:

Python Interpreter blocks Multithreaded DNS requests?

which says that on some platforms the socket module can be thread-unsafe.
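The workaround suggested in that discussion, as far as I understand it, is to do all name resolution in the main thread before any workers start, so the resolver is never called from several threads at once. A minimal sketch of that idea (the host names below are made up for illustration; my real scan already passes numeric IPs, so name resolution may not even be involved in my case):

import socket

# Hypothetical host names -- illustration only.
HOSTS = ["client-a.example.com", "client-b.example.com", "client-c.example.com"]

def resolve_all(hosts):
    # Resolve every name in the main thread, before any worker threads or
    # processes exist, so the (possibly non-thread-safe) resolver is never
    # used concurrently. Workers are then handed plain IP strings.
    resolved = {}
    for host in hosts:
        try:
            resolved[host] = socket.gethostbyname(host)
        except socket.error:
            resolved[host] = None
    return resolved

if __name__ == "__main__":
    for host, ip in resolve_all(HOSTS).items():
        print("%s -> %s" % (host, ip))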

So I changed my code to multiprocessing. However, it still produces the same error.

In the following code sample, test_single() finishes normally, while test_mp() and test_mt() both produce time-out errors.

Have you ever experienced such abnormal behavior? The test environment is Windows XP SP3 with Python 2.5.4; I also tried Python 2.6.6 and 2.7.0 and got the same error.

import multiprocessing
import Queue
import socket
import threading

PROCESS_NUM = 5
PORT = 8888

# Try a TCP connect to (ip, PORT) with a 5-second timeout and report
# whether it succeeded.
def search_proc(ip):
    try:
        csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        csock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        csock.settimeout(5.0)
        csock.connect((ip, PORT))
        csock.shutdown(socket.SHUT_RDWR)
        csock.close()
        return ip, "ok"
    except socket.error, msg:
        return ip, "fail", msg

# Fan the scan out over a pool of worker processes.
def mp_connect(ip_range):
    pool = multiprocessing.Pool( PROCESS_NUM )
    for output in pool.imap_unordered(search_proc, ip_range):
        print output

def test_mp():
    ip_range = []
    for i in range(256):
        ip_range.append("192.168.123.%d"%(i,))

    mp_connect(ip_range)

# Same scan with daemon worker threads pulling IPs from a queue.
def test_mt():
    def search_thread(ip_queue):
        while True:
            ip = ip_queue.get()
            print search_proc(ip)
            ip_queue.task_done()
    ip_queue = Queue.Queue()

    for i in range(256):
        ip_queue.put("192.168.123.%d"%(i,))

    for i in range(PROCESS_NUM):
        th = threading.Thread(target=search_thread, args=(ip_queue,))
        th.setDaemon(True)
        th.start()

    ip_queue.join()

def test_single():
    # Sequential scan; this version finishes without errors.
    for i in range(256):
        print search_proc("192.168.123.%d"%(i,))

if __name__ == "__main__":
    multiprocessing.freeze_support()
    test_mp()
    #test_single()
    #test_mt()

Answer 1:

David Beazley has done some great research around the Python GIL and how it affects I/O and multithreading. You can find information about his research here and here.
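As a rough illustration of the kind of experiment that research is built on (a minimal sketch, not code taken from Beazley's materials), a pure-Python, CPU-bound loop gets no faster, and on multi-core machines is often slower, when the same work is split across two threads, because the GIL lets only one thread execute Python bytecode at a time:

import threading
import time

COUNT = 10000000  # adjust for your machine

def countdown(n):
    # Pure-Python, CPU-bound loop: the GIL is held for the whole run.
    while n > 0:
        n -= 1

# Same total work done twice in a single thread.
start = time.time()
countdown(COUNT)
countdown(COUNT)
print("sequential:  %.2f s" % (time.time() - start))

# Same total work split across two threads.
start = time.time()
t1 = threading.Thread(target=countdown, args=(COUNT,))
t2 = threading.Thread(target=countdown, args=(COUNT,))
t1.start()
t2.start()
t1.join()
t2.join()
print("two threads: %.2f s" % (time.time() - start))

Blocking calls such as socket.connect() release the GIL while they wait, so purely I/O-bound threads are a subtler case; Beazley's talks also look at how CPU-bound and I/O-bound threads interact under the GIL.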