The strength of Twisted (for Python) is its asynchronous framework (I think). I've written an image processing server that takes requests via Perspective Broker. It works great as long as I feed it fewer than a couple hundred images at a time. However, sometimes it gets spiked with hundreds of images at virtually the same time. Because it tries to process them all concurrently, the server crashes.
As a solution I'd like to queue up the remote_calls on the server so that it only processes ~100 images at a time. It seems like this might be something that Twisted already does, but I can't seem to find it. Any ideas on how to start implementing this? A point in the right direction? Thanks!
One ready-made option that might help with this is twisted.internet.defer.DeferredSemaphore. This is the asynchronous version of the normal (counting) semaphore you might already know if you've done much threaded programming.

A (counting) semaphore is a lot like a mutex (a lock). But where a mutex can only be acquired once until a corresponding release, a (counting) semaphore can be configured to allow an arbitrary (but specified) number of acquisitions to succeed before any corresponding releases are required.
Here's an example of using DeferredSemaphore to run ten asynchronous operations, but to run at most three of them at once:
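(A minimal sketch of the idea; the processImage job here is an assumed placeholder that just waits briefly using deferLater to stand in for real image work.)

    from twisted.internet import reactor
    from twisted.internet.defer import DeferredSemaphore, gatherResults
    from twisted.internet.task import deferLater

    def processImage(n):
        # Placeholder for a real image-processing job: wait a little
        # while, then report completion.
        print('Starting job', n)
        d = deferLater(reactor, n / 10.0, lambda: None)
        d.addCallback(lambda ignored: print('Finished job', n))
        return d

    def main():
        # At most three jobs may hold the semaphore (run concurrently).
        sem = DeferredSemaphore(3)
        jobs = [sem.run(processImage, i) for i in range(10)]
        gatherResults(jobs).addCallback(lambda ignored: reactor.stop())
        reactor.run()

    if __name__ == '__main__':
        main()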
DeferredSemaphore also has explicit acquire and release methods, but the run method is so convenient it's almost always what you want. It calls the acquire method, which returns a Deferred. To that first Deferred, it adds a callback which calls the function you passed in (along with any positional or keyword arguments). If that function returns a Deferred, then to that second Deferred a callback is added which calls the release method.

The synchronous case is handled as well, by immediately calling release. Errors are also handled, by allowing them to propagate but making sure the necessary release is done to leave the DeferredSemaphore in a consistent state. The result of the function passed to run (or the result of the Deferred it returns) becomes the result of the Deferred returned by run.
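For illustration only, here is a rough sketch of that behaviour (not the library's actual code); maybeDeferred covers both the synchronous and the Deferred-returning cases, and addBoth makes sure the release happens whether the job succeeds or fails:

    from twisted.internet.defer import maybeDeferred

    def runWithSemaphore(sem, f, *args, **kwargs):
        # Roughly what sem.run(f, *args, **kwargs) is described as doing
        # above: acquire, call f, and always release afterwards.
        d = sem.acquire()                   # fires once a token is free
        def execute(ignored):
            result = maybeDeferred(f, *args, **kwargs)
            def releaseAndPassThrough(outcome):
                sem.release()
                return outcome              # results and errors propagate
            result.addBoth(releaseAndPassThrough)
            return result
        d.addCallback(execute)
        return d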
Another possible approach might be based on DeferredQueue and cooperate. DeferredQueue is mostly like a normal queue, but its get method returns a Deferred. If there happen to be no items in the queue at the time of the call, the Deferred will not fire until an item is added.

Here's an example:
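(Again a minimal sketch rather than a drop-in implementation; it reuses the same assumed processImage placeholder, and since the workers wait forever on an empty queue, it counts finished jobs and stops the reactor once all ten are done.)

    from twisted.internet import reactor
    from twisted.internet.defer import DeferredQueue
    from twisted.internet.task import cooperate, deferLater

    def processImage(n):
        # Same placeholder job as in the first sketch.
        print('Starting job', n)
        d = deferLater(reactor, n / 10.0, lambda: None)
        d.addCallback(lambda ignored: print('Finished job', n))
        return d

    def main():
        total = 10
        finished = []

        def noteDone(ignored):
            # Stop the reactor once every job has completed; the workers
            # themselves would otherwise wait forever on the empty queue.
            finished.append(ignored)
            if len(finished) == total:
                reactor.stop()

        def work(n):
            return processImage(n).addCallback(noteDone)

        def worker(jobs):
            # Pull one job at a time off the queue; cooperate() iterates
            # this generator again only after the yielded Deferred fires.
            while True:
                yield jobs.get().addCallback(work)

        jobs = DeferredQueue()
        for i in range(total):
            jobs.put(i)

        # Three workers means three jobs in progress at any given time.
        for _ in range(3):
            cooperate(worker(jobs))

        reactor.run()

    if __name__ == '__main__':
        main()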
Note that the asynchronous worker function is the same as the one from the first example. However, this time there is also a worker function which explicitly pulls jobs out of the DeferredQueue and processes them (by adding the job function as a callback to the Deferred returned by get). The worker generator is driven by cooperate, which iterates it once after each Deferred it yields fires. The main loop, then, starts three of these worker generators so that three jobs will be in progress at any given time.
This approach involves a bit more code than the DeferredSemaphore approach, but it has some benefits which may be interesting. First, cooperate returns a CooperativeTask instance which has useful methods like pause and resume, and a couple of others. Also, all jobs assigned to the same cooperator will cooperate with each other in scheduling, so as not to overload the event loop (and this is what gives the API its name).

On the DeferredQueue side, it's also possible to set limits on how many items are pending processing, so you can avoid completely overloading your server (for example, if your image processors get stuck and stop completing tasks). If the code calling put handles the queue overflow exception, you can use this as back-pressure to try to stop accepting new jobs (perhaps shunting them to another server, or alerting an administrator); see the sketch below. Doing similar things with DeferredSemaphore is a bit trickier, since there's no way to limit how many jobs are waiting to acquire the semaphore.
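For instance, a sketch of that kind of back-pressure; the size limit of 100 and the accept helper are illustrative assumptions:

    from twisted.internet.defer import DeferredQueue, QueueOverflow

    # Hold at most 100 pending jobs; put() raises QueueOverflow beyond that.
    jobs = DeferredQueue(size=100)

    def accept(job):
        try:
            jobs.put(job)
            return True
        except QueueOverflow:
            # Back-pressure: refuse the job (or hand it to another server,
            # or alert an administrator) instead of overloading this one.
            return False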
You might also like the txRDQ (Resizable Dispatch Queue) I wrote. Google it; it's in the tx collection on Launchpad. Sorry I don't have more time to reply - about to go onstage.

Terry