Problem with multi-threaded Python app and sockets

Asked 2019-01-02 15:58

I'm investigating a problem with a Python app running on an Ubuntu machine with 4 GB of RAM. The tool will be used to audit servers (we prefer to roll our own tools). It uses threads to connect to many servers, and a large number of the TCP connections fail. However, if I add a delay of one second between kicking off each thread, then most connections succeed. I have used this simple script to investigate what may be happening:

#!/usr/bin/python

import sys
import socket
import threading
import time

class Scanner(threading.Thread):
    def __init__(self, host, port):
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.status = ""

    def run(self):
        self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sk.settimeout(20)
        try:
            self.sk.connect((self.host, self.port))
        except Exception, err:
            self.status = str(err)
        else:
            self.status = "connected"
        finally:
            self.sk.close()


def get_hostnames_list(filename):
    return open(filename).read().splitlines()

if __name__ == "__main__":
    hostnames_file = sys.argv[1]
    hosts_list = get_hostnames_list(hostnames_file)
    threads = []
    for host in hosts_list:
        #time.sleep(1)
        thread = Scanner(host, 443)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()
        print "Host: ", thread.host, " : ", thread.status

If I run this against, say, 300 hosts with the time.sleep(1) commented out, many of the connections fail with a timeout error, whereas they don't time out if I put the delay of one second in. I did try the app on another Linux distro running on a more powerful machine, and there weren't as many connection errors. Is it due to a kernel limitation? Is there anything I can do to get the connections to work without putting in the delay?

UPDATE

I have also tried a program that limits the number of threads available in a pool. By reducing this to 20 I can get all connects to work, but then it only checks about one host per second. So whatever I try (putting in a sleep(1) or limiting the number of concurrent threads), I don't seem to be able to check more than one host every second.
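Roughly the throttling I mean, simplified here to a semaphore around the Scanner class from the script above (a sketch, not the actual pool code):

import threading

MAX_CONCURRENT = 20   # the cap that made every connect succeed
throttle = threading.BoundedSemaphore(MAX_CONCURRENT)

class ThrottledScanner(Scanner):
    def run(self):
        # Block until fewer than MAX_CONCURRENT connects are in flight.
        throttle.acquire()
        try:
            Scanner.run(self)
        finally:
            throttle.release()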

UPDATE

I just found this question, which seems similar to what I am seeing.

UPDATE

I wonder if writing this using Twisted might help. Could anyone show what my example would look like written using Twisted?

5 Answers

一个人的天荒地老 · 2019-01-02 16:15

I wonder if writing this using Twisted might help. Could anyone show what my example would look like written using Twisted?

This variant is much faster than the code that uses gevent (shown in a later answer):

#!/usr/bin/env python
import sys
from timeit import default_timer as timer

from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.python   import log

info = log.msg

class NoopProtocol(protocol.Protocol):
    def makeConnection(self, transport):
        transport.loseConnection()

def connect(host, port, contextFactory=ssl.ClientContextFactory(), timeout=30):
    info("connecting %s" % host)
    cc = protocol.ClientCreator(reactor, NoopProtocol)
    d = cc.connectSSL(host, port, contextFactory, timeout)
    d.addCallbacks(lambda _: info("done %s" % host),
                   lambda f: info("error %s reason: %s" % (host, f.value)))
    return d

def n_at_a_time(it, n):
    """Iterate over `it` concurrently, `n` items at a time.

    `it` - an iterator creating Deferreds
    `n`  - number of concurrent iterations
    Returns a Deferred that fires on completion.
    """
    return defer.DeferredList([task.coiterate(it) for _ in xrange(n)])

def main():
    try:
        log.startLogging(sys.stderr, setStdout=False)

        info("getting hostname list")
        hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
        hosts_list = open(hosts_file).read().splitlines()

        info("spawning jobs")
        start = timer()        
        jobs = (connect(host, 443, timeout=2) for host in hosts_list)
        d = n_at_a_time(jobs, n=20) # limit number of simultaneous connections
        d.addCallback(lambda _: info("%d hosts took us %.2g seconds" % (
            len(hosts_list), timer() - start)))
        d.addBoth(lambda _: (info("the end"), reactor.stop()))
    except:
        log.err()
        reactor.stop()

if __name__=="__main__":
    reactor.callWhenRunning(main)
    reactor.run()

Here's a variant that uses twisted.internet.defer.inlineCallbacks. It requires Python 2.5 or newer, and it lets you write the asynchronous code in a synchronous (blocking) style:

#!/usr/bin/env python
import sys
from timeit import default_timer as timer

from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.python   import log

info = log.msg

class NoopProtocol(protocol.Protocol):
    def makeConnection(self, transport):
        transport.loseConnection()

@defer.inlineCallbacks
def connect(host, port, contextFactory=ssl.ClientContextFactory(), timeout=30):
    info("connecting %s" % host)
    cc = protocol.ClientCreator(reactor, NoopProtocol)
    try:
        yield cc.connectSSL(host, port, contextFactory, timeout)
    except Exception, e:
        info("error %s reason: %s" % (host, e))
    else:
        info("done %s" % host)

def n_at_a_time(it, n):
    """Iterate over `it` concurrently, `n` items at a time.

    `it` - an iterator creating Deferreds
    `n`  - number of concurrent iterations
    Returns a Deferred that fires on completion.
    """
    return defer.DeferredList([task.coiterate(it) for _ in xrange(n)])

@defer.inlineCallbacks
def main():
    try:
        log.startLogging(sys.stderr, setStdout=False)

        info("getting hostname list")
        hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
        hosts_list = open(hosts_file).read().splitlines()

        info("spawning jobs")
        start = timer()        
        jobs = (connect(host, 443, timeout=2) for host in hosts_list)
        yield n_at_a_time(jobs, n=20) # limit number of simultaneous connections
        info("%d hosts took us %.2g seconds" % (len(hosts_list), timer()-start))
        info("the end")
    except:
        log.err()
    finally:
        reactor.stop()

if __name__=="__main__":
    reactor.callWhenRunning(main)
    reactor.run()

弹指情弦暗扣 · 2019-01-02 16:17

How about a real thread pool?

#!/usr/bin/env python3

# http://code.activestate.com/recipes/577187-python-thread-pool/

from queue import Queue
from threading import Thread

class Worker(Thread):
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try: func(*args, **kargs)
            except Exception as exception: print(exception)
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads): Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()

Example:

import threadpool
pool = threadpool.ThreadPool(20) # 20 threads
pool.add_task(print, "test")
pool.wait_completion()

It's in Python 3, but it shouldn't be too hard to convert to 2.x. I wouldn't be surprised if this fixes your problem.
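Applied to the port check from the question, a sketch might look like this (check_host and hosts.txt are illustrative names, and the module above is assumed saved as threadpool.py):

import socket
import threadpool  # the ThreadPool module above, saved as threadpool.py

def check_host(host, port=443):
    # TCP connect with a timeout; print the outcome.
    sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sk.settimeout(20)
    try:
        sk.connect((host, port))
    except OSError as err:
        print("Host:", host, ":", err)
    else:
        print("Host:", host, ": connected")
    finally:
        sk.close()

pool = threadpool.ThreadPool(20)  # at most 20 connects in flight
for host in open("hosts.txt").read().splitlines():
    pool.add_task(check_host, host)
pool.wait_completion()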

像晚风撩人 · 2019-01-02 16:17

First of all, try using non-blocking sockets. Another possible cause is that you are exhausting the ephemeral port range (on Linux, the range is in /proc/sys/net/ipv4/ip_local_port_range); try widening that limit.
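For illustration, a rough sketch of the non-blocking approach using the selectors module from Python 3.4+ (all names here are illustrative; note that name resolution inside connect_ex still blocks):

import errno
import selectors
import socket
from time import monotonic

def scan(hosts, port=443, timeout=20):
    # Start every connect at once on non-blocking sockets, then poll.
    sel = selectors.DefaultSelector()
    for host in hosts:
        sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sk.setblocking(False)
        err = sk.connect_ex((host, port))  # returns an errno instead of raising
        if err in (0, errno.EINPROGRESS):
            # A non-blocking socket becomes writable once connect finishes.
            sel.register(sk, selectors.EVENT_WRITE, data=host)
        else:
            print("Host:", host, ":", errno.errorcode.get(err, err))
            sk.close()
    deadline = monotonic() + timeout
    while sel.get_map():
        remaining = deadline - monotonic()
        if remaining <= 0:
            break
        for key, _ in sel.select(remaining):
            sk, host = key.fileobj, key.data
            # SO_ERROR holds the result of the in-flight connect.
            err = sk.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            print("Host:", host, ":",
                  "connected" if err == 0 else errno.errorcode.get(err, err))
            sel.unregister(sk)
            sk.close()
    # Anything still registered after the deadline never finished connecting.
    for key in list(sel.get_map().values()):
        print("Host:", key.data, ": timed out")
        sel.unregister(key.fileobj)
        key.fileobj.close()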

旧时光的记忆 · 2019-01-02 16:25

Python 3.4 introduces a new provisional API for asynchronous I/O: the asyncio module.

This approach is similar to the Twisted-based answer:

#!/usr/bin/env python3.4
import argparse
import asyncio
import logging
import sys
from contextlib import closing

class NoopProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        transport.close()

info = logging.getLogger().info

@asyncio.coroutine
def connect(loop, semaphore, host, port=443, ssl=True, timeout=15):
    try:
        with (yield from semaphore):
            info("connecting %s" % host)
            done, pending = yield from asyncio.wait(
                [loop.create_connection(NoopProtocol, host, port, ssl=ssl)],
                loop=loop, timeout=timeout)
            if done:
                next(iter(done)).result()
    except Exception as e:
        info("error %s reason: %s" % (host, e))
    else:
        if pending:
            info("error %s reason: timeout" % (host,))
            for ft in pending:
                ft.cancel()
        else:
            info("done %s" % host)

@asyncio.coroutine
def main(loop):
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    limit, timeout, hosts = parse_cmdline()

    # connect `limit` concurrent connections
    sem = asyncio.BoundedSemaphore(limit)
    coros = [connect(loop, sem, host, timeout=timeout) for host in hosts]
    if coros:
        yield from asyncio.wait(coros, loop=loop)

if __name__=="__main__":
    with closing(asyncio.get_event_loop()) as loop:
        loop.run_until_complete(main(loop))

Like the Twisted variant, it uses a NoopProtocol that does nothing but disconnect immediately on a successful connection.

The number of concurrent connections is limited using a semaphore.

The code is coroutine-based.
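On Python 3.5 and later, the same coroutine can be written with async/await instead of @asyncio.coroutine and yield from; here is a sketch of an equivalent connect(), reusing NoopProtocol and info from above, with asyncio.wait_for standing in for the manual timeout handling:

import asyncio

async def connect(loop, semaphore, host, port=443, ssl=True, timeout=15):
    # Same behaviour as connect() above, in post-3.5 syntax.
    try:
        async with semaphore:
            info("connecting %s" % host)
            await asyncio.wait_for(
                loop.create_connection(NoopProtocol, host, port, ssl=ssl),
                timeout)
    except asyncio.TimeoutError:
        info("error %s reason: timeout" % host)
    except Exception as e:
        info("error %s reason: %s" % (host, e))
    else:
        info("done %s" % host)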

Example

To find out how many successful SSL connections we can make to the first 1000 hosts from the Alexa top-million list:

$ curl -O http://s3.amazonaws.com/alexa-static/top-1m.csv.zip
$ unzip *.zip
$ /usr/bin/time perl -nE'say $1 if /\d+,([^\s,]+)$/' top-1m.csv | head -1000 |\
    python3.4 asyncio_ssl.py - --timeout 60 |& tee asyncio.log

The result: less than half of all connections were successful. On average, it checks ~20 hosts per second. Many sites timed out after a minute. The connection also fails if the host doesn't match the hostnames in the server's certificate, which includes example.com vs. www.example.com-style mismatches.

君临天下 · 2019-01-02 16:28

You could try gevent:

from gevent.pool import Pool    
from gevent import monkey; monkey.patch_all() # patches stdlib    
import sys
import logging    
from httplib import HTTPSConnection
from timeit import default_timer as timer    
info = logging.getLogger().info

def connect(hostname):
    info("connecting %s", hostname)
    h = HTTPSConnection(hostname, timeout=2)
    try: h.connect()
    except IOError, e:
        info("error %s reason: %s", hostname, e)
    else:
        info("done %s", hostname)
    finally:
        h.close()

def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")    
    info("getting hostname list")
    hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
    hosts_list = open(hosts_file).read().splitlines()    
    info("spawning jobs")
    pool = Pool(20) # limit number of concurrent connections
    start = timer()
    for _ in pool.imap(connect, hosts_list):
        pass
    info("%d hosts took us %.2g seconds", len(hosts_list), timer() - start)

if __name__=="__main__":
    main()

It can process more than one host per second.

Output

2011-01-31 11:08:29,052 getting hostname list
2011-01-31 11:08:29,052 spawning jobs
2011-01-31 11:08:29,053 connecting www.yahoo.com
2011-01-31 11:08:29,053 connecting www.abc.com
2011-01-31 11:08:29,053 connecting www.google.com
2011-01-31 11:08:29,053 connecting stackoverflow.com
2011-01-31 11:08:29,053 connecting facebook.com
2011-01-31 11:08:29,054 connecting youtube.com
2011-01-31 11:08:29,054 connecting live.com
2011-01-31 11:08:29,054 connecting baidu.com
2011-01-31 11:08:29,054 connecting wikipedia.org
2011-01-31 11:08:29,054 connecting blogspot.com
2011-01-31 11:08:29,054 connecting qq.com
2011-01-31 11:08:29,055 connecting twitter.com
2011-01-31 11:08:29,055 connecting msn.com
2011-01-31 11:08:29,055 connecting yahoo.co.jp
2011-01-31 11:08:29,055 connecting taobao.com
2011-01-31 11:08:29,055 connecting google.co.in
2011-01-31 11:08:29,056 connecting sina.com.cn
2011-01-31 11:08:29,056 connecting amazon.com
2011-01-31 11:08:29,056 connecting google.de
2011-01-31 11:08:29,056 connecting google.com.hk
2011-01-31 11:08:29,188 done www.google.com
2011-01-31 11:08:29,189 done google.com.hk
2011-01-31 11:08:29,224 error wikipedia.org reason: [Errno 111] Connection refused
2011-01-31 11:08:29,225 done google.co.in
2011-01-31 11:08:29,227 error msn.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,228 error live.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,250 done google.de
2011-01-31 11:08:29,262 done blogspot.com
2011-01-31 11:08:29,271 error www.abc.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,465 done amazon.com
2011-01-31 11:08:29,467 error sina.com.cn reason: [Errno 111] Connection refused
2011-01-31 11:08:29,496 done www.yahoo.com
2011-01-31 11:08:29,521 done stackoverflow.com
2011-01-31 11:08:29,606 done youtube.com
2011-01-31 11:08:29,939 done twitter.com
2011-01-31 11:08:33,056 error qq.com reason: timed out
2011-01-31 11:08:33,057 error taobao.com reason: timed out
2011-01-31 11:08:33,057 error yahoo.co.jp reason: timed out
2011-01-31 11:08:34,466 done facebook.com
2011-01-31 11:08:35,056 error baidu.com reason: timed out
2011-01-31 11:08:35,057 20 hosts took us 6 seconds