How to fix multithreading/multiprocessing with dicts

Posted 2019-08-01 02:03

I'm making over 100K calls to an API using two functions: with the first function I reach out to the API and grab the sysinfo (a dict) for each host, then with the second function I go through the sysinfo and grab the IP addresses. I'm looking for a way to speed this up, but I've never used multiprocessing/threading before (the whole run currently takes about 3 hours).

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

#pool = ThreadPool(4)
p = Pool(5)

#obviously I removed a lot of the code that generates some of these
#variables, but this is the part that slooooows everything down. 

def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request("https://{}:3000//hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id))
    return sysinfo

def get_ips_from_sysinfo(self, sysinfo):
    sysinfo = sysinfo["data"]
    network_array = sysinfo.get("networkArray", {})
    network_info = network_array.get("networkInfo", [])
    ips = []
    for ni in network_info:
        ip_array = ni.get("ipArray", {})
        ip_info = ip_array.get("ipInfo", [])
        for i in ip_info:
            ips.append(i)
    return ips

if __name__ == "__main__":
    for i in ids:
        sysinfo = rr.get_sys_info(i, appliance)
        hostname = sysinfo.get("data", {}).get("hostname")
        try:
            ips = p.map(rr.get_ips_from_sysinfo(sysinfo))
        except Exception as e:
            rr.logger.error("Exception on {} -- {}".format(hostname, e))
            continue

#Tried calling it here
ips = p.map(rr.get_ips_from_sysinfo(sysinfo))

I have to go through over 100,000 of these API calls, and this is really the part that slows everything down.

I think I've tried everything and gotten every possible "not iterable" and "missing argument" error.

I'd just really appreciate any type of help. Thank you!

4 Answers
干净又极端 · 2019-08-01 02:44

For whatever reason I was a little leery about calling an instance method from numerous threads, but it seems to work. I made this toy example using concurrent.futures; hopefully it mimics your actual situation well enough. It submits 4000 instance-method calls to a thread pool of (at most) 500 workers. Playing around with the max_workers value, I found that execution-time improvements were pretty linear up to about 1000 workers, and then the improvement ratio started tailing off.

import concurrent.futures, time, random

a = [.001 * n for n in range(1, 4001)]  # pool of wait times to simulate API latency

class F:
    def __init__(self, name):
        self.name = f'{name}:{self.__class__.__name__}'
    def apicall(self, n):
        """Stand-in for the real API call: sleep a random amount, return a record."""
        wait = random.choice(a)
        time.sleep(wait)
        return (n, wait, self.name)

f = F('foo')

if __name__ == '__main__':
    nworkers = 500
    with concurrent.futures.ThreadPoolExecutor(nworkers) as executor:
#        t = time.time()
        futures = [executor.submit(f.apicall, n) for n in range(4000)]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
#        t = time.time() - t
#    q = sum(r[1] for r in results)
#    print(f'# workers:{nworkers} - ratio:{q/t}')

I didn't account for possible exceptions being thrown during the method call, but the example in the docs shows pretty clearly how to handle that.
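
For reference, a minimal sketch of that handling, following the as_completed pattern from the docs and reusing f and a from the toy example above:

import concurrent.futures

# reusing `f` from the example above
with concurrent.futures.ThreadPoolExecutor(500) as executor:
    future_to_n = {executor.submit(f.apicall, n): n for n in range(4000)}
    for future in concurrent.futures.as_completed(future_to_n):
        n = future_to_n[future]
        try:
            result = future.result()  # re-raises anything apicall raised
        except Exception as exc:
            print(f'call {n} raised {exc!r}')
        else:
            print(f'call {n} returned {result}')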

太酷不给撩 · 2019-08-01 02:48

So... after days of looking at the suggestions on here (thank you so much!!!) and a couple of outside readings (Fluent Python Ch. 17 and Effective Python: 59 Specific Ways...), here is what I ended up with:

import json
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

def get_ips_from_sysinfo(url):
    sysinfo = lx_request(url)
    sysinfo = sysinfo["data"]
    entry = {"hostname": sysinfo.get("hostname"), "ip_addrs": []}
    network_array = sysinfo.get("networkArray", {})
    network_info = network_array.get("networkInfo", [])
    for ni in network_info:
        ip_array = ni.get("ipArray", {})
        ip_info = ip_array.get("ipInfo", [])
        for ip in ip_info:
            # prefer the v4 address, fall back to v6
            ip_addr = ip.get("ipAddress") or ip.get("ipv6Address")
            if ip_addr is None:
                continue
            if not is_ip_private(ip_addr):
                entry["ip_addrs"].append(ip_addr)
    ip_dict = []
    if entry["ip_addrs"]:  # only keep hosts with at least one public address
        ip_dict.append(entry)
    return ip_dict

urls = get_sys_info(appliance, ids)

def main():
    with ThreadPoolExecutor(max_workers=15) as pool:
        results = list(tqdm(pool.map(get_ips_from_sysinfo, urls), total=len(urls)))
    with open("ip_array.json", "w+") as f:
        json.dump(results, f, indent=2, sort_keys=True)

main()

*Modified: this works now; hope it helps someone else.

We Are One · 2019-08-01 02:57

You can use threads and a queue to communicate: first start get_ips_from_sysinfo as a single thread that monitors and processes any finished sysinfo, storing its output in output_list, then fire off all the get_sys_info threads. Be careful not to run out of memory with 100K threads; a bounded alternative is sketched after the code.

from threading import Thread
from queue import Queue

jobs = Queue()  # buffer for sysinfo
output_list = []  # store ips

def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request(
        "https://{}:3000//hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id))
    jobs.put(sysinfo)  # add sysinfo to the jobs queue
    return sysinfo  # comment this out if you don't need it

def get_ips_from_sysinfo(self):
    """Runs continuously until all jobs are finished."""
    while True:
        # get sysinfo from jobs queue
        sysinfo = jobs.get()  # it will wait here for new entry
        if sysinfo == 'exit':
            print('we are done here')
            break

        sysinfo = sysinfo["data"]
        network_array = sysinfo.get("networkArray", {})
        network_info = network_array.get("networkInfo", [])
        ips = []
        for ni in network_info:
            ip_array = ni.get("ipArray", {})
            ip_info = ip_array.get("ipInfo", [])
            for i in ip_info:
                ips.append(i)
        output_list.append(ips)


if __name__ == "__main__":
    # start our listener thread
    listener = Thread(target=rr.get_ips_from_sysinfo)
    listener.start()

    threads = []
    for i in ids:
        t = Thread(target=rr.get_sys_info, args=(i, appliance))
        threads.append(t)
        t.start()

    # wait for the worker threads to finish, then terminate
    # get_ips_from_sysinfo() by sending the 'exit' flag
    for t in threads:
        t.join()

    jobs.put('exit')
    listener.join()
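
If one thread per host is too many, a fixed-size pool of producer threads can drain a queue of host ids instead. A minimal sketch under the same assumptions as above (rr, ids, appliance, and the jobs queue all come from the surrounding example):

from threading import Thread
from queue import Queue

id_queue = Queue()

def producer():
    # pull host ids until the sentinel; get_sys_info puts each sysinfo on `jobs`
    while True:
        host_id = id_queue.get()
        if host_id is None:  # sentinel: no more work
            break
        rr.get_sys_info(host_id, appliance)

n_workers = 50  # bounded, instead of one thread per host
workers = [Thread(target=producer) for _ in range(n_workers)]
for w in workers:
    w.start()

for i in ids:
    id_queue.put(i)
for _ in range(n_workers):
    id_queue.put(None)  # one sentinel per worker

for w in workers:
    w.join()
jobs.put('exit')  # stop the get_ips_from_sysinfo listener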
不美不萌又怎样 · 2019-08-01 03:04

As @wwii commented, concurrent.futures offers some conveniences that may help you, particularly since this looks like a batch job.

Your performance hit most likely comes from the network calls, so multithreading is probably more suitable for your use case than multiprocessing. If not, you can switch the pool from threads to processes while using the same APIs.

from concurrent.futures import ThreadPoolExecutor, as_completed
# You can import ProcessPoolExecutor instead and use the same APIs

def thread_worker(instance, host_id, appliance):
    """Wrapper for your class's `get_sys_info` method"""
    sysinfo = instance.get_sys_info(host_id, appliance)
    return sysinfo, instance

# instantiate the class that contains the methods in your example code
# I will call it `RR`
instances = (RR(*your_args, **your_kwds) for your_args, your_kwds
             in zip(iterable_of_args, iterable_of_kwds))
all_host_ids = another_iterable
all_appliances = still_another_iterable

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=50) as executor:  # assuming 10 threads per core; your example uses 5 processes
        pool = {executor.submit(thread_worker, instance, _id, _app): (_id, _app)
                for instance, _id, _app in zip(instances, all_host_ids, all_appliances)}

        # handle the `sysinfo` dicts as they arrive
        for future in as_completed(pool):
            try:
                # .result() re-raises any exception raised inside the worker
                _sysinfo, _instance = future.result()
            except Exception as exc:  # just one way of handling exceptions
                print(f"{pool[future]} raised {exc}")
                continue
            # enqueue results for parallel processing in a separate stage, or
            # process the results serially
            ips = _instance.get_ips_from_sysinfo(_sysinfo)
            # do something with `ips`

You can streamline this example by refactoring your methods into module-level functions, if indeed they don't make use of instance state, as seems to be the case in your code.

If extracting the sysinfo data is expensive, you can enqueue the results and in turn feed them to a ProcessPoolExecutor that calls get_ips_from_sysinfo on the queued dicts; a sketch follows.
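
A minimal sketch of that two-stage idea, with hypothetical module-level stand-ins fetch_sysinfo and extract_ips in place of your methods (ProcessPoolExecutor needs picklable, module-level callables):

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def fetch_sysinfo(host_id):
    """Hypothetical stand-in for the API call (I/O-bound)."""
    import time; time.sleep(0.1)           # simulate network latency
    return {"data": {"hostname": f"host-{host_id}"}}

def extract_ips(sysinfo):
    """Hypothetical stand-in for the parsing step (CPU-bound)."""
    return sysinfo["data"]["hostname"]

if __name__ == "__main__":
    host_ids = range(100)                  # stand-in for your 100K ids
    with ThreadPoolExecutor(max_workers=50) as tpool, \
         ProcessPoolExecutor() as ppool:
        # stage 1: I/O-bound fetches run on threads
        fetches = [tpool.submit(fetch_sysinfo, h) for h in host_ids]
        # stage 2: each finished sysinfo is handed to a worker process
        parses = [ppool.submit(extract_ips, f.result())
                  for f in as_completed(fetches)]
        results = [p.result() for p in as_completed(parses)]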
