Calling an API concurrently in Python

Published 2019-07-11 06:09

Question:

I need to talk to an API to get information about teams. Each team has a unique ID. I call the API with that ID and get back a list of players on each team (a list of dicts). One of the keys for a player is another ID that I can use to get more information about that player. I can bundle all these player_ids and fetch the additional information for every player in a single API call.

My question is this: I expect the number of teams to grow, and it could become quite large. The number of players on each team could also grow large.

What is the best way to make these API calls concurrently? I can use ThreadPool from multiprocessing.dummy, and I have also seen gevent used for something like this.

The calls to the API take some time to return a value (1-2 seconds for each bulk API call).

Right now, what I do is this:

for each team:
    get the list of players
    store the player_ids in a list
    get the player information for all the players (passing the list of player_ids)
assemble and process the information
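
In actual code, the sequential version looks roughly like this (api.call stands in for the real client, as below, and team_ids is the assumed list of team IDs):

def process_team(team_id):
    players = api.call(team_id)                        # one call per team
    player_ids = [player['id'] for player in players]
    return api.call(player_ids)                        # one bulk call for the players

all_player_info = [process_team(team_id) for team_id in team_ids]
# assemble and process all_player_info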

If I use ThreadPool, I can do the following:

from multiprocessing.dummy import Pool as ThreadPool

def function_to_get_team_info(team_id):
    players = api.call(team_id)              # one API call per team
    return get_players_information(players)

def get_players_information(players):
    player_ids = [player['id'] for player in players]
    return get_all_player_stats(player_ids)

def get_all_player_stats(player_ids):
    return api.call(player_ids)              # one bulk API call per team

pool = ThreadPool(x)                          # x = number of worker threads
results = pool.map(function_to_get_team_info, team_ids)
pool.close()
pool.join()
# process results

This processes the teams concurrently and assembles all the information in the ThreadPool's results.

To make this fully concurrent, I think I would need a ThreadPool as large as the number of teams, but I don't think that scales well. So I was wondering whether using gevent to process this information would be a better approach.
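
From what I have seen, a bounded gevent pool would look something like this (untested sketch; the pool size of 50 is arbitrary, and the monkey-patching is needed so blocking socket calls yield to other greenlets):

from gevent import monkey
monkey.patch_all()          # make blocking socket I/O cooperative
from gevent.pool import Pool

pool = Pool(50)             # caps concurrency regardless of team count
results = pool.map(function_to_get_team_info, team_ids)  # function as defined above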

Any suggestions would be very welcome.

Answer 1:

One solution would be to:

  • prepare a list of tasks to perform, in your case a list of team IDs to be processed,
  • create a fixed pool of N worker threads,
  • have each worker thread pop a task from the list and process it (download the team data), then pop another task on completion,
  • have each worker thread stop when the task list is empty.

This solution could save you from the case where processing a particular team takes, e.g., 100 time units while other teams are processed in 1 time unit on average.

You can tune the number of worker threads depending on the number of teams, the average team processing time, the number of CPU cores, etc.
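
For concreteness, here is a minimal sketch of this pattern using only the standard library's queue and threading modules (api_call is a placeholder for the real API client, as in the example below):

import threading
import queue

def api_call(team_id):
    pass  # placeholder: download data for one team

def worker(tasks, results):
    while True:
        try:
            team_id = tasks.get_nowait()   # pop the next task
        except queue.Empty:
            return                         # task list empty: worker stops
        results.put((team_id, api_call(team_id)))

tasks = queue.Queue()
for team_id in [1, 2, 3]:                  # the team IDs to process
    tasks.put(team_id)

results = queue.Queue()
workers = [threading.Thread(target=worker, args=(tasks, results))
           for _ in range(5)]              # fixed pool of N=5 workers
for w in workers:
    w.start()
for w in workers:
    w.join()
# results now holds one (team_id, data) pair per team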

Extended answer

This can be achieved with Python's multiprocessing.Pool:

from multiprocessing import Pool

def api_call(team_id):
    pass  # call the API for the given team id

if __name__ == '__main__':
    p = Pool(5)                 # pool of 5 worker processes
    p.map(api_call, [1, 2, 3])  # the team ids to process
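
Since the API calls here are I/O-bound rather than CPU-bound, the thread-backed pool from multiprocessing.dummy (mentioned in the question) exposes exactly the same interface, so only the import changes:

from multiprocessing.dummy import Pool  # thread pool, same interface as multiprocessing.Pool

def api_call(team_id):
    pass  # call the API for the given team id

if __name__ == '__main__':
    p = Pool(5)                 # 5 worker threads instead of 5 processes
    p.map(api_call, [1, 2, 3])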