multithreading environment and modules like pickle

Posted 2019-08-02 16:06

I am using "import threading" with Python 3.4. Simple case: I have one main parent thread and one child thread, and I need to save my dict to a file from the child thread. In the thread function I have this variable:

from threading import local

def thread_function(...):
    def save_to_file():
        this_thread_data.my_dict  # or nonlocal this_thread_data.my_dict?
        ...  # json or pickle

    this_thread_data = local()
    this_thread_data.my_dict = {...}
    ...

When I use pickle I get this error:

_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed

When I use json I get this error:

TypeError: <threading.Event object at 0x7f49115a9588> is not JSON serializable

Will pickle or json work in a multithreading environment, or do I need to use something else instead?

Thank you.

3 Answers
小情绪 Triste *
Answer 2 · 2019-08-02 16:54

Combining Python threading (and multiprocessing) with pickling is broken and limited unless you step outside the standard library.

If you use a fork of multiprocessing called pathos.multiprocessing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python. pathos.multiprocessing also provides an interface to the threading module, just as the standard multiprocessing module does.

pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6]))

See: What can multiprocessing and dill do together?

and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> #from pathos.multiprocessing import ThreadingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

Having unusual stuff in a dict doesn't matter…

>>> d = {'1':add, '2':t, '3':Test, '4':range(10), '5':1}
>>>                        
>>> def items(x):
...   return x[0],x[1]
... 
>>> p.map(items, d.items())
[('1', <function add at 0x103b7e2a8>), ('3', <class '__main__.Test'>), ('2', <__main__.Test object at 0x103b7ad90>), ('5', 1), ('4', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]

By the way, if you wanted to pickle a thread lock, you can do that too.

>>> import dill as pickle
>>> import threading
>>> lock = threading.Lock()
>>> 
>>> pickle.loads(pickle.dumps(lock))
<thread.lock object at 0x10c534650>

It looks like you wanted to build some sort of closure that would automatically store function calls to a file or at least to a serialized string. If that's what you want, you could try klepto, which gives you a decorator that you apply to your function and you get caching to memory or disk or to a database. Klepto can use pickle or json, but it's augmented by dill, so it can serialize almost anything in python -- so don't worry about what's in your dict… just serialize it.

from klepto import lru_cache as memoize
from klepto.keymaps import picklemap
dumps = picklemap(serializer='dill')

class Adder(object):
    """A simple class with a memoized method"""

    @memoize(keymap=dumps, ignore=('self','**'))
    def __call__(self, x, *args, **kwds):
        debug = kwds.get('debug', False)
        if debug:
            print('debug:', x, args, kwds)
        return sum((x,)+args)
    add = __call__

add = Adder()
assert add(2,0) == 2
assert add(2,0,z=4) == 2          # cached (ignore z)
assert add(2,0,debug=False) == 2  # cached (ignore debug)
assert add(1,2,debug=False) == 3
assert add(1,2,debug=True) == 3   # cached (ignore debug)
assert add(4) == 4
assert add(x=4) == 4              # cached

Klepto enables you to have all your cached results available when you restart your code. In that case, you'd pick a file or database backend, then make sure to do an add.dump() to the archive; then restart Python (or whatever) and do an add.load() to load the archived results.

Get the code here: https://github.com/uqfoundation

Explosion°爆炸
Answer 3 · 2019-08-02 16:57

Using pickle and json will work fine in a multithreaded environment (though neither is inherently thread-safe, so make sure the data you're pickling can't change at the time, for example by holding a lock). The catch is that you're restricted in the kind of data you can actually save to disk.
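For instance, you can copy the dict while holding a lock and serialise the copy outside the critical section. A minimal sketch of that idea (shared_dict and dict_lock are assumed names, not from the question):

import json
import threading

shared_dict = {'count': 0}
dict_lock = threading.Lock()

def save_snapshot(path):
    # Take a shallow copy under the lock so no other thread mutates it mid-dump.
    with dict_lock:
        snapshot = dict(shared_dict)
    # Serialise outside the critical section so the lock is held only briefly.
    with open(path, 'w') as f:
        json.dump(snapshot, f)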

Not all objects are serialisable, as you have found. The simplest approach is to make sure your dictionary only has values that are compatible with pickle or the json serialiser. For example, you seem to have stored a lock object in your dictionary that is making pickle fail. You might want to create a new dictionary with only the values that can be pickled, and then pickle that.
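A minimal sketch of that filtering idea (the helper name picklable_subset is mine, not a standard function):

import pickle
import threading

def picklable_subset(d):
    """Return a copy of d with only the values pickle can handle."""
    out = {}
    for key, value in d.items():
        try:
            pickle.dumps(value)  # probe: raises if the value can't be pickled
            out[key] = value
        except Exception:
            pass  # skip locks, Events and other unpicklable values
    return out

my_dict = {'data': [1, 2, 3], 'lock': threading.Lock()}
with open('state.pkl', 'wb') as f:
    pickle.dump(picklable_subset(my_dict), f)  # saves only the 'data' entry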

Alternatively, if you want to create a custom object to store your data, you can tell pickle exactly how to pickle it. This is more advanced and probably unnecessary in your case, but you can find more documentation here: https://docs.python.org/3.4/library/pickle.html#pickling-class-instances
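As a sketch of that approach (the ThreadData class here is illustrative, not from the question): __getstate__ can drop the unpicklable members, and __setstate__ can recreate them on load.

import threading

class ThreadData:
    def __init__(self):
        self.my_dict = {'count': 0}
        self.lock = threading.Lock()  # not picklable

    def __getstate__(self):
        # Copy the instance dict and drop the lock before pickling.
        state = self.__dict__.copy()
        del state['lock']
        return state

    def __setstate__(self, state):
        # Restore the picklable state and recreate a fresh lock.
        self.__dict__.update(state)
        self.lock = threading.Lock()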

姐就是有狂的资本
Answer 4 · 2019-08-02 17:06

There are better ways to share data between threads. If you're open to using processes instead of threads, I would recommend the Python multiprocessing module, specifically the Manager class: https://docs.python.org/2/library/multiprocessing.html#managers. Here is a toy example:

from multiprocessing import Manager, Process

def on_separate_process(alist):
    print(alist)

if __name__ == '__main__':
    manager = Manager()
    alist = manager.list([1, 2, 3])

    p = Process(target=on_separate_process, args=[alist])
    p.start()
    p.join()

This prints [1, 2, 3].
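Since the question is about a dict, manager.dict() works the same way; here is a sketch along the same lines (the worker function and out.json filename are assumed names):

from multiprocessing import Manager, Process
import json

def worker(shared):
    shared['answer'] = 42  # mutation is visible to the parent process

if __name__ == '__main__':
    manager = Manager()
    shared = manager.dict({'x': 1})

    p = Process(target=worker, args=(shared,))
    p.start()
    p.join()

    # Copy the proxy into a plain dict before serialising.
    with open('out.json', 'w') as f:
        json.dump(dict(shared), f)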
