How to synchronize a Python dict with multiprocessing

Posted 2019-01-13 02:25

Question:

I am using Python 2.6 and the multiprocessing module to run work in parallel. Now I would like to have a synchronized dict (where the only atomic operation I really need is the += operator on a value).

Should I wrap the dict with a multiprocessing.sharedctypes.synchronized() call? Or is another way the way to go?

Answer 1:

Intro

There seem to be a lot of armchair suggestions and no working examples. None of the answers listed here even suggests using multiprocessing, which is quite disappointing and a bit disturbing. As Python lovers we should support our built-in libraries, and while parallel processing and synchronization are never trivial matters, I believe they can be made trivial with proper design. This is becoming extremely important on modern multi-core architectures and cannot be stressed enough! That said, I am far from satisfied with the multiprocessing library: it is still in its infancy, with quite a few pitfalls and bugs, and it is geared towards functional programming (which I detest). Currently I still prefer the Pyro module (which is way ahead of its time) over multiprocessing, because multiprocessing has a severe limitation: it cannot share newly created objects while the server is running. The "register" class method of the manager objects only actually registers an object BEFORE the manager (or its server) is started. Enough chatter, more code:

Server.py

from multiprocessing.managers import SyncManager


class MyManager(SyncManager):
    pass


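# The plain dict that the manager's server process shares with clients.
syncdict = {}
def get_dict():
    return syncdict

if __name__ == "__main__":
    # register() must be called before the manager is started.
    MyManager.register("syncdict", get_dict)
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.start()
    # raw_input() only returns once Enter is pressed.
    raw_input("Press Enter to kill server".center(50, "-"))
    manager.shutdown()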

In the above code example, Server.py makes use of multiprocessing's SyncManager, which can supply synchronized shared objects. This code will not work when typed into the interactive interpreter, because the multiprocessing library is quite touchy about how it finds the "callable" for each registered object. Running Server.py starts a customized SyncManager that shares the syncdict dictionary among multiple processes. Clients can connect from the same machine or, if the server is bound to an IP address other than loopback, from other machines. In this case the server runs on loopback (127.0.0.1) on port 5000. The authkey parameter makes the manager authenticate every connection that manipulates syncdict; it does not encrypt the traffic. When Enter is pressed at the prompt, the manager is shut down.

Client.py

from multiprocessing.managers import SyncManager
import sys, time

class MyManager(SyncManager):
    pass

MyManager.register("syncdict")

if __name__ == "__main__":
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.connect()
    syncdict = manager.syncdict()

    print "dict = %s" % (dir(syncdict))
    key = raw_input("Enter key to update: ")
    inc = float(raw_input("Enter increment: "))
    sleep = float(raw_input("Enter sleep time (sec): "))

    try:
        # if the key doesn't exist create it
        if not syncdict.has_key(key):
            syncdict.update([(key, 0)])
        # increment key value every sleep seconds
        # then print syncdict
        while True:
            # get() and update() are two separate calls to the manager, so
            # this increment is not atomic if several clients update one key
            syncdict.update([(key, syncdict.get(key) + inc)])
            time.sleep(sleep)
            print "%s" % (syncdict)
    except KeyboardInterrupt:
        print "Killed client"

The client must also create a customized SyncManager and register "syncdict", this time without passing in a callable to retrieve the shared dict. It then uses the customized SyncManager to connect to the loopback IP address (127.0.0.1) on port 5000, with the authkey authenticating the connection to the manager started in Server.py. It retrieves the shared dict syncdict by calling the registered method on the manager. It prompts the user for the following:

  1. The key in syncdict to operate on
  2. The amount to increment the value accessed by the key every cycle
  3. The amount of time to sleep per cycle in seconds

The client then checks whether the key exists. If it doesn't, the client creates the key on the syncdict. The client then enters an "endless" loop in which it updates the key's value by the increment, sleeps for the amount specified, and prints the syncdict, repeating this process until a KeyboardInterrupt occurs (Ctrl+C).

Annoying problems

  1. The manager's register method MUST be called before the manager is started; otherwise you will get exceptions, even though a dir call on the manager reveals that it does have the method that was registered.
  2. All manipulations of the dict must be done with methods and not item assignments (syncdict["blast"] = 2 will fail miserably because of the way multiprocessing shares custom objects).
  3. Using SyncManager's dict method would alleviate annoying problem #2, except that annoying problem #1 prevents the proxy returned by SyncManager.dict() from being registered and shared. (SyncManager.dict() can only be called AFTER the manager is started, and register only works BEFORE the manager is started, so SyncManager.dict() is only useful when doing functional programming and passing the proxy to Processes as an argument, as the doc examples do.)
  4. The server AND the client both have to register, even though intuitively it seems the client should be able to figure it out after connecting to the manager. (Please add this to your wish-list, multiprocessing developers.)
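
For annoying problem #2, one possible workaround is to pass an explicit proxy type when registering. The sketch below is written for Python 3 (note the bytes authkey) rather than the Python 2.6 used in this answer, and it assumes that multiprocessing.managers.DictProxy, the proxy class behind SyncManager.dict(), can be reused through the proxytype argument of register. The server is hosted in a background thread only so that the sketch runs as a single script. Port 0 (let the OS pick a free port) and the names SHARED, serve_in_thread and connect are made up for illustration.

```python
import threading
from multiprocessing.managers import DictProxy, SyncManager

AUTHKEY = b"password"  # Python 3 wants bytes; same secret as in the answer

SHARED = {}  # the plain dict owned by the server side (hypothetical name)


def get_dict():
    return SHARED


class MyManager(SyncManager):
    pass


# proxytype=DictProxy exposes __getitem__/__setitem__ and friends, so item
# assignment works on the proxy, not only public methods such as update().
MyManager.register("syncdict", get_dict, proxytype=DictProxy)


def serve_in_thread():
    """Start the manager's server in a daemon thread; return its (host, port)."""
    manager = MyManager(address=("127.0.0.1", 0), authkey=AUTHKEY)
    server = manager.get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


def connect(address):
    """Connect the way Client.py does and return a proxy for the shared dict."""
    manager = MyManager(address=address, authkey=AUTHKEY)
    manager.connect()
    return manager.syncdict()


if __name__ == "__main__":
    proxy = connect(serve_in_thread())
    proxy["blast"] = 2   # item assignment goes through the proxy
    proxy["blast"] += 1  # still a separate read and write, so not atomic
    print(proxy.copy())
```

Even with item assignment available, += on the proxy is a read followed by a write, so concurrent clients would still need a lock around it.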

Closing

I hope you enjoyed this quite thorough and slightly time-consuming answer as much as I have. I was having a great deal of trouble getting straight in my mind why I was struggling so much with the multiprocessing module where Pyro makes it a breeze, and thanks to this answer I have now hit the nail on the head. I hope this is useful to the Python community as input on how to improve the multiprocessing module; I do believe it has a great deal of promise, but in its infancy it falls short of what is possible. Despite the annoying problems described, I think this is still quite a viable alternative and is pretty simple. You could also use SyncManager.dict() and pass it to Processes as an argument the way the docs show. Depending on your requirements that would probably be an even simpler solution; it just feels unnatural to me.
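
The SyncManager.dict() route mentioned above might look like the following sketch. It is written for Python 3 rather than 2.6, and the function names add and run_demo as well as the key "hits" are illustrative. multiprocessing.Manager() returns an already started SyncManager. Because the question asks for an atomic +=, the read-modify-write is wrapped in a manager Lock: the dict proxy makes each single call safe, but not a get followed by a set.

```python
import multiprocessing


def add(shared, lock, key, amount, repeats):
    """Add `amount` to shared[key] `repeats` times, holding the lock each time."""
    for _ in range(repeats):
        with lock:  # makes the read-modify-write below atomic
            shared[key] = shared.get(key, 0) + amount


def run_demo(workers=4, repeats=100):
    """Start `workers` processes that all increment the same key."""
    with multiprocessing.Manager() as manager:
        shared = manager.dict()  # proxy to a dict living in the manager process
        lock = manager.Lock()    # proxy to a lock living in the manager process
        procs = [
            multiprocessing.Process(
                target=add, args=(shared, lock, "hits", 1, repeats)
            )
            for _ in range(workers)
        ]
        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()
        return dict(shared)  # copy out before the manager shuts down


if __name__ == "__main__":
    print(run_demo())
```

With the defaults, every one of the 4 x 100 increments should be counted; without the lock, some could be lost.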



Answer 2:

I would dedicate a separate process to maintaining the "shared dict" and use e.g. xmlrpclib to make that tiny amount of code available to the other processes. Expose over XML-RPC, for example, one function that takes a key and an increment and performs the increment, and one that takes just the key and returns the value, with semantic details (is there a default value for missing keys, etc.) depending on your app's needs.

Then you can use any approach you like to implement the shared-dict dedicated process: all the way from a single-threaded server with a simple dict in memory, to a simple sqlite DB, etc, etc. I suggest you start with code "as simple as you can get away with" (depending on whether you need a persistent shared dict, or persistence is not necessary to you), then measure and optimize as and if needed.
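
A minimal sketch of such a dedicated dict service, assuming Python 3, where xmlrpclib has become xmlrpc.client and the server side lives in xmlrpc.server. The CounterService class, its increment and get_value methods, and the choice of 0 as the value of a missing key are illustrative assumptions, not something the answer prescribes. The server runs in a background thread here only to keep the sketch in one file; in the setup described above it would be its own process calling serve_forever().

```python
import threading
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer


class CounterService:
    """Owns the dict; other processes only reach it through XML-RPC calls."""

    def __init__(self):
        self._counts = {}
        # The stock server handles one request at a time; the lock only
        # matters if it is later swapped for a threaded server.
        self._lock = threading.Lock()

    def increment(self, key, amount):
        """Add `amount` to the value stored under `key`; return the new value."""
        with self._lock:
            self._counts[key] = self._counts.get(key, 0) + amount
            return self._counts[key]

    def get_value(self, key):
        """Return the current value for `key`, or 0 if it was never set."""
        with self._lock:
            return self._counts.get(key, 0)


def start_server(host="127.0.0.1", port=0):
    """Serve a CounterService in a daemon thread; port 0 picks a free port."""
    server = SimpleXMLRPCServer((host, port), logRequests=False)
    server.register_instance(CounterService())
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


if __name__ == "__main__":
    server = start_server()
    host, port = server.server_address
    client = ServerProxy("http://%s:%d" % (host, port))
    client.increment("hits", 2)
    print(client.get_value("hits"))
    server.shutdown()
```

Since the increment happens inside a single server-side call, clients never perform the read and the write separately.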



Answer 3:

In response to the request for an appropriate solution to the concurrent-write issue: I did some very quick research and found an article that suggests a lock/semaphore solution (http://effbot.org/zone/thread-synchronization.htm).

While the example isn't specifically about a dictionary, I'm pretty sure you could code a class-based wrapper object to help you work with dictionaries based on this idea.

If I had a requirement to implement something like this in a thread-safe manner, I'd probably use the Python Semaphore solution (assuming my earlier merge technique wouldn't work). I believe that semaphores generally reduce thread efficiency because of their blocking nature.

From the site:

A semaphore is a more advanced lock mechanism. A semaphore has an internal counter rather than a lock flag, and it only blocks if more than a given number of threads have attempted to hold the semaphore. Depending on how the semaphore is initialized, this allows multiple threads to access the same code section simultaneously.

import threading

semaphore = threading.BoundedSemaphore()
semaphore.acquire() # decrements the counter
# ... access the shared resource; work with dictionary, add item or whatever.
semaphore.release() # increments the counter
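
The class-based wrapper mentioned above could be sketched as follows, assuming Python 3. LockedCounterDict and its method names are hypothetical, and BoundedSemaphore(1) is used so that it behaves like a plain lock. Note that threading primitives only protect threads inside one process; they do not synchronize separate multiprocessing processes.

```python
import threading


class LockedCounterDict:
    """A dict wrapper whose operations each run while holding a semaphore."""

    def __init__(self):
        self._data = {}
        # A counter of 1 means only one thread can be inside at a time.
        self._semaphore = threading.BoundedSemaphore(1)

    def increment(self, key, amount=1):
        """Atomically add `amount` to the value under `key`; return the result."""
        with self._semaphore:
            self._data[key] = self._data.get(key, 0) + amount
            return self._data[key]

    def get(self, key, default=0):
        """Return the value under `key`, or `default` if it is missing."""
        with self._semaphore:
            return self._data.get(key, default)

    def snapshot(self):
        """Return a copy of the underlying dict."""
        with self._semaphore:
            return dict(self._data)


if __name__ == "__main__":
    counts = LockedCounterDict()

    def work():
        for _ in range(1000):
            counts.increment("hits")

    threads = [threading.Thread(target=work) for _ in range(8)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(counts.snapshot())
```

Using the semaphore as a context manager releases it even if the body raises, which the bare acquire/release pair does not guarantee.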


Answer 4:

Is there a reason that the dictionary needs to be shared in the first place? Could you have each thread maintain its own instance of a dictionary and either merge at the end of the thread processing or periodically use a callback to merge copies of the individual thread dictionaries together?

I don't know exactly what you are doing, so keep in mind that my written plan may not work verbatim. What I'm suggesting is more of a high-level design idea.
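
As a rough illustration of that design idea, here is a sketch in Python 3 where each worker fills its own private dict and the results are merged at the end. Word counting is a made-up workload, and count_words and count_in_parallel are hypothetical names; collections.Counter is used because merging then amounts to adding the counts together.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def count_words(lines):
    """Worker: build a private Counter, so no locking is needed."""
    local = Counter()
    for line in lines:
        local.update(line.split())
    return local


def count_in_parallel(chunks, max_workers=4):
    """Run one worker per chunk and merge the private results at the end."""
    total = Counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # The merge happens only here, in the calling thread.
        for partial in pool.map(count_words, chunks):
            total.update(partial)
    return total


if __name__ == "__main__":
    chunks = [["a b", "a"], ["b c"], ["a"]]
    print(count_in_parallel(chunks))
```

Because workers never touch a shared dict, the only synchronization point is the final merge.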