Why does this parallel search and replace not use 100% of the CPU?

Posted 2019-07-29 03:58

Question:

I have a very long list of tweets (2 millions) and I use regexes to search and replace text in these tweets.

I run this using a joblib.Parallel map (joblib is the parallel backend used by scikit-learn).

My problem is that I can see in Windows' Task Manager that my script does not use 100% of each CPU. It doesn't use 100% of the RAM nor the disk. So I don't understand why it won't go faster.

There are probably synchronization delays somewhere, but I can't find what or where.

The code:

# file main.py
import re
from joblib import delayed, Parallel

def make_tweets():
    tweets = load_from_file()  # this is a list of strings

    regex = re.compile(r'a *a|b *b')  # of course more complex IRL, with lookbehind/forward
    mydict = {'aa': 'A', 'bb': 'B'}  

    def handler(match):
        return mydict[match[0].replace(' ', '')]

    def replace_in(tweet):
        return re.sub(regex, handler, tweet)

    # -1 means all cores
    # I have 6 cores that can run 12 threads
    with Parallel(n_jobs=-1) as parallel:
        tweets2 = parallel(delayed(replace_in)(tweet) for tweet in tweets)

    return tweets2

And here's the Task Manager:


Edit: final word

The answer is that the worker processes were slowed down by joblib synchronization: joblib sends the tweets in small chunks (one by one?) to the workers, which makes them wait. Using multiprocessing.Pool.map with a chunksize of len(tweets)/cpu_count() made the workers utilize 100% of the CPU.

Using joblib, the running time was around 12 min. Using multiprocessing, it is 4 min. With multiprocessing, each worker process consumed around 50 MB of memory.
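A minimal sketch of what the multiprocessing version can look like, assuming the same toy regex as in the question. The names pick_chunksize and replace_all are made up for this illustration, and the sample list stands in for load_from_file(). The functions are defined at module level because multiprocessing has to pickle them to send them to the workers, and the `if __name__ == '__main__':` guard is needed on Windows, where worker processes re-import the main module.

```python
import re
from multiprocessing import Pool, cpu_count

regex = re.compile(r'a *a|b *b')
mydict = {'aa': 'A', 'bb': 'B'}

def handler(match):
    # Strip the spaces from the matched text, then look up its replacement.
    return mydict[match[0].replace(' ', '')]

def replace_in(tweet):
    return re.sub(regex, handler, tweet)

def pick_chunksize(n_items, n_workers):
    # Hypothetical helper: one large chunk per worker.
    # chunksize must be an integer, and never less than 1.
    return max(1, n_items // n_workers)

def replace_all(tweets):
    # Hypothetical wrapper around Pool.map with a large chunksize.
    n_workers = cpu_count()
    with Pool(n_workers) as pool:
        return pool.map(replace_in, tweets,
                        chunksize=pick_chunksize(len(tweets), n_workers))

if __name__ == '__main__':
    # Stand-in data; the real script would call load_from_file() here.
    tweets = ['a a and b  b', 'nothing here'] * 1000
    tweets2 = replace_all(tweets)
    print(tweets2[:2])
```

Note that len(tweets)/cpu_count() is a float in Python 3, so the sketch uses integer division instead.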

Answer 1:

After a bit of playing, I think it's because joblib is spending all its time coordinating the parallel execution and almost no time doing useful work. At least that's what I see under OSX and Linux; I don't have any MS Windows machines.

I started by loading packages, pulling in your code, and generating a dummy file:

from random import choice
import re

from multiprocessing import Pool
from joblib import delayed, Parallel

regex = re.compile(r'a *a|b *b')  # of course more complex IRL, with lookbehind/forward
mydict = {'aa': 'A', 'bb': 'B'}  

def handler(match):
    return mydict[match[0].replace(' ', '')]

def replace_in(tweet):
    return re.sub(regex, handler, tweet)

examples = [
    "Regex replace isn't that computationally expensive... I would suggest using Pandas, though, rather than just a plain loop",
    "Hmm I don't use pandas anywhere else, but if it makes it faster, I'll try! Thanks for the suggestion. Regarding the question: expensive or not, if there is no reason for it to use only 19%, it should use 100%",
    "Well, is tweets a generator, or an actual list?",
    "an actual list of strings",
    "That might be causing the main process to have the 419MB of memory, however, that doesn't mean that list will be copied over to the other processes, which only need to work over slices of the list",
    "I think joblib splits the list in roughly equal chunks and sends these chunks to the worker processes.",
    "Maybe, but if you use something like this code, 2 million lines should be done in less than a minute (assuming an SSD, and reasonable memory speeds).",
    "My point is that you don't need the whole file in memory. You could type tweets.txt | python replacer.py > tweets_replaced.txt, and use the OS's native speeds to replace data line-by-line",
    "I will try this",
    "no, this is actually slower. My code takes 12mn using joblib.parallel and for line in f_in: f_out.write(re.sub(..., line)) takes 21mn. Concerning CPU and memory usage: CPU is same (17%) and memory much lower (60Mb) using files. But I want to minimize time spent, not memory usage.",
    "I moved this to chat because StackOverflow suggested it",
    "I don't have experience with joblib. Could you try the same with Pandas? pandas.pydata.org/pandas-docs/…",
]

with open('tweets.txt', 'w') as fd:
    for i in range(2_000_000):
        print(choice(examples), file=fd)

(see if you can guess where I got the lines from!)

As a baseline, I tried the naive solution:

with open('tweets.txt') as fin, open('tweets2.txt', 'w') as fout:
    for l in fin:
        fout.write(replace_in(l))

This takes 14.0s (wall clock time) on my OSX laptop and 5.15s on my Linux desktop. Note that changing your definition of replace_in to use regex.sub(handler, tweet) instead of re.sub(regex, handler, tweet) reduces the above to 8.6s on my laptop, but I won't use this change below.
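To make the regex.sub versus re.sub comparison concrete, here is a small sketch that checks both forms give the same result and times them on a single string. The function names replace_via_module and replace_via_method and the sample string are made up for this illustration. The module-level re.sub does some extra per-call work inside the re module before delegating to the compiled pattern, which is the likely source of the difference; the actual timings will depend on the machine and Python version.

```python
import re
import timeit

regex = re.compile(r'a *a|b *b')
mydict = {'aa': 'A', 'bb': 'B'}

def handler(match):
    return mydict[match[0].replace(' ', '')]

def replace_via_module(tweet):
    # Module-level function: the compiled pattern is passed as an argument.
    return re.sub(regex, handler, tweet)

def replace_via_method(tweet):
    # Method on the compiled pattern: skips the module-level wrapper.
    return regex.sub(handler, tweet)

sample = "an actual list of strings with a  a and b b in it"

# Both forms must produce the same text.
assert replace_via_module(sample) == replace_via_method(sample)

for fn in (replace_via_module, replace_via_method):
    seconds = timeit.timeit(lambda: fn(sample), number=100_000)
    print(f"{fn.__name__}: {seconds:.3f}s")
```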

I then tried joblib, as in your code:

with open('tweets.txt') as fin, open('tweets2.txt', 'w') as fout:
    with Parallel(n_jobs=-1) as parallel:
        for l in parallel(delayed(replace_in)(tweet) for tweet in fin):
            fout.write(l)

which takes 1min 16s on my laptop, and 34.2s on my desktop. CPU utilisation was pretty low as the child/worker tasks were all waiting for the coordinator to send them work most of the time.

I then tried using the multiprocessing package:

with open('tweets.txt') as fin, open('tweets2.txt', 'w') as fout:
    with Pool() as pool:
        for l in pool.map(replace_in, fin, chunksize=1024):
            fout.write(l)

which took 5.95s on my laptop and 2.60s on my desktop. I also tried a chunk size of 8, which took 22.1s and 8.29s respectively. The chunk size allows the pool to send large chunks of work to its children, so it spends less time coordinating and more time getting useful work done.
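A rough way to see why the chunk size matters: Pool.map chops its input into chunks and submits each chunk to the pool as one task, so the number of round trips between the coordinator and the workers is roughly the number of lines divided by the chunk size. The sketch below only does that arithmetic and shows what chunking looks like; the chunked helper is written for this illustration and is not the pool's internal code.

```python
import math
from itertools import islice

def chunked(iterable, chunksize):
    # Illustrative helper: yield successive lists of up to `chunksize` items.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, chunksize))
        if not chunk:
            return
        yield chunk

n_lines = 2_000_000
for chunksize in (1, 8, 1024):
    # Each chunk is one task handed to a worker.
    n_tasks = math.ceil(n_lines / chunksize)
    print(f"chunksize={chunksize}: {n_tasks} tasks sent to workers")

# Small demonstration of the chunking itself.
print(list(chunked(range(10), 4)))
```

With 2 million lines, a chunk size of 1024 means about two thousand tasks instead of two million, so far less time goes into passing messages between processes.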

I'd therefore hazard a guess that joblib isn't particularly useful for this sort of usage as it doesn't seem to have a notion of chunksize.