Python 3 - How to properly setup this multiprocess

2019-08-02 08:02发布

I have a file with like 10,000 rows, each row represents parameters to a download job. I have like 5 custom downloaders. Each job can take anywhere from 5 seconds to 2 minutes. How would I create something that iterates through the 10,000 rows, assigning each job to a downloader if that downloader isn't currently working?

EDIT:

The difficult part for me is that each Downloader is an instance of a class, and the differences between the instances are the port_numbers I specify when I instantiate each of the 5 Downloader objects. So I have a = Downloader(port_number=7751) ... e = Downloader(port_number=7755). Then, if I were to use a Downloader I would do a.run(row).

How do I define the workers as these a, b, c, d, e rather than a downloader function?

2条回答
欢心
2楼-- · 2019-08-02 08:26

There are many ways to do it - the simplest way would be to just use multiprocessing.Pool and let it organize the workers for you - 10k rows is not all that much, let's say that an average URL is even a full kilobyte long it will still take only 10MB of memory and memory is cheap.

So, just read the file in memory and map it to multiprocessing.Pool to do your bidding:

from multiprocessing import Pool

def downloader(param):  # our downloader process
    # download code here
    # param will hold a line from your file (including newline at the end, strip before use)
    # e.g. res = requests.get(param.strip())
    return True  # lets provide some response back

if __name__ == "__main__":  # important protection for cross-platform use

    with open("your_file.dat", "r") as f:  # open your file
        download_jobs = f.readlines()  # store each line in a list

    download_pool = Pool(processes=5)  # make our pool use 5 processes
    responses = download_pool.map(downloader, download_jobs)  # map our data, line by line
    download_pool.close()  # lets exit cleanly
    # you can check the responses for each line in the `responses` list

You can also use threading instead of multiprocessing (or multiprocessing.pool.ThreadPool as a drop-in replacement for this) to do everything within a single process if you need shared memory. A single thread is more than enough for download purposes unless you're doing additional processing.

UPDATE

If you want your downloaders to run as class instances, you can transform the downloader function into a factory for your Downloader instances, and then just pass what you need to instantiate those instances alongside your URLs. Here is a simple Round-Robin approach:

from itertools import cycle
from multiprocessing import Pool

class Downloader(object):

    def __init__(self, port_number=8080):
        self.port_number = port_number

    def run(self, url):
        print("Downloading {} on port {}".format(url, self.port_number))

def init_downloader(params):  # our downloader initializator
    downloader = Downloader(**params[0])  # instantiate our downloader
    downloader.run(params[1])  # run our downloader
    return True  # you can provide your

if __name__ == "__main__":  # important protection for cross-platform use

    downloader_params = [  # Downloaders will be initialized using these params
        {"port_number": 7751},
        {"port_number": 7851},
        {"port_number": 7951}
    ]

    downloader_cycle = cycle(downloader_params)  # use cycle for round-robin distribution
    with open("your_file.dat", "r") as f:  # open your file
        # read our file line by line and attach downloader params to it
        download_jobs = [[next(downloader_cycle), row.strip()] for row in f]

    download_pool = Pool(processes=5)  # make our pool use 5 processes
    responses = download_pool.map(init_downloader, download_jobs)  # map our data
    download_pool.close()  # lets exit cleanly
    # you can check the responses for each line in the `responses` list

Keep in mind that this is not the most balanced solution as it can happen to have two Downloader instances with the same port running, but it will average over large enough data.

If you want to make sure that you don't have two Downloader instances running off of the same port, you'll either need to build your own pool, or you'll need to create a central process that will issue ports to your Downloader instances when they need them.

查看更多
Lonely孤独者°
3楼-- · 2019-08-02 08:31

Read in your 10000 rows into a list of strings.

with open('foo.dat') as f:
    data = f.readlines()

Assuming that the data does not include a port number and the edited question mentions 5 ports, you should add that to the data.

data = [(p, d) for p, d in zip(itertools.cycle([7751, 7752, 7753, 7754, 7755]), data)]

Write a function that takes one of those tuples as an argument, splits it, creates a Downloader object and runs it.

def worker(target):
    port, params = target
    d = Downloader(port_number=port)
    d.run(params)
    return params # for lack of more information.

Use the imap_unordered method of a multiprocessing.Pool, giving it the function you've defined and the list of tuples as arguments.

The iterator returned by imap_unordered will start yielding results as soon as they become available. You could print them to show the progress.

p = multiprocessing.Pool()
for params in p.imap_unordered(worker, data):
    print('Finished downloading', params)

Edit

P.S: if the only method of your Downloader object you'll ever use is run(), it should not be an object. It is a function in disguise! Look up the "stop writing classes" video on Youtube and watch it.

查看更多
登录 后发表回答