Python - How can I make this code asynchronous?

Posted 2020-06-12 11:38

Question:

Here's some code that illustrates my problem:

def blocking1():
    while True:
        yield 'first blocking function example'

def blocking2():
    while True:
        yield 'second blocking function example'

for i in blocking1():
    print('this will be shown')

for i in blocking2():
    print('this will not be shown')

I have two functions which contain while True loops. These will yield data which I will then log somewhere (most likely, to an sqlite database).

I've been playing around with threading and have gotten it working. However, I don't really like it... What I would like to do is make my blocking functions asynchronous. Something like:

def blocking1(callback):
    while True:
        callback('first blocking function example')

def blocking2(callback):
    while True:
        callback('second blocking function example')

def log(data):
    print(data)

blocking1(log)
blocking2(log)

How can I achieve this in Python? I've seen that the standard library comes with asyncore, and the big name in this game is Twisted, but both of these seem to be used for socket I/O.

How can I async my non-socket related, blocking functions?

Answer 1:

A blocking function is a function which doesn't return, but still leaves your process idle - unable to complete more work.

You're asking us to make your blocking functions non-blocking. However – unless you're writing an operating system – you don't have any blocking functions. You might have functions which block because they make calls to blocking system calls, or you might have functions which "block" because they do a lot of computation.

Making the former type of function non-blocking is impossible without making the underlying system call non-blocking. Depending on what that system call is, it may be difficult to make it non-blocking without also adding an event loop to your program: it's not enough to make the call and have it not block; you also have to make another call to find out when the result is ready, and arrange for that result to be delivered somewhere you can associate it with the original request.
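As a rough illustration of that two-step pattern, here is a minimal sketch using the standard library's selectors module rather than Twisted. The local socket pair is only a stand-in for whatever descriptor your real blocking call reads from, and read_when_ready is a hypothetical name chosen for this example, not part of any library.

```python
import selectors
import socket


def read_when_ready(messages):
    """Send each message over a local socket pair and collect what arrives,
    only calling recv() after the OS reports the reader as ready."""
    sel = selectors.DefaultSelector()
    reader, writer = socket.socketpair()
    reader.setblocking(False)  # step 1: make the call itself non-blocking
    sel.register(reader, selectors.EVENT_READ)
    received = []
    try:
        for message in messages:
            writer.sendall(message)
            # Step 2: the "other call" that asks which descriptors have data.
            for key, _events in sel.select(timeout=1.0):
                received.append(key.fileobj.recv(1024))
    finally:
        sel.unregister(reader)
        sel.close()
        reader.close()
        writer.close()
    return received


chunks = read_when_ready([b"first", b"second"])
```

A real event loop does the same thing in a loop over many descriptors at once and dispatches each ready descriptor to the code that was waiting for it.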

The answer to this question is a very long Python program and a lot of explanations of different OS interfaces and how they work, but luckily I already wrote that answer on a different site; I called it Twisted. If your particular task is already supported by a Twisted reactor, then you're in luck. Otherwise, as long as your task maps to some existing operating system concept, you can extend a reactor to support it. Practically speaking there are only two of these mechanisms: file descriptors on every sensible operating system ever, and I/O Completion Ports on Windows.

In the other case, if your functions are consuming a lot of CPU, and therefore not returning, they're not really blocking; your process is still chugging along and getting work done. There are three ways to deal with that:

  • separate threads
  • separate processes
  • if you have an event loop, write a task that periodically yields, by writing the task in such a way that it does some work, then asks the event loop to resume it in the near future in order to allow other tasks to run.

In Twisted this last technique can be accomplished in various ways, but here's a syntactically convenient trick that makes it easy:

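from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue

# SomeResult and somethingElse are placeholders for your own result type and work items.
@inlineCallbacks
def slowButSteady():
    result = SomeResult()
    for something in somethingElse:
        # Do one short slice of the work...
        result.workHardForAMoment(something)
        # ...then let the reactor run other tasks before resuming here.
        yield deferLater(reactor, 0, lambda: None)
    # On Python 3, a plain "return result" also works here.
    returnValue(result)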
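The same "do a little work, then hand control back" idea can also be sketched with the standard library's asyncio instead of Twisted. This is a different event loop than the one the answer describes, and the names slow_but_steady, ticker and main are made up for the illustration. Awaiting asyncio.sleep(0) plays the role of the zero-delay deferLater call.

```python
import asyncio


async def slow_but_steady(items, log):
    """CPU-bound work split into small slices (hypothetical example task)."""
    total = 0
    for item in items:
        total += item * item          # one short burst of work
        log.append(("worker", item))
        await asyncio.sleep(0)        # give other tasks a turn
    return total


async def ticker(count, log):
    """A second task that only makes progress when the worker yields."""
    for n in range(count):
        log.append(("ticker", n))
        await asyncio.sleep(0)


async def main():
    log = []
    total, _ = await asyncio.gather(
        slow_but_steady([1, 2, 3], log),
        ticker(3, log),
    )
    return total, log


total, log = asyncio.run(main())
```

Without the sleep(0) line the worker would run to completion before the ticker got a single turn, which is exactly the behavior the question complains about.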


Answer 2:

You can use generators for cooperative multitasking, but you have to write your own main loop that passes control between them.

Here's a (very simple) example using your example above:

def blocking1():
    while True:
        yield 'first blocking function example'

def blocking2():
    while True:
        yield 'second blocking function example'


tasks = [blocking1(), blocking2()]

# Repeat until all tasks have stopped
while tasks:
    # Iterate through all current tasks. Use
    # tasks[:] to copy the list because we
    # might mutate it.
    for t in tasks[:]:
        try:
            print(next(t))
        except StopIteration:
            # If the generator stops, remove it from the task list
            tasks.remove(t)

You could further improve it by allowing the generators to yield new generators, which then could be added to tasks, but hopefully this simplified example will give the general idea.
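One possible shape for that improvement, as a sketch only: the loop below treats any generator yielded by a task as a request to schedule a new task, and treats everything else as data. The names child, parent and run are hypothetical, and the tasks are finite so the loop terminates.

```python
from collections import deque
import types


def child(name, count):
    for n in range(count):
        yield "%s step %d" % (name, n)


def parent():
    yield "parent starting"
    yield child("child", 2)   # ask the main loop to schedule a new task
    yield "parent done"


def run(tasks):
    """Round-robin over generator tasks; yielded generators become new tasks."""
    queue = deque(tasks)
    output = []
    while queue:
        task = queue.popleft()
        try:
            item = next(task)
        except StopIteration:
            continue              # task finished, so do not requeue it
        if isinstance(item, types.GeneratorType):
            queue.append(item)    # newly spawned task joins the rotation
        else:
            output.append(item)   # ordinary data, e.g. something to log
        queue.append(task)        # the current task goes to the back
    return output


results = run([parent()])
```

Using a deque avoids copying and mutating the task list on every pass, at the cost of a slightly less obvious loop.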



Answer 3:

The Twisted framework is not just for sockets. It has asynchronous adapters for many scenarios, including interacting with subprocesses. I recommend taking a closer look at it; it does what you are trying to do.



Answer 4:

If you don't want to use full OS threading, you might try Stackless, which is a variant of Python that adds many interesting features, including "microthreads". There are a number of good examples that you will find helpful.



Answer 5:

Your code isn’t blocking. blocking1() and its sibling return iterators immediately (without blocking), and a single iteration doesn’t block either (in your case).

If you want to “eat” from both iterators one item at a time, don’t make your program try to eat up blocking1() entirely before continuing:

# On Python 3, zip() is lazy. On Python 2, use itertools.izip() instead,
# because zip() would try to build a full list from the infinite generators.
for b1, b2 in zip(blocking1(), blocking2()):
    print('this will be shown', b1, 'and this, too', b2)