I have some code that mimics a REST API call (see below).
For every key in each item the generator yields, it needs to make a REST call. In my example, a record could be:
{"a": 2, "b": 36, "c": 77}
I need to run a REST call for every key (a, b, and c) individually, then output the results (the call just negates the number):
{"a": 2, "a_neg": -2, "b": 36, "b_neg": -36, "c": 77, "c_neg": -77}
My current code works for one key, but with multiple keys it repeats the items (so I'm getting three times the results for 3 keys).
There also seems to be a race condition: some of the repeated records are printed before all of their _neg keys have been filled in. I guess I could keep only the last copy of each record, but I'm not experienced with threads and I'm concerned about thread safety.
Here is an example output:
{'a': 89, 'a_neg': -89, 'b': 69, 'c': 38}
{'a': 89, 'a_neg': -89, 'b': 69, 'c': 38, 'c_neg': -38}
{'a': 89, 'a_neg': -89, 'b': 69, 'b_neg': -69, 'c': 38, 'c_neg': -38}
{'a': 90, 'a_neg': -90, 'b': 43, 'c': 16}
{'a': 90, 'a_neg': -90, 'b': 43, 'c': 16, 'c_neg': -16}
{'a': 90, 'a_neg': -90, 'b': 43, 'b_neg': -43, 'c': 16, 'c_neg': -16}
{'a': 91, 'a_neg': -91, 'b': 49, 'b_neg': -49, 'c': 77, 'c_neg': -77}
{'a': 91, 'a_neg': -91, 'b': 49, 'b_neg': -49, 'c': 77, 'c_neg': -77}
{'a': 91, 'a_neg': -91, 'b': 49, 'b_neg': -49, 'c': 77, 'c_neg': -77}
Finally, here is my source code (you can run it yourself):
#!/usr/bin/env python
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from pprint import pprint
import random


def records():
    # simulates records generator
    for i in range(100):
        yield {"a": i, "b": random.randint(0, 100), "c": random.randint(0, 100)}


def stream(records):
    threads = 8
    pool = ThreadPoolExecutor(threads)

    def rest_api_lookup(record_dict):
        # simulates REST call :)
        sleep(0.1)
        key = record_dict["key"]
        record = record_dict["record"]
        record[key + "_neg"] = -record[key]
        return record

    def thread(records):
        chunk = []
        for record in records:
            for key in record:
                chunk.append(pool.submit(rest_api_lookup, {"record": record, "key": key}))
                if len(chunk) == threads:
                    yield chunk
                    chunk = []
        if chunk:
            yield chunk

    def unchunk(chunk_gen):
        """Flattens a generator of Future chunks into a generator of Future results."""
        for chunk in chunk_gen:
            for f in chunk:
                yield f.result()  # get result from Future

    # Now iterate over all results in same order as records
    for result in unchunk(thread(records)):
        #yield result
        pprint(result)


stream(records())
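One direction I have been considering, as a rough sketch rather than a verified fix: have each worker return a (key, value) pair instead of mutating the shared record, and merge the pairs in the consuming thread so every record is yielded exactly once. The `max_pending` parameter and the `finish` helper are my own assumptions for illustration, and the short sleep is only a placeholder for the real REST latency.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def rest_api_lookup(key, value):
    # Stand-in for the REST call; returns a result instead of touching the record.
    sleep(0.01)
    return key + "_neg", -value


def finish(record, futures):
    # Runs in the consuming thread only, so no dict is shared between workers.
    out = {}
    for (key, value), fut in zip(record.items(), futures):
        out[key] = value
        neg_key, neg_value = fut.result()
        out[neg_key] = neg_value
    return out


def stream(records, threads=8, max_pending=8):
    """Yield each enriched record exactly once, in input order."""
    with ThreadPoolExecutor(threads) as pool:
        pending = deque()  # (record, futures) pairs still waiting on results
        for record in records:
            futures = [pool.submit(rest_api_lookup, k, v) for k, v in record.items()]
            pending.append((record, futures))
            if len(pending) >= max_pending:
                yield finish(*pending.popleft())
        while pending:
            yield finish(*pending.popleft())


for result in stream(iter([{"a": 2, "b": 36, "c": 77}])):
    print(result)
```

Because the workers never write to the record, iterating over its keys while lookups are in flight should be safe, and grouping the futures per record avoids the repeated output. I have not checked how this behaves against a real API with errors or timeouts.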