What I want to do is something like this:
import random
import threading

class MyThread(threading.Thread):
    def __init__(self, host, port):
        threading.Thread.__init__(self)
        # self._sock = self.initsocket(host, port)
        self._id = random.randint(0, 100)

    def run(self):
        for i in range(3):
            print("current id: {}".format(self._id))

def main():
    ts = []
    for i in range(5):
        t = MyThread("localhost", 3001)
        t.start()
        ts.append(t)
    for t in ts:
        t.join()
I got this output:
current id: 10
current id: 10
current id: 13
current id: 43
current id: 13
current id: 10
current id: 83
current id: 83
current id: 83
current id: 13
current id: 98
current id: 43
current id: 98
current id: 43
current id: 98
This output is what I want. As you can see, _id is different in different threads, but within a single thread the same _id is shared. (_id is just one of these variables; I have many other similar variables.)
Now I want to do the same thing with multiprocessing.pool.ThreadPool:

import random
from multiprocessing.pool import ThreadPool

class MyProcessor():
    def __init__(self, host, port):
        # self._sock = self.initsocket(host, port)
        self._id = random.randint(0, 100)

    def __call__(self, i):
        print("current id: {}".format(self._id))
        return self._id * i

def main():
    with ThreadPool(5) as p:
        p.map(MyProcessor("localhost", 3001), range(15))
But now _id is shared by all threads:
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
I also tried to do the same thing with concurrent.futures.ThreadPoolExecutor:

import random
from concurrent.futures import ThreadPoolExecutor, as_completed

class MyProcessor():
    def __init__(self, host, port):
        # self.initsocket(host, port)
        self._id = random.randint(0, 100)

    def __call__(self, i):
        print("current id: {}".format(self._id))
        return self._id * i

def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        func = MyProcessor("localhost", 3001)
        futures = [executor.submit(func, i) for i in range(15)]
        for f in as_completed(futures):
            pass
The output is:
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
Of course, this result is not surprising, because I call __init__ only once. What I am asking is: how can I do the same thing with concurrent.futures.ThreadPoolExecutor and multiprocessing.pool.ThreadPool (preferably without introducing any more global variables)?
There are a couple of issues going on here, and I will do my best to address all of them.
In the first example you give, you have full control over all the Thread objects that you create, so each thread gets a unique ID in the initializer. The problem there is of course that you start all the threads at once, which is probably very inefficient for a large number of threads.

In both of the thread pool examples in the question, you initialize the ID once for the callable object, so of course you don't have separate IDs per thread. The correct way to do it would be to initialize the ID per call rather than per object, by doing it in the __call__ method.

You can also shorten the concurrent.futures.ThreadPoolExecutor example by using the map method there as well, if all you care about is the final result and not the intermediate Future objects. Wrapping the result in collections.deque(..., maxlen=0) is a standard idiom for consuming an iterator.

Given the gist you linked to in your comments, I understand why you want to have thread-local data. However, you certainly do not need a global variable to achieve that result. Here are a couple of alternatives:
1. Just add your thread-local data to self in the initializer, and voila, it is accessible to all calls without being global.

2. Use function-local data instead of thread-local data. You are using thread-local data to avoid passing your connection (in the gist) to some private functions. That is not a real need, just an aesthetic choice. You can always have def _send_data(self, conn, **kwargs) and def _recv_data(self, conn), since the only place a connection actually comes from is __call__ anyway.

While there may be situations where option #1 is a possibility, I highly recommend that you do not use it with any type of thread pool manager. A thread pool may reuse the same thread to run tasks sequentially from the queue that they are submitted to. This means that you will end up with the same connection in a task that should have opened its own. It would have been fine in your original example, where you create all your threads independently, but it may not be fine when you have multiple calls to MyProcessor on a recycled pool thread.
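
To make the per-call approach and option #2 concrete, here is a minimal sketch rather than a definitive implementation. It assumes the random ID stands in for whatever per-task resource you need (such as the connection in the gist). The helper _describe is a hypothetical placeholder for private functions like _send_data and _recv_data, and storing host and port on self is my own assumption about how you would keep the shared configuration.

```python
import collections
import random
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.pool import ThreadPool


class MyProcessor:
    def __init__(self, host, port):
        # Only shared, read-only configuration lives on the instance
        # (assumption: the per-task resource is built from these later).
        self._host = host
        self._port = port

    def _describe(self, id_):
        # Hypothetical private helper: it receives the per-call value as an
        # argument instead of reading it from thread-local or global state.
        return "current id: {}".format(id_)

    def __call__(self, i):
        # Created inside the call, so every task gets its own value even
        # when the pool reuses the same worker thread.
        id_ = random.randint(0, 100)
        print(self._describe(id_))
        return id_ * i


def main():
    func = MyProcessor("localhost", 3001)

    # multiprocessing.pool.ThreadPool: map blocks until all tasks finish.
    with ThreadPool(5) as p:
        results = p.map(func, range(15))

    # concurrent.futures.ThreadPoolExecutor: map returns a lazy iterator,
    # so deque(..., maxlen=0) drains it and discards the results.
    with ThreadPoolExecutor(max_workers=5) as executor:
        collections.deque(executor.map(func, range(15)), maxlen=0)

    return results


if __name__ == "__main__":
    main()
```

Note that the IDs printed here vary per task, not per worker thread, which is the point: nothing is carried over from one task to the next on a recycled thread.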