Python Multiprocessing. How to enqueue XMLRPC Serv

Posted 2019-07-08 20:16

Question:

I am trying to send several parallel requests to an XMLRPC server (mosesserver). To make an XMLRPC call, a ServerProxy object is needed (this object contains the URL of the server, among other things).

In serial execution, I can create this object at the beginning of the program (with a call to xmlrpclib.ServerProxy(server_URL)), store it in a variable and use it whenever I need it.

But when using a pool of processes, each process needs a different instance of this object. The task of each process is to read a line from an input and send it to the translation server via XMLRPC, wait for the result, get another line, send it again ...

It is easy to have each process create a new ServerProxy instance every time it makes an XMLRPC call, but the first call on a ServerProxy object takes longer than subsequent calls on the same object. So what I am trying to do is have each process create a ServerProxy object the first time it makes an RPC call and then reuse that same object for the following calls (the way it is done in serial execution). But I am having trouble storing that object.

import xmlrpclib
import sys
import os

import multiprocessing
import time

def func(dummy_argument,que):
  server = xmlrpclib.ServerProxy('http://myserver:6060/RPC2',)
#  server="HELLO"
  que.put(server)
  return

def run_parallel(line,num_procesos):
  pool = multiprocessing.Pool(processes=num_procesos)
  m = multiprocessing.Manager()
  q = m.Queue()
  for i in range (0, num_procesos):
    pool.apply_async(func, ("dummy",q))

  while not q.empty():
    print q.get()

  pool.close()
  pool.join()

  return

if __name__ == '__main__':
  line="test"
  run_parallel(line,4)

In the code above, if I uncomment the line server="HELLO" (so that a plain string is enqueued), everything seems to work fine. But when that line stays commented out, so that the ServerProxy object 'server' has to be enqueued, I receive the following error:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 353, in _handle_results
    task = get()
TypeError: ('__init__() takes exactly 3 arguments (1 given)', <class 'xmlrpclib.Fault'>, ())

As far as I can tell, the __init__ method it refers to is the constructor of the ApplyResult class (the class whose instances are returned by Pool.apply_async()).

The truth is that, even if I manage to store and retrieve this object correctly, I am not sure I will be able to use it the way I intend, but at least I would like to try.

Does anyone know where the problem is?

Or is there perhaps another (probably simpler) way to do what I need?

Thanks

Answer 1:

If I understood correctly, your question is how to initialize the proxy only once in each subprocess. You can take advantage of the initializer argument of multiprocessing.Pool() and of the fact that each process has its own global variables:

import xmlrpclib
import multiprocessing

def init():
    # Runs once in each worker process. Every process has its own globals,
    # so each worker ends up with its own ServerProxy.
    global server
    server = xmlrpclib.ServerProxy('http://myserver:6060/RPC2')

def func(argument):
    # Reuses the proxy that init() created in this worker.
    result = server.some_function(argument)
    return result

def run_parallel(line, num_procesos):
    pool = multiprocessing.Pool(processes=num_procesos, initializer=init)
    # func takes a single argument, so pass a one-element tuple.
    async_results = [pool.apply_async(func, ("dummy",))
                     for i in range(num_procesos)]
    for r in async_results:
        print r.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    line = "test"
    run_parallel(line, 4)

This code also demonstrates a typical way of handling apply_async results.
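
If the proxy should only be created on the first RPC call, as the question describes, a lazy variant of the same idea is possible. The following is a minimal sketch assuming Python 3 (xmlrpc.client instead of xmlrpclib). get_server and translate are hypothetical names, and some_function is the same placeholder method used in the code earlier in this answer.

```python
import xmlrpc.client  # Python 3 name of the xmlrpclib module

SERVER_URL = "http://myserver:6060/RPC2"  # URL taken from the question

_server = None  # module-level, so every worker process has its own copy


def get_server():
    """Return this process's proxy, creating it on the first request."""
    global _server
    if _server is None:
        _server = xmlrpc.client.ServerProxy(SERVER_URL)
    return _server


def translate(line):
    """Hypothetical pool task; some_function is a placeholder method name."""
    return get_server().some_function(line)
```

Passing translate to pool.apply_async would then build one proxy per worker on that worker's first task and reuse it afterwards, with no initializer argument needed.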