Python Multiprocessing storing data until further call in each process

Posted 2020-03-26 03:19

Question:

I have a large object of a type that cannot be shared between processes. It has methods to instantiate it and to work on its data.

Currently, I first instantiate the object in the main parent process and then pass it to the subprocesses when some event happens. The problem is that each time a subprocess runs, it copies the object into its own memory, which takes a while. I want to store the object in memory that is available only to that subprocess, so it doesn't have to copy it every time it calls one of the object's functions.

How would I store an object just for that process's own use?

Edit: Code

from multiprocessing import Process

class MultiQ:
    def __init__(self):
        self.pred = instantiate_predict()  # here I instantiate the big object

    def enq_essay(self, essay):
        p = Process(target=self.compute_results, args=(essay,))
        p.start()

    def compute_results(self, essay):
        # computation inside the large object that doesn't modify the object
        predictions = self.pred.predict_fields(essay)

This copies the large object in memory every time. I am trying to avoid that.
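
In other words, what I'd like is something like a long-lived worker per process that builds the object once and then handles many essays. A rough sketch of that idea (instantiate_predict and predict_fields are the same names as above; the queue plumbing is only illustrative):

from multiprocessing import Process, Queue

def prediction_worker(essay_queue, result_queue):
    # built once, and only this worker process ever holds it in memory
    pred = instantiate_predict()
    while True:
        essay = essay_queue.get()
        if essay is None:          # sentinel: shut the worker down
            break
        # every essay reuses the already-built object, nothing is re-copied
        result_queue.put(pred.predict_fields(essay))

if __name__ == '__main__':
    essays, results = Queue(), Queue()
    worker = Process(target=prediction_worker, args=(essays, results))
    worker.start()
    essays.put("this essay will be predicted")
    print(results.get())
    essays.put(None)               # tell the worker to exit
    worker.join()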

Edit 4: short code sample that runs on 20 newsgroups data

import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging
import os
import numpy as np
import cPickle as pickle


def get_20newsgroups_fnames():
    all_files = []
    for i, (root, dirs, files) in enumerate(os.walk("/home/roman/Desktop/20_newsgroups/")):
        if i>0:
            all_files.extend([os.path.join(root,file) for file in files])
    return all_files

documents = [unicode(open(f).read(), errors="ignore") for f in get_20newsgroups_fnames()]
logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
                              datefmt = '%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True


def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                total += amount
    return total


def predict(large_object, essay="this essay will be predicted"):
    """this method copies the large object in memory which is what im trying to avoid"""
    vectorized_essay = large_object[0].transform(essay)
    large_object[1].predict(vectorized_essay)
    report_memory("done")


def train_and_model():
    """this is very similar to the instantiate_predict method from my first code sample"""
    tfidf_vect = ftext.TfidfVectorizer()
    X = tfidf_vect.fit_transform(documents)
    y = np.random.random_integers(0,1,19997)
    model = lm.LogisticRegression()
    model.fit(X, y)
    return (tfidf_vect, model)


def report_memory(label):
    f = free_memory()
    logger.warn('{l:<25}: {f}'.format(f=f, l=label))

def dump_large_object(large_object):
    f = open("large_object.obj", "w")
    pickle.dump(large_object, f, protocol=2)
    f.close()

def load_large_object():
    f = open("large_object.obj")
    large_object = pickle.load(f)
    f.close()
    return large_object

if __name__ == '__main__':
    report_memory('Initial')
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    procs = [mp.Process(target=predict, args=(large_object,))
             for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    report_memory('After p.start')
    for p in procs:
        p.join()
    report_memory('After p.join')

Output 1:

19:01:39: [ MainProcess] Initial                  : 26585728
19:01:51: [ MainProcess] After train_and_model    : 25958924
19:01:51: [ MainProcess] After Process            : 25958924
19:01:51: [ MainProcess] After p.start            : 25925908
19:01:51: [   Process-1] done                     : 25725524
19:01:51: [   Process-2] done                     : 25781076
19:01:51: [   Process-4] done                     : 25789880
19:01:51: [   Process-3] done                     : 25802032
19:01:51: [ MainProcess] After p.join             : 25958272
roman@ubx64:$ du -h large_object.obj
4.6M    large_object.obj

So maybe the large object is not even that large, and my real problem is the memory used by the TfidfVectorizer's transform method.

Now if I change the main method to this:

report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
procs = [mp.Process(target=predict, args=(large_object,))
         for i in range(mp.cpu_count())]
report_memory('After Process')
for p in procs:
    p.start()
report_memory('After p.start')
for p in procs:
    p.join()
report_memory('After p.join')

I get these results: Output 2:

20:07:23: [ MainProcess] Initial                  : 26578356
20:07:23: [ MainProcess] After loading the object : 26544380
20:07:23: [ MainProcess] After Process            : 26544380
20:07:23: [ MainProcess] After p.start            : 26523268
20:07:24: [   Process-1] done                     : 26338012
20:07:24: [   Process-4] done                     : 26337268
20:07:24: [   Process-3] done                     : 26439444
20:07:24: [   Process-2] done                     : 26438948
20:07:24: [ MainProcess] After p.join             : 26542860

Then I changed the main method to this:

report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
predict(large_object)
report_memory('After Process')

And got these results: Output 3:

20:13:34: [ MainProcess] Initial                  : 26572580
20:13:35: [ MainProcess] After loading the object : 26538356
20:13:35: [ MainProcess] done                     : 26513804
20:13:35: [ MainProcess] After Process            : 26513804

At this point I have no idea what's going on, but multiprocessing definitely uses more memory.

Answer 1:

Linux uses copy-on-write, which means that when a subprocess is forked, the global variables in each subprocess share the same memory addresses. A value is copied only when it is modified.

So in theory, if the large object is not modified, it can be used by the subprocesses without consuming more memory. Let's test that theory.

Here is your code, spruced up with a bit of memory usage logging:

import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging

logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
                              datefmt='%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True


def predict(essay="this essay will be predicted"):
    """this method copies the large object in memory which is what im trying to avoid"""
    vectorized_essay = large_object[0].transform(essay)
    large_object[1].predict(vectorized_essay)
    report_memory("done")


def train_and_model():
    """this is very similar to the instantiate_predict method from my first code sample"""
    tfidf_vect = ftext.TfidfVectorizer()
    N = 100000
    corpus = [
        'This is the first document.',
        'This is the second second document.',
        'And the third one.',
        'Is this the first document?', ] * N
    y = [1, 0, 1, 0] * N
    report_memory('Before fit_transform')
    X = tfidf_vect.fit_transform(corpus)
    model = lm.LogisticRegression()
    model.fit(X, y)
    report_memory('After model.fit')
    return (tfidf_vect, model)


def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                total += amount
    return total


def gen_change_in_memory():
    f = free_memory()
    diff = 0
    while True:
        yield diff
        f2 = free_memory()
        diff = f - f2
        f = f2
change_in_memory = gen_change_in_memory().next

def report_memory(label):
    logger.warn('{l:<25}: {d:+d}'.format(d=change_in_memory(), l=label))

if __name__ == '__main__':
    report_memory('Initial')
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    procs = [mp.Process(target=predict) for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    report_memory('After p.join')

It yields:

21:45:01: [ MainProcess] Initial                  : +0
21:45:01: [ MainProcess] Before fit_transform     : +3224
21:45:12: [ MainProcess] After model.fit          : +153572
21:45:12: [ MainProcess] After train_and_model    : -3100
21:45:12: [ MainProcess] After Process            : +0
21:45:12: [   Process-1] done                     : +2232
21:45:12: [   Process-2] done                     : +2976
21:45:12: [   Process-3] done                     : +3596
21:45:12: [   Process-4] done                     : +3224
21:45:12: [ MainProcess] After p.join             : -372

The number reported is the change in KiB of free memory (including cached and buffers). So, for example, the change in free memory between 'Initial' and 'After train_and_model' was about 150MB. Thus, the large_object requires about 150MB.
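
For example, summing the logged deltas from 'Initial' through 'After train_and_model' gives 0 + 3224 + 153572 - 3100 = 153,696 KiB, which is roughly 150MB.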

Then, after the 4 subprocesses have completed, a much smaller amount of memory (about 12MB total) has been consumed. The memory consumed could be due to the creation of the subprocesses plus memory used by the transform and predict methods.

So it appears that the large_object is not being copied, since if it were, we would have seen an increase of about 150MB in memory consumption.


Commentary on your run on 20 newsgroups:

Here are the changes in free memory:

On 20 newsgroups data:

| Initial               |       0 |
| After train_and_model |  626804 | <-- Large object requires 627M
| After Process         |       0 |
| After p.start         |   33016 |
| done                  |  200384 | 
| done                  |  -55552 |
| done                  |   -8804 |
| done                  |  -12152 |
| After p.join          | -156240 |

So it looks like instantiating the large object requires 627MB. I am clueless as to why an additional 200+MB has been consumed by the time the first done is reached.

Using load_large_object:

| Initial                  |       0 |
| After loading the object |   33976 |
| After Process            |       0 |
| After p.start            |   21112 |
| done                     |  185256 |
| done                     |     744 |
| done                     | -102176 |
| done                     |     496 |
| After p.join             | -103912 |

Apparently, the large_object itself requires only 34MB; the rest of the memory, 627 - 34 = 593MB, must have been consumed by the fit_transform and fit methods called in train_and_model.

Using single process:

| Initial                  |     0 |
| After loading the object | 34224 |
| done                     | 24552 |
| After Process            |     0 |

This is plausible.

So, the data you've accumulated seems to support the claim that the large object itself is not being copied by each subprocess. But a new mystery arises: why is there a huge consumption of memory between "After p.start" and the first "done"? I don't know the answer to that.


You might try putting report_memory calls around

vectorized_essay = large_object[0].transform(essay)

and

large_object[1].predict(vectorized_essay)

to see where the extra memory is being consumed. My guess is that one of these scikit-learn methods is choosing to allocate this (relatively) huge amount of memory.
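
For example, an instrumented version of the question's predict could look like this (just a sketch reusing the report_memory helper defined above):

def predict(large_object, essay="this essay will be predicted"):
    report_memory("before transform")
    vectorized_essay = large_object[0].transform(essay)   # TfidfVectorizer.transform
    report_memory("after transform")
    large_object[1].predict(vectorized_essay)             # LogisticRegression.predict
    report_memory("done")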



Answer 2:

I ended up using RPC servers with RabbitMQ (see the RabbitMQ tutorial on RPC with Python). I created as many servers as there are CPUs on my machine. These servers start up once, allocate memory for the model and the vectorizer once, and hold on to it while they run. Additional advantages were:

  1. Some of the processing can easily be sent to a different machine if one machine gets overwhelmed.
  2. If a computation fails on one server, it can easily be resent to a different server.
  3. Memory allocation was not instant in the original code, so the overall running time on my dataset dropped from 18 seconds to 12 seconds per query, because the memory is now pre-allocated.

Overall, my code became much cleaner as well.
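
For reference, one such worker could look roughly like the sketch below, loosely following the RabbitMQ RPC tutorial. The details are assumptions rather than part of this answer: pika >= 1.0, a broker on localhost, a queue named rpc_queue, and the instantiate_predict factory from the question.

import pika

pred = instantiate_predict()  # model and vectorizer are built once per worker and kept in memory

def on_request(ch, method, props, body):
    # compute the prediction with the long-lived object and reply to the caller
    response = pred.predict_fields(body.decode("utf-8"))
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
channel.basic_qos(prefetch_count=1)   # hand each worker one request at a time
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
channel.start_consuming()             # the worker holds the model for its whole lifetime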