A few days ago I asked a question on SO about designing a paradigm for structuring multiple HTTP requests.
Here's the scenario. I would like to have a multi-producer, multi-consumer system. My producers crawl and scrape a few sites and add the links they find to a queue. Since I'll be crawling multiple sites, I would like to have multiple producers/crawlers.
The consumers/workers feed off this queue, make TCP/UDP requests to these links and save the results to my Django DB. I would also like to have multiple workers, as each queue item is totally independent of the others.
People suggested that I use a coroutine library for this, i.e. Gevent or Eventlet. Having never worked with coroutines, I read that even though the programming paradigm is similar to the threaded one, only one thread is actively executing at a time; when a blocking call occurs, such as an I/O call, the stacks are switched in memory and another green thread takes over until it in turn hits a blocking I/O call. Hopefully I got this right? Here's the code from one of my SO posts:
import gevent
from gevent.queue import JoinableQueue
import time
import random

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()

def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)

for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()
This works well because the sleep calls are blocking calls, and when a sleep event occurs another green thread takes over. This is a lot faster than sequential execution.
As you can see, I don't have any code in my program that purposely yields the execution of one thread to another thread. I fail to see how this fits into the scenario above, as I would like to have all the threads executing simultaneously.
All works fine, but I feel that the throughput I've achieved using Gevent/Eventlet is higher than that of the original sequentially running program but drastically lower than what could be achieved using real threading.
If I were to re-implement my program using threading mechanisms, each of my producers and consumers could work simultaneously, without the need to swap stacks in and out as coroutines do.
Should this be re-implemented using threading? Is my design wrong? I've failed to see the real benefits of using coroutines.
Maybe my concepts are a little muddy, but this is what I've assimilated. Any help or clarification of my paradigm and concepts would be great.
Thanks
"As you can see, I don't have any code in my program that purposely yields the execution of one thread to another thread. I fail to see how this fits into the scenario above, as I would like to have all the threads executing simultaneously."
There is a single OS thread but several greenlets. In your case gevent.sleep() allows workers to execute concurrently. Blocking IO calls such as urllib2.urlopen(url).read() do the same if you use urllib2 patched to work with gevent (by calling gevent.monkey.patch_*()).
See also A Curious Course on Coroutines and Concurrency to understand how code can run concurrently in a single-threaded environment.
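To make the single-thread point concrete, here is a minimal sketch of cooperative scheduling built only on plain generators. It is not how gevent is implemented; `task` and `run_round_robin` are hypothetical names, and each `yield` merely stands in for a point where a greenlet would block on I/O and let another one run.

```python
from collections import deque

def task(name, steps, log):
    """A cooperative task: each `yield` marks a point where it would block."""
    for i in range(steps):
        log.append("%s step %d" % (name, i))
        yield  # hand control back to the scheduler

def run_round_robin(tasks):
    """Run generator-based tasks on one thread, switching at every yield."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # resume the task until its next yield
        except StopIteration:
            continue               # task finished; do not reschedule it
        ready.append(current)      # still has work; put it at the back

log = []
run_round_robin([task("A", 2, log), task("B", 3, log)])
for line in log:
    print(line)
```

The two tasks interleave their steps even though nothing ever runs in parallel, which is roughly the effect gevent.sleep() has in your script.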
To compare throughput between gevent, threading and multiprocessing, you could write code that is compatible with all three approaches:
#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes

if concurrency_impl == 'gevent':
    import gevent.monkey; gevent.monkey.patch_all()

import logging
import time
import random
from itertools import count, islice

info = logging.info

if concurrency_impl in ['gevent', 'threading']:
    from Queue import Queue as JoinableQueue
    from threading import Thread
if concurrency_impl == 'multiprocessing':
    from multiprocessing import Process as Thread, JoinableQueue
The rest of the script is the same for all concurrency implementations:
def do_work(wid, value):
    time.sleep(random.randint(0,2))
    info("%d Task %s done" % (wid, value))

def worker(wid, q):
    while True:
        item = q.get()
        try:
            info("%d Got item %s" % (wid, item))
            do_work(wid, item)
        finally:
            q.task_done()
            info("%d Done item %s" % (wid, item))

def producer(pid, q):
    for item in iter(lambda: random.randint(1, 11), 10):
        time.sleep(.1) # simulate a green blocking call that yields control
        info("%d Added item %s" % (pid, item))
        q.put(item)
    info("%d Signal Received" % (pid,))
Don't execute code at module level; put it in main():
def main():
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(process)d %(message)s")

    q = JoinableQueue()
    it = count(1)
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
    for t in producers+workers:
        t.daemon = True
        t.start()

    for t in producers: t.join() # put items in the queue
    q.join() # wait while it is empty
    # exit main thread (daemon workers die at this point)

if __name__=="__main__":
    main()
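To get an actual number out of such a comparison you need to time a fixed batch of work. The following is only a rough sketch under stated assumptions: it uses Python 3 and the standard library's threading module, `fake_request`, `run_sequential` and `run_threaded` are hypothetical helpers, and time.sleep() stands in for a network call. It compares sequential execution with one thread per task; a gevent or multiprocessing variant would have to be timed the same way.

```python
import threading
import time

def fake_request(delay, results, index):
    """Pretend to do blocking I/O, then record a result."""
    time.sleep(delay)              # stand-in for a network round trip
    results[index] = index

def run_sequential(n, delay):
    results = [None] * n
    start = time.time()
    for i in range(n):
        fake_request(delay, results, i)
    return time.time() - start, results

def run_threaded(n, delay):
    results = [None] * n
    threads = [threading.Thread(target=fake_request, args=(delay, results, i))
               for i in range(n)]
    start = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()                   # wait until every task has finished
    return time.time() - start, results

seq_time, seq_results = run_sequential(5, 0.05)
thr_time, thr_results = run_threaded(5, 0.05)
print("sequential: %.3fs, threaded: %.3fs" % (seq_time, thr_time))
```

With I/O-bound tasks like these, the waits overlap in the threaded run, so it should finish in roughly the time of a single delay rather than the sum of all of them.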
gevent is great when you have very many (green) threads. I tested it with thousands and it worked very well. You have to make sure that all the libraries you use, both for scraping and for saving to the DB, become green. AFAIK if they use Python's socket module, gevent injection ought to work. Extensions written in C (e.g. MySQLdb) would block, however, and you'd need to use green equivalents instead.
If you use gevent you could mostly do away with queues and spawn a new (green) thread for every task, with the code for the thread being as simple as db.save(web.get(address)). gevent will take care of switching to another green thread when some library call in db or web blocks. It will work as long as your tasks fit in memory.
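The "one task per address, no queues" shape can be sketched as follows. To keep it runnable without gevent installed, this uses asyncio from the Python 3 standard library as a stand-in, so it is a swapped-in technique and not gevent's API. `fetch`, `save`, `handle` and `crawl_all` are hypothetical placeholders for web.get and db.save, and the addresses are made up.

```python
import asyncio

async def fetch(address):
    """Placeholder for web.get(address); sleeps instead of hitting the network."""
    await asyncio.sleep(0.01)
    return "body of %s" % address

async def save(store, address, body):
    """Placeholder for db.save(...); writes into a dict instead of a database."""
    await asyncio.sleep(0)
    store[address] = body

async def handle(store, address):
    # The whole per-task logic: fetch, then save.
    await save(store, address, await fetch(address))

async def crawl_all(addresses):
    store = {}
    # One cooperative task per address; no explicit queue is needed.
    await asyncio.gather(*(handle(store, a) for a in addresses))
    return store

saved = asyncio.run(crawl_all(["a.example", "b.example", "c.example"]))
print(saved)
```

With gevent the same shape would be one gevent.spawn() per address followed by gevent.joinall() on the resulting greenlets.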
In this case, your problem is not program speed (i.e. the choice of gevent or threading) but network IO throughput. That is (or should be) the bottleneck that determines how fast the program runs.
Gevent is one nice way to make sure that is the bottleneck, and not your program's architecture.
This is the sort of process you'd want:
import gevent
from gevent.queue import Queue, JoinableQueue
from gevent.monkey import patch_all

patch_all() # Patch urllib2, etc

def worker(work_queue, output_queue):
    for work_unit in work_queue:
        finished = do_work(work_unit)
        output_queue.put(finished)
        work_queue.task_done()

def producer(input_queue, work_queue):
    for url in input_queue:
        url_list = crawl(url)
        for work in url_list:
            work_queue.put(work)
        input_queue.task_done()

def do_work(work):
    gevent.sleep(0) # Actually process link here
    return work

def crawl(url):
    gevent.sleep(0)
    return list(url) # Actually process url here

input = JoinableQueue()
work = JoinableQueue()
output = Queue()

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)]
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)]

list_of_urls = ['foo', 'bar']
for url in list_of_urls:
    input.put(url)

# Wait for input to finish processing
input.join()
print 'finished producing'
# Wait for workers to finish processing work
work.join()
print 'finished working'

# We now have output!
print 'output:'
output.put(StopIteration) # sentinel: ends the loop below instead of blocking forever
for message in output:
    print message
# Or if you'd like, you could use the output as it comes!
You don't need to wait for the input and work queues to finish; I've just demonstrated that here.
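Consuming the output as it arrives could look roughly like this. It is a sketch only: it uses Python 3 standard-library threads and queue.Queue in place of greenlets and gevent queues, `DONE` and `consume` are hypothetical names, and `item.upper()` stands in for the real processing of a link.

```python
import queue
import threading

DONE = object()  # sentinel telling a worker (and the consumer) to stop

def worker(work_queue, output_queue):
    while True:
        item = work_queue.get()
        if item is DONE:
            output_queue.put(DONE)   # tell the consumer this worker is finished
            return
        output_queue.put(item.upper())  # stand-in for real processing

def consume(output_queue, n_workers):
    """Yield results as they arrive, until every worker has signalled DONE."""
    finished = 0
    while finished < n_workers:
        message = output_queue.get()
        if message is DONE:
            finished += 1
        else:
            yield message

work, output = queue.Queue(), queue.Queue()
n_workers = 3
threads = [threading.Thread(target=worker, args=(work, output))
           for _ in range(n_workers)]
for t in threads:
    t.start()
for url in ["foo", "bar", "baz"]:
    work.put(url)
for _ in range(n_workers):
    work.put(DONE)                   # one sentinel per worker

results = sorted(consume(output, n_workers))  # arrival order is not guaranteed
for t in threads:
    t.join()
print(results)
```

The consumer never calls join() on a queue; it simply stops once it has seen one sentinel per worker.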