Question:
I'm making over 100K calls to an API using two functions: with the first function I reach out to the API and grab the sysinfo (a dict) for each host, then with the second function I go through sysinfo and grab the IP addresses. I'm looking for a way to speed this up, but I've never used multiprocessing/threading before (it currently takes about 3 hours).
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
#pool = ThreadPool(4)
p = Pool(5)

#obviously I removed a lot of the code that generates some of these
#variables, but this is the part that slooooows everything down.

def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request("https://{}:3000//hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id))
    return sysinfo

def get_ips_from_sysinfo(self, sysinfo):
    sysinfo = sysinfo["data"]
    network_array = sysinfo.get("networkArray", {})
    network_info = network_array.get("networkInfo", [])
    ips = []
    for ni in network_info:
        ip_array = ni.get("ipArray", {})
        ip_info = ip_array.get("ipInfo", [])
        for i in ip_info:
            ips.append(i)
    return ips

if __name__ == "__main__":
    for i in ids:
        sysinfo = rr.get_sys_info(i, appliance)
        hostname = sysinfo.get("data", {}).get("hostname")
        try:
            ips = p.map(rr.get_ips_from_sysinfo(sysinfo))
        except Exception as e:
            rr.logger.error("Exception on {} -- {}".format(hostname, e))
            continue

#Tried calling it here
ips = p.map(rr.get_ips_from_sysinfo(sysinfo))
I have to go through over 100,000 of these API calls, and this is really the part that slows everything down.
I think I've tried everything and gotten every possible iterable/missing-argument error.
I'd really appreciate any kind of help. Thank you!
Answer 1:
You can use threads and a queue to communicate. First, start get_ips_from_sysinfo as a single thread to monitor and process any finished sysinfo; it stores its output in output_list. Then fire all the get_sys_info threads. Be careful not to run out of memory by starting 100k threads at once (see the bounded variant sketched after the code).
from threading import Thread
from queue import Queue

jobs = Queue()    # buffer for sysinfo
output_list = []  # store ips

def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request("https://{}:3000//hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id))
    jobs.put(sysinfo)  # add sysinfo to jobs queue
    return sysinfo     # comment this out if you don't need it

def get_ips_from_sysinfo(self):
    """Runs continuously until all jobs are finished."""
    while True:
        # get sysinfo from jobs queue
        sysinfo = jobs.get()  # it will wait here for a new entry
        if sysinfo == 'exit':
            print('we are done here')
            break
        sysinfo = sysinfo["data"]
        network_array = sysinfo.get("networkArray", {})
        network_info = network_array.get("networkInfo", [])
        ips = []
        for ni in network_info:
            ip_array = ni.get("ipArray", {})
            ip_info = ip_array.get("ipInfo", [])
            for i in ip_info:
                ips.append(i)
        output_list.append(ips)

if __name__ == "__main__":
    # start our listener thread (it must be started, not just created)
    Thread(target=rr.get_ips_from_sysinfo).start()

    threads = []
    for i in ids:
        t = Thread(target=rr.get_sys_info, args=(i, appliance))
        threads.append(t)
        t.start()

    # wait for threads to finish, then terminate get_ips_from_sysinfo() by sending the 'exit' flag
    for t in threads:
        t.join()
    jobs.put('exit')
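Since starting ~100k threads at once can exhaust memory, here is a minimal bounded variant of the same idea (a sketch: host_queue, NUM_WORKERS, and fetch_worker are illustrative names, while rr, ids, appliance, and the jobs queue come from the example above). A fixed pool of worker threads pulls host ids from a second queue instead of spawning one thread per host.

from threading import Thread
from queue import Queue

host_queue = Queue()   # host ids waiting to be fetched
NUM_WORKERS = 50       # fixed thread count instead of one thread per host

def fetch_worker():
    """Pull host ids until the queue is drained, fetching sysinfo for each."""
    while True:
        host_id = host_queue.get()
        if host_id is None:      # sentinel: no more work
            host_queue.task_done()
            break
        rr.get_sys_info(host_id, appliance)  # puts sysinfo on the jobs queue
        host_queue.task_done()

for i in ids:
    host_queue.put(i)
for _ in range(NUM_WORKERS):
    host_queue.put(None)         # one sentinel per worker

workers = [Thread(target=fetch_worker) for _ in range(NUM_WORKERS)]
for w in workers:
    w.start()
for w in workers:
    w.join()
jobs.put('exit')                 # stop the listener from the example above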
Answer 2:
As @wwii commented, concurrent.futures offers some conveniences that may help you, particularly since this looks like a batch job.
Your performance hit most likely comes from the network calls, so multithreading is probably more suitable for your use case (here is a comparison with multiprocessing). If not, you can switch the pool from threads to processes while using the same APIs.
from concurrent.futures import ThreadPoolExecutor, as_completed
# You can import ProcessPoolExecutor instead and use the same APIs

def thread_worker(instance, host_id, appliance):
    """Wrapper for your class's `get_sys_info` method."""
    sysinfo = instance.get_sys_info(host_id, appliance)
    return sysinfo, instance

# instantiate the class that contains the methods in your example code
# I will call it `RR`
instances = (RR(*your_args, **your_kwds) for your_args, your_kwds
             in zip(iterable_of_args, iterable_of_kwds))
all_host_ids = another_iterable
all_appliances = still_another_iterable

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=50) as executor:  # assuming 10 threads per core; your example uses 5 processes
        pool = {executor.submit(thread_worker, _instance, _id, _app): (_id, _app)
                for _instance, _id, _app in zip(instances, all_host_ids, all_appliances)}

        # handle the `sysinfo` dicts as they arrive
        for future in as_completed(pool):
            _result = future.result()
            if isinstance(_result, Exception):  # just one way of handling exceptions
                # do something
                print(f"{pool[future]} raised {_result}")
            else:
                # enqueue results for parallel processing in a separate stage, or
                # process the results serially
                _sysinfo, _instance = _result
                ips = _instance.get_ips_from_sysinfo(_sysinfo)
                # do something with `ips`
You can streamline this example by refactoring your methods into plain functions, if indeed they don't make use of state, as seems to be the case in your code; a sketch of that refactor follows.
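For instance, a minimal sketch of the function-based version (assuming hx_request is available as a module-level function; all_host_ids and all_appliances are the placeholders from above):

from concurrent.futures import ThreadPoolExecutor

def get_sys_info(host_id, appliance):
    # a plain function: no instance state required
    url = "https://{}:3000//hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id)
    return hx_request(url)  # assumed module-level equivalent of self.hx_request

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=50) as executor:
        # map() pairs the iterables element-wise and yields results in input order
        all_sysinfo = list(executor.map(get_sys_info, all_host_ids, all_appliances))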
If extracting the sysinfo data is expensive, you can enqueue the results and in turn feed those to a ProcessPoolExecutor that calls get_ips_from_sysinfo on the queued dicts, as sketched below.
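A minimal sketch of that two-stage pipeline, assuming the function-based get_sys_info above and a module-level, function-based get_ips_from_sysinfo (process pools can only call picklable, module-level functions):

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=50) as fetcher, \
         ProcessPoolExecutor() as extractor:
        # stage 1: I/O-bound API calls run in threads
        fetches = [fetcher.submit(get_sys_info, _id, _app)
                   for _id, _app in zip(all_host_ids, all_appliances)]
        # stage 2: CPU-bound extraction runs in worker processes
        extractions = [extractor.submit(get_ips_from_sysinfo, f.result())
                       for f in as_completed(fetches)]
        for future in as_completed(extractions):
            ips = future.result()
            # do something with `ips`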
Answer 3:
For whatever reason I was a little leery about calling an instance method in numerous threads, but it seems to work. I made this toy example using concurrent.futures; hopefully it mimics your actual situation well enough. It submits 4000 instance method calls to a thread pool of (at most) 500 workers. Playing around with the max_workers value, I found that execution-time improvements were pretty linear up to about 1000 workers, then the improvement ratio started tailing off.
import concurrent.futures, time, random

a = [.001 * n for n in range(1, 4001)]

class F:
    def __init__(self, name):
        self.name = f'{name}:{self.__class__.__name__}'
    def apicall(self, n):
        wait = random.choice(a)
        time.sleep(wait)
        return (n, wait, self.name)

f = F('foo')

if __name__ == '__main__':
    nworkers = 500
    with concurrent.futures.ThreadPoolExecutor(nworkers) as executor:
        # t = time.time()
        futures = [executor.submit(f.apicall, n) for n in range(4000)]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
        # t = time.time() - t
        # q = sum(r[1] for r in results)
        # print(f'# workers:{nworkers} - ratio:{q/t}')
I didn't account for possible exceptions being thrown during the method call, but the example in the docs is pretty clear on how to handle that; see the sketch below.
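For reference, a minimal sketch of that docs pattern applied to the toy example above (a try/except around each future.result() call):

# same imports, class F, and f = F('foo') as in the example above
if __name__ == '__main__':
    nworkers = 500
    results = []
    with concurrent.futures.ThreadPoolExecutor(nworkers) as executor:
        futures = {executor.submit(f.apicall, n): n for n in range(4000)}
        for future in concurrent.futures.as_completed(futures):
            try:
                results.append(future.result())  # re-raises any exception from apicall
            except Exception as exc:
                print(f'call {futures[future]} raised {exc!r}')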
Answer 4:
So... after days of looking at the suggestions on here (thank you so much!!!) and a couple of outside readings (Fluent Python Ch. 17 and Effective Python: 59 Specific Ways...), here is what I ended up with:
import json
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

def get_ips_from_sysinfo(urls):
    sysinfo = lx_request(urls)
    ip_dict = []
    sysinfo = sysinfo["data"]
    hostname = sysinfo.get("hostname")
    network_array = sysinfo.get("networkArray", {})
    network_info = network_array.get("networkInfo", [])
    entry = {}
    entry["hostname"] = hostname
    entry["ip_addrs"] = []
    for ni in network_info:
        ip_array = ni.get("ipArray", {})
        ip_info = ip_array.get("ipInfo", [])
        for ip in ip_info:
            ip_addr = ip.get("ipAddress", None)
            if not ip_addr:
                ip_addr = ip.get("ipv6Address", None)
            if ip_addr is None:
                continue
            if not is_ip_private(ip_addr):
                entry["ip_addrs"].append(ip_addr)
    if entry["ip_addrs"]:
        ip_dict.append(entry)
    return ip_dict

urls = get_sys_info(appliance, ids)

def main():
    pool = ThreadPoolExecutor(max_workers=15)
    results = list(tqdm(pool.map(get_ips_from_sysinfo, urls), total=len(urls)))
    with open("ip_array.json", "w+") as f:
        json.dump(results, f, indent=2, sort_keys=True)

main()
*Modified: this works now; hope it helps someone else.