Undocumented Managed VM task queue RPCFailedError

2019-05-10 15:37发布

I'm running into a very peculiar and undocumented issue with a GAE Managed VM and Task Queues. I understand that the Managed VM service is in beta, so this question may not be relevant forever, but it's definitely causing me lots of headache now.

The main symptom of the issue is that, in certain (not completely known to me) circumstances, I'm seeing the following error/traceback:

  File "/home/vmagent/my_app/some_file.py", line 265, in some_ndb_tasklet
    res = yield some_task.add_async('some-task-queue-name')
  File "/home/vmagent/python_vm_runtime/google/appengine/ext/ndb/tasklets.py", line 472, in _on_rpc_completion
    result = rpc.get_result()
  File "/home/vmagent/python_vm_runtime/google/appengine/api/apiproxy_stub_map.py", line 613, in get_result
    return self.__get_result_hook(self)
  File "/home/vmagent/python_vm_runtime/google/appengine/api/taskqueue/taskqueue.py", line 1948, in ResultHook
    rpc.check_success()
  File "/home/vmagent/python_vm_runtime/google/appengine/api/apiproxy_stub_map.py", line 579, in check_success
    self.__rpc.CheckSuccess()
  File "/home/vmagent/python_vm_runtime/google/appengine/ext/vmruntime/vmstub.py", line 312, in _WaitImpl
    raise self._ErrorException(*_DEFAULT_EXCEPTION)
RPCFailedError: The remote RPC to the application server failed for call taskqueue.BulkAdd().

I've gone through my local App Engine SDK to trace this through, and I can get up to the last line of the trace, but google/appengine/ext/vmruntime/ doesn't exist on my machine at all, so I have no idea what's happening in vmstub.py. From looking at the local code, some_task.add_async('the-queue') is spinning up an RPC and waiting for it to finish, but this error is not what the except apiproxy_errors.ApplicationError, e: at line 1949 of taskqueue.py is expecting...

The code that's generating the error looks something like this:

@ndb.tasklet
def kickoff_tasks(batch_of_payloads):
    for task_payload in batch_of_payloads:
        # task_payload is a dict
        task = taskqueue.Task(
            url='/the/handler/url',
            params=payload)
        res = yield task.add_async('some-valid-task-queue-name')

Other things worth noting:

  • this code itself is running in a task handler kicked off by another task.
  • I first saw this error before implementing this sort of batching, and assumed the issue was because I had added too many tasks from within a task handler.
  • In some cases, I can run this successfully with a batch size of 100, but in others, it fails consistently (depending on the data in the payloads) at 100, and sometimes succeeds at batch sizes of 50.
  • The task payloads themselves include batches of items, and are tuned to be just small enough to fit in a task. App Engine advertises a maximum task size of 100KB, so I'm keeping the payloads to under 90,000 bytes right now. Lowering the size even more doesn't seem to help any.
  • I've also tried implementing an exponential backoff to retry the kickoff_tasks method when this error appears, but it seems that once the error is raised, I can't add any other tasks at all from within the same handler (i.e. I can't kickoff a "continue from where you left off" task, I just have to let this one fail and restart itself)

So, my question is, what is actually causing this error? How can I avoid it, or fix this so that I'm handling it correctly?

2条回答
Fickle 薄情
2楼-- · 2019-05-10 16:34

This is a known issue that is being worked on. There are actually two issues - the RPC failure itself and the lack of handling of the RPCFailedError exception by the SDK.

There is some public discussion of the issue here.

查看更多
Ridiculous、
3楼-- · 2019-05-10 16:41

If you're using App Engine Flexible and the python-compat-multicore image, a new bug popped up related to App Engine using a newer version of the requests library that broke the communication between App Engine Flexible and the datastore. You can fix this error by monkey patching the library in your appengine_config.py file.

Add the following code to appengine_config.py:

try:
    import appengine.ext.vmruntime.vmstub as vmstub
except ImportError:
    pass
else:
    if isinstance(vmstub.DEFAULT_TIMEOUT, (int, long)):
        # Newer requests libraries do not accept integers as header values. 
        # Be sure to convert the header value before sending. 
        # See Support Case ID 11235929.
        vmstub.DEFAULT_TIMEOUT = bytes(vmstub.DEFAULT_TIMEOUT)

Note that if you do not have an appengine_config.py file, you can just create it in your base project directory (wherever you put your app.yaml file). This file gets run during App Engine startup..

查看更多
登录 后发表回答