Asyncio function works when called from script but

Posted 2020-07-18 10:31

Question:

I'm a novice with Python and these libraries/modules. I'm writing a simple ping-test network scanner as a learning project.

I first developed a script using asyncio to ping addresses on a network

#ip_test.py
import asyncio
import ipaddress

async def ping(addr):
    proc = await asyncio.create_subprocess_exec(
        'ping','-W','1','-c','3',addr,
        stdout=asyncio.subprocess.PIPE
    )
    await proc.wait()
    return proc.returncode

async def pingMain(net):
    # hosts() yields IPv4Address objects
    result = await asyncio.gather(*(ping(str(addr)) for addr in net.hosts()))
    return result

def getHosts(net_): # net_ is an IPv4Network object
    return asyncio.run(pingMain(net_))
    #Returns list of response codes which I then zip with the list of searched ips

When I open python and run the following, it works as expected:

import ip_test as iptest
import ipaddress
print(iptest.getHosts(ipaddress.ip_network('192.168.1.0/29')))
#prints: [0, 0, 0, 1, 1, 1] as expected on this network

However, the ultimate goal is to take input from the user through a form. (The results are recorded to a database; this is a simplified example for illustration.) I collect the input in a Flask route:

@app.route("/newscan",methods=['POST'])
def newScan():
    form = request.form
    networkstring = form.get('network') + "/" + form.get('mask')
    result = iptest.getHosts(ipaddress.ip_network(networkstring))
    return result

When I call the module this way, I get an error: RuntimeError: Cannot add child handler, the child watcher does not have a loop attached

Why does this work when I import the module and run the function from the command line, but not when I call it with the same input from a flask route?

EDIT: Traceback:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2463, in __call__
  return self.wsgi_app(environ, start_response)
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2449, in wsgi_app
  response = self.handle_exception(e)
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1866, in handle_exception
  reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
  raise value
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2446, in wsgi_app
  response = self.full_dispatch_request()
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1951, in full_dispatch_request
  rv = self.handle_user_exception(e)
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1820, in handle_user_exception
  reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
  raise value
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1949, in full_dispatch_request
  rv = self.dispatch_request()
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1935, in dispatch_request
  return self.view_functions[rule.endpoint](**req.view_args)
File "/app/app.py", line 41, in newScan
  result = iptest.getHosts(ipaddress.ip_network(networkstring))
File "/app/ip_test.py", line 22, in getHosts
  res = asyncio.run(pingMain(net_))
File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
  return loop.run_until_complete(main)
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
  return future.result()
File "/app/ip_test.py", line 15, in pingMain
  result = await asyncio.gather(*(ping(str(addr)) for addr in net.hosts()))
File "/app/ip_test.py", line 7, in ping
  stdout=asyncio.subprocess.PIPE
File "/usr/local/lib/python3.7/asyncio/subprocess.py", line 217, in create_subprocess_exec
  stderr=stderr, **kwds)
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1529, in subprocess_exec
  bufsize, **kwargs)
File "/usr/local/lib/python3.7/asyncio/unix_events.py", line 193, in _make_subprocess_transport
  self._child_watcher_callback, transp)
File "/usr/local/lib/python3.7/asyncio/unix_events.py", line 930, in add_child_handler
  "Cannot add child handler, "
RuntimeError: Cannot add child handler, the child watcher does not have a loop attached

Answer 1:

You are trying to run your async subprocess from a thread other than the main thread. This requires some initial setup on the main thread; see the Subprocesses and Threads section of the asyncio Subprocesses documentation:

Standard asyncio event loop supports running subprocesses from different threads, but there are limitations:

  • An event loop must run in the main thread.
  • The child watcher must be instantiated in the main thread before executing subprocesses from other threads. Call the get_child_watcher() function in the main thread to instantiate the child watcher.

What is happening here is that your WSGI server uses multiple threads to handle incoming requests, so the request handler is not running on the main thread. Your code then calls asyncio.run() to start a new event loop in that worker thread, and the asyncio.create_subprocess_exec() call fails because no child watcher was set up on the main thread.
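To see the thread difference without a web server, here is a minimal sketch that uses a plain threading.Thread as a stand-in for a WSGI worker thread. The names is_main_thread and handle_request are hypothetical and exist only for this illustration.

```python
import threading


def is_main_thread():
    """Return True when the caller is running on the interpreter's main thread."""
    return threading.current_thread() is threading.main_thread()


def handle_request(results):
    # Stand-in for a view function that a threaded WSGI server calls
    # from one of its worker threads.
    results.append(is_main_thread())


results = []
worker = threading.Thread(target=handle_request, args=(results,))
worker.start()
worker.join()
# results now holds what the "request handler" observed about its own thread.
```

Code called from the worker sees is_main_thread() return False, which is the situation the view function is in when it calls getHosts().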

You'd have to start a loop (and not stop it) from the main thread, and call asyncio.get_child_watcher() on that thread, for your code not to fail:

import asyncio
import threading

# to be run on the main thread, set up a subprocess child watcher
assert threading.current_thread() is threading.main_thread()
asyncio.get_event_loop()
asyncio.get_child_watcher()

Note: this restriction only applies to Python versions up to 3.7; it was lifted in Python 3.8.
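If the same code has to run on several Python versions, the setup can be guarded by a version check. This is only a sketch: prepare_child_watcher is a hypothetical helper name, and it assumes it is called once from the main thread at startup, before any request is served.

```python
import asyncio
import sys
import threading


def prepare_child_watcher():
    """Hypothetical helper: call once from the main thread at startup.

    Returns True if the child watcher setup was performed, False if the
    running Python version does not need it.
    """
    if sys.version_info >= (3, 8):
        # The restriction was lifted in Python 3.8, so there is nothing to do.
        return False
    assert threading.current_thread() is threading.main_thread()
    asyncio.get_event_loop()
    asyncio.get_child_watcher()
    return True
```

On Python 3.8 and later the function returns without touching asyncio at all.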

However, just to run a bunch of subprocesses and wait for these to complete, using asyncio is overkill; your OS can run subprocesses in parallel just fine. Just use subprocess.Popen() and check each process via the Popen.poll() method:

import subprocess
import time

def ping_proc(addr):
    return subprocess.Popen(
        ['ping', '-W', '1', '-c', '3', addr],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )

def get_hosts(net):
    # hosts() yields IPv4Address objects
    procs = [ping_proc(str(addr)) for addr in net.hosts()]
    while any(p.poll() is None for p in procs):
        time.sleep(0.1)
    return [p.returncode for p in procs]

Popen.poll() does not block. If Popen.returncode is not yet set, it asks the OS for the process status via waitpid([pid], WNOHANG) and returns None if the process is still running, or the now-available returncode value otherwise. The code above checks those statuses in a loop, sleeping briefly between passes to avoid busy-waiting.
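The two possible results of poll() can be observed with a short-lived child process. In this sketch a child Python interpreter that sleeps for half a second stands in for ping; that substitution is an assumption made so the example runs anywhere.

```python
import subprocess
import sys

# Hypothetical stand-in for ping: a child Python process that sleeps briefly.
child = subprocess.Popen(
    [sys.executable, "-c", "import time; time.sleep(0.5)"]
)

first = child.poll()    # child is still sleeping, so there is no status yet
child.wait()            # block until the child has exited
second = child.poll()   # the exit status is now available
```

Here first is None because the child had not finished when poll() was called, while second holds the exit status, which is also stored on child.returncode.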

The asyncio subprocess wrapper (on POSIX at least) either uses a SIGCHLD signal handler to be notified of child processes exiting or (in Python 3.8) uses a separate thread per child process to use a blocking waitpid() call on each subprocess created. You could implement the same signal handler, but take into account that signal handlers can only be registered on the main thread, so you'd have to jump through several hoops to communicate incoming SIGCHLD signal information to the right thread.
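The thread-per-child idea can also be used directly, with no signal handling. The following is a rough sketch of that idea and not asyncio's actual implementation: each worker thread starts one child and blocks in Popen.wait(). The names wait_for and run_all are hypothetical.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def wait_for(cmd):
    """Start one child process and block this thread until it exits."""
    proc = subprocess.Popen(
        cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )
    return proc.wait()


def run_all(cmds):
    """Run every command in its own thread; return exit codes in input order."""
    if not cmds:
        return []
    with ThreadPoolExecutor(max_workers=len(cmds)) as pool:
        return list(pool.map(wait_for, cmds))


# Usage with stand-in commands that exit with a chosen status code.
cmds = [
    [sys.executable, "-c", "import sys; sys.exit(%d)" % code]
    for code in (0, 1, 0)
]
codes = run_all(cmds)
```

Because the waiting happens in ordinary worker threads, this works from any thread, including a request handler, at the cost of one thread per child while it runs.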