How to bind some variable to thread with concurren

Posted 2019-06-12 15:32

What I want to do is something like this:

import random
import threading

class MyThread(threading.Thread):
    def __init__(self, host, port):
        threading.Thread.__init__(self)
        # self._sock = self.initsocket(host, port)
        self._id = random.randint(0, 100)

    def run(self):
        for i in range(3):
            print("current id: {}".format(self._id))

def main():
    ts = []
    for i in range(5):
        t = MyThread("localhost", 3001)
        t.start()
        ts.append(t)

    for t in ts:
        t.join()

I got this output:

current id: 10
current id: 10
current id: 13
current id: 43
current id: 13
current id: 10
current id: 83
current id: 83
current id: 83
current id: 13
current id: 98
current id: 43
current id: 98
current id: 43
current id: 98

This output is what I want. As you can see, _id differs between threads, but within a single thread the same _id is shared. (_id is just one such variable; I have many other similar ones.)

Now I want to do the same thing with multiprocessing.pool.ThreadPool:

import random
from multiprocessing.pool import ThreadPool

class MyProcessor():
    def __init__(self, host, port):
        # self._sock = self.initsocket(host, port)
        self._id = random.randint(0, 100)

    def __call__(self, i):
        print("current id: {}".format(self._id))
        return self._id * i

def main():
    with ThreadPool(5) as p:
        p.map(MyProcessor("localhost", 3001), range(15))

But now _id is shared by all threads:

current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58

I also tried the same thing with concurrent.futures.ThreadPoolExecutor:

import random
from concurrent.futures import ThreadPoolExecutor, as_completed

class MyProcessor():
    def __init__(self, host, port):
        # self.initsocket(host, port)
        self._id = random.randint(0, 100)

    def __call__(self, i):
        print("current id: {}".format(self._id))
        return self._id * i

def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        func = MyProcessor("localhost", 3001)
        futures = [executor.submit(func, i) for i in range(15)]
        for f in as_completed(futures):
            pass

The output is:

current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94

Of course, this result is not surprising, because I only call __init__ once. But what I am asking is:

How can I do the same thing with concurrent.futures.ThreadPoolExecutor and multiprocessing.pool.ThreadPool (and preferably without introducing any more global variables)?

1 answer
劳资没心,怎么记你
#2 · 2019-06-12 16:21

There are a couple of issues going on here, and I will do my best to address all of them.

In the first example you give, you have full control over all the threads that you create, so each thread gets its own ID in the initializer. The problem there is of course that you start all the threads at once, which is probably very inefficient for a large number of threads.

In both of the thread pool examples in the question, you initialize the ID once for the callable object, so of course you don't have separate IDs per thread. To get a fresh ID, generate it inside the __call__ method, which gives one ID per call (that is, per submitted task) rather than one per object:

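import collections
import random
from concurrent.futures import ThreadPoolExecutor

class MyProcessor():
    def __init__(self, host, port):
        # self.initsocket(host, port)  # as in the question; uncomment once initsocket exists
        self._host = host
        self._port = port

    def __call__(self, i):
        # A new ID is generated on every call, so nothing is shared between tasks
        id_ = random.randint(0, 100)
        print("current id: {}".format(id_))
        return id_ * i

def main():
    func = MyProcessor("localhost", 3001)
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Pass the instance (func), not the class, so that __call__ runs for each item
        collections.deque(executor.map(func, range(15)), maxlen=0)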

Notice that you can shorten the concurrent.futures.ThreadPoolExecutor example by using the map method there as well, if all you care about is the final result and not the intermediate Future objects. The deque(..., maxlen=0) call is a standard idiom for consuming an iterator.
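
As a minimal sketch of that idiom: executor.map returns an iterator that yields results in input order, and feeding it to a zero-length deque drains it without keeping anything. The square function and the calls list are hypothetical stand-ins for illustration, not part of the question's code.

```python
import collections
from concurrent.futures import ThreadPoolExecutor

# Records which inputs were processed (list.append is thread-safe in CPython)
calls = []

def square(i):
    # Hypothetical stand-in for MyProcessor.__call__
    calls.append(i)
    return i * i

def run_and_discard(n):
    with ThreadPoolExecutor(max_workers=5) as executor:
        # deque(..., maxlen=0) walks the whole iterator and stores nothing
        collections.deque(executor.map(square, range(n)), maxlen=0)

def run_and_collect(n):
    with ThreadPoolExecutor(max_workers=5) as executor:
        # list(...) also consumes the iterator, but keeps the results in input order
        return list(executor.map(square, range(n)))
```

Use run_and_discard when only the side effects matter, and run_and_collect when you need the return values.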

Given the gist you linked to in your comments, I understand why you want to have thread-local data. However, you certainly do not need a global variable to achieve that result. Here are a couple of alternatives:

  1. Just add your thread-local data to self in the initializer, and voila, it is accessible to all calls without being global:

    def __init__(self, host, port):
        self.thread_local = threading.local()
    
    def __call__(self, i):
        try:
            id_ = self.thread_local.id_
        except AttributeError:
            # First call on this thread: create the ID and store it for later calls
            id_ = self.thread_local.id_ = random.randint(0, 100)
        ...
    
  2. Use function-local data instead of thread-local data. You are using thread-local data to avoid passing your connection (in the gist) to some private functions. That is not a real need, just an aesthetic choice. You can always have def _send_data(self, conn, **kwargs) and def _recv_data(self, conn), since the only place a connection actually comes from is __call__ anyway.
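
Option 2 could look roughly like the sketch below. FakeConnection is a hypothetical stand-in for whatever connection object the gist opens, and the bodies of _send_data and _recv_data are placeholders. The only point is that the connection is created in __call__ and handed to the helpers as an argument, so no thread-local or global state is involved.

```python
import random
from concurrent.futures import ThreadPoolExecutor

class FakeConnection:
    """Hypothetical stand-in for a real socket connection."""
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.id_ = random.randint(0, 100)
        self.closed = False

    def close(self):
        self.closed = True

class MyProcessor:
    def __init__(self, host, port):
        # Only configuration is stored on self; it is safe to share between calls
        self._host = host
        self._port = port

    def _send_data(self, conn, **kwargs):
        # Placeholder: real code would write kwargs to conn
        return conn.id_, kwargs

    def _recv_data(self, conn):
        # Placeholder: real code would read from conn
        return conn.id_

    def __call__(self, i):
        # The connection is local to this call and passed down explicitly
        conn = FakeConnection(self._host, self._port)
        try:
            sent_id, _ = self._send_data(conn, value=i)
            recv_id = self._recv_data(conn)
            # Both helpers saw the same connection object
            return i, sent_id == recv_id
        finally:
            conn.close()

def main():
    func = MyProcessor("localhost", 3001)
    with ThreadPoolExecutor(max_workers=5) as executor:
        return list(executor.map(func, range(15)))
```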

While there may be situations where option #1 is a possibility, I highly recommend that you do not use it with any type of thread pool manager. A thread pool may reuse the same thread to run tasks sequentially from the queue that they are submitted to. This means that you will end up with the same connection in a task that should have opened its own. It would have been fine in your original example, where you create all your threads independently, but it may not be fine when you have multiple calls to MyProcessor on a recycled pool thread.
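
The reuse is easy to observe in a small experiment. The sketch below assumes a pool of two workers and ten tasks, and returns the thread identifier alongside the thread-local ID. The class is a simplified, hypothetical variant of MyProcessor without the socket setup.

```python
import random
import threading
from concurrent.futures import ThreadPoolExecutor

class MyProcessor:
    def __init__(self):
        self.thread_local = threading.local()

    def __call__(self, i):
        try:
            id_ = self.thread_local.id_
        except AttributeError:
            # First task on this worker thread: create and store the value
            id_ = self.thread_local.id_ = random.randint(0, 10**9)
        # Report which thread ran the task and which ID it saw
        return threading.get_ident(), id_

def main():
    func = MyProcessor()
    with ThreadPoolExecutor(max_workers=2) as executor:
        return list(executor.map(func, range(10)))

def ids_by_thread(results):
    # Group the observed IDs by the thread that produced them
    grouped = {}
    for thread_id, id_ in results:
        grouped.setdefault(thread_id, set()).add(id_)
    return grouped
```

With ten tasks and at most two worker threads, every task that lands on a given worker sees the value stored by the first task that ran there. For an ID that may be harmless, but for a connection it means later tasks inherit whatever state an earlier task left behind.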
