Working with a deque object across multiple processes

Posted 2019-07-27 02:19

I'm trying to reduce the processing time of reading a database with roughly 100,000 entries, but I need them formatted a specific way. To do this, I tried to use Python's multiprocessing.Pool.map, which works perfectly except that I can't seem to get any form of queue reference to work across the worker processes.

I've been using Filling a queue and managing multiprocessing in python to guide me in using queues across multiple processes, and Using a global variable with a thread to guide me in using global variables across threads. I've gotten the software to run, but when I check the list/queue/dict/map length after running the processes, it always returns zero.

I've written a simple example to show what I mean: you have to run the script as a file; the Pool's initializer function does not work from the interactive interpreter.

from multiprocessing import Pool
from collections import deque

global_q = deque()

def my_init(q):
    global global_q
    global_q = q
    q.append("Hello world")


def map_fn(i):
    global global_q
    global_q.append(i)


if __name__ == "__main__":
    with Pool(3, my_init, (global_q,)) as pool:
        pool.map(map_fn, range(3))
    for p in range(len(global_q)):
        print(global_q.pop())

Theoretically, when I pass the queue reference from the main process to the worker processes via the Pool's initializer, and then set each worker's global variable with that initializer function, any elements I append to the queue from the map function later should still go through that same reference (long story short, everything should end up in the same queue, because it all points to the same location in memory).

So, I expect:

Hello world
Hello world
Hello world
0
1
2

Of course, the 0, 1, 2 could come out in any order, but what you'll actually see in the output is nothing at all: the queue in the main process is still empty.

How come, when I pass object references to the Pool function, nothing happens?

2 Answers
兄弟一词,经得起流年.
#2 · 2019-07-27 02:30

Here's an example of how to share something between processes by extending the multiprocessing.managers.BaseManager class to support deques.

There's a section in the documentation about creating Customized managers.

import collections
from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    pass

class DequeProxy(object):
    def __init__(self, *args):
        self.deque = collections.deque(*args)
    def __len__(self):
        return self.deque.__len__()
    def appendleft(self, x):
        self.deque.appendleft(x)
    def append(self, x):
        self.deque.append(x)
    def pop(self):
        return self.deque.pop()
    def popleft(self):
        return self.deque.popleft()

# Currently only exposes a subset of deque's methods.
DequeManager.register('DequeProxy', DequeProxy,
                      exposed=['__len__', 'append', 'appendleft',
                               'pop', 'popleft'])


process_shared_deque = None  # Global only within each process.


def my_init(q):
    global process_shared_deque  # Initialize module-level global.
    process_shared_deque = q
    q.append("Hello world")

def map_fn(i):
    process_shared_deque.append(i)  # deques don't have a "put()" method.


if __name__ == "__main__":
    manager = DequeManager()
    manager.start()
    shared_deque = manager.DequeProxy()

    with Pool(3, my_init, (shared_deque,)) as pool:
        pool.map(map_fn, range(3))

    for p in range(len(shared_deque)):  # Show left-to-right contents.
        print(shared_deque.popleft())

Output:

Hello world
0
1
2
Hello world
Hello world
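As an aside, you don't strictly need a hand-written wrapper class: BaseManager.register() accepts any callable, so you can register collections.deque itself and let the manager auto-generate a proxy for the exposed method names. A minimal sketch of that alternative, reusing the same exposed subset as above:

import collections
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    pass

# Register the real deque class; the manager auto-generates a proxy type
# that forwards only the exposed method names to the manager process.
DequeManager.register('deque', collections.deque,
                      exposed=['__len__', 'append', 'appendleft',
                               'pop', 'popleft'])


if __name__ == "__main__":
    manager = DequeManager()
    manager.start()
    d = manager.deque()     # The actual deque lives in the manager process.
    d.append("Hello world")
    print(len(d), d.pop())  # -> 1 Hello world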
戒情不戒烟
#3 · 2019-07-27 02:30

You can't use a global variable for multiprocessing.

Pass a multiprocessing queue to the function instead:

from multiprocessing import Queue

queue = Queue()

def worker(q):
    q.put("something")  # Put whatever result you want to send back.
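Applied to the original Pool example, one way to do this (a sketch, not the only option) is a Manager().Queue(), whose proxy can be pickled and passed straight through the map() arguments; the map_fn below is an adaptation of the question's function that unpacks a (queue, item) pair:

from multiprocessing import Pool, Manager


def map_fn(args):
    q, i = args  # Unpack the (queue proxy, item) pair.
    q.put(i)


if __name__ == "__main__":
    with Manager() as manager:
        q = manager.Queue()  # A proxy queue; safe to send through map().
        with Pool(3) as pool:
            pool.map(map_fn, [(q, i) for i in range(3)])
        while not q.empty():
            print(q.get())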

You are probably also experiencing this: the code is all right, but because the pool creates separate processes, even the errors are separated, so you don't see that the code not only isn't working, but is actually throwing errors.

The reason your output is '' is that nothing was appended to your q/global_q. And if something was appended, it went to some variable that may be called global_q, but that is a totally different object from the global_q in your main process.

Try print('Hello world') inside the function you want to multiprocess and you will see for yourself that nothing is actually printed at all. That function simply runs outside of your main process, and the only way to get at that process is via multiprocessing queues. You put data on the queue with queue.put('something') and read it back with something = queue.get().

Try to understand this code and you will do well:

import multiprocessing as mp

# This queue will be shared among all processes, but you must pass it to
# each process as an argument; you CANNOT use it as a global variable.
# The worker functions run in totally different processes, and nothing in
# them can reach back into this module's globals. A multiprocessing.Queue
# is the one thing here that can be shared across all the processes.
shared_queue = mp.Queue()


def channel(que, channel_num):
    que.put(channel_num)

if __name__ == '__main__':
    processes = [mp.Process(target=channel, args=(shared_queue, channel_num))
                 for channel_num in range(8)]

    for p in processes:
        p.start()

    for p in processes:  # Wait for every process to finish.
        p.join()

    for i in range(8):  # Get data from the queue (you can actually do this at any time).
        print(shared_queue.get())
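If you don't know in advance how many items the workers produced, you can drain the queue with a timeout instead of a fixed range(8). A minimal sketch of that variant (the 0.1-second timeout is an arbitrary choice):

import multiprocessing as mp
from queue import Empty  # mp.Queue.get() raises queue.Empty on timeout.


def channel(que, channel_num):
    que.put(channel_num)


if __name__ == '__main__':
    shared_queue = mp.Queue()
    processes = [mp.Process(target=channel, args=(shared_queue, n))
                 for n in range(8)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    # Drain everything without hard-coding the item count.
    while True:
        try:
            print(shared_queue.get(timeout=0.1))
        except Empty:
            break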