I am trying to upload 100,000 data points to a web service backend. If I run the uploads one at a time, it will take ~12 hours. The service supports 20 simultaneous API calls. How can I run these POSTs concurrently to speed up the import?
def AddPushTokens():
    import requests
    import csv
    import json

    count = 0
    tokenList = []
    apikey = "12345"
    restkey = "12345"
    URL = "https://api.web.com/1/install/"
    headers = {'content-type': 'application/json', 'Application-Id': apikey, 'REST-API-Key': restkey}

    with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
        deviceTokens = csv.reader(csvfile, delimiter=',')
        for token in deviceTokens:
            deviceToken = token[0].replace("/", "")
            deviceType = "ios"
            pushToken = "pushtoken_" + deviceToken
            payload = {"deviceType": deviceType, "deviceToken": deviceToken, "channels": ["", pushToken]}
            r = requests.post(URL, data=json.dumps(payload), headers=headers)
            count = count + 1
            print "Count: " + str(count)
            print r.content
Edit: I am trying to use concurrent.futures. Where I am confused is how to set this up so it pulls each token from the CSV and passes it to load_url. Also, I want to make sure that it runs the first 20 requests, then picks up at 21 and runs the next set of 20.
import concurrent.futures
import requests
import csv
import json

URLS = ['https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/',
        'https://api.web.com/1/installations/']

apikey = "12345"
restkey = "12345"
URL = "https://api.web.com/1/installations/"
headers = {'content-type': 'application/json', 'X-web-Application-Id': apikey, 'X-web-REST-API-Key': restkey}

with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
    deviceTokens = csv.reader(csvfile, delimiter=',')
    for token in deviceTokens:
        deviceToken = token[0].replace("/", "")
        deviceType = "ios"
        pushToken = "pushtoken_" + deviceToken
        payload = {"deviceType": deviceType, "deviceToken": deviceToken, "channels": ["", pushToken]}
        r = requests.post(URL, data=json.dumps(payload), headers=headers)
# Retrieve a single page and report the url and contents
def load_url(token):
    URL = 'https://api.web.com/1/installations/'
    deviceToken = token[0].replace("/", "")
    deviceType = "ios"
    pushToken = "pushtoken_" + deviceToken
    payload = {"deviceType": deviceType, "deviceToken": deviceToken, "channels": ["", pushToken]}
    r = requests.post(URL, data=json.dumps(payload), headers=headers)
    count = count + 1
    print "Count: " + str(count)
    print r.content
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
Edit: updated based on the comments below.
import concurrent.futures
import requests
import csv
import json

apikey = "ldy0eSCqPz9PsyOLAt35M2b0XrfDZT1NBW69Z7Bw"
restkey = "587XASjEYdQwH2UHruA1yeZfT0oX7uAUJ8kWTmE3"
URL = "https://api.parse.com/1/installations/"
headers = {'content-type': 'application/json', 'X-Parse-Application-Id': apikey, 'X-Parse-REST-API-Key': restkey}

with open('/Users/jgurwin/Desktop/push/push-new.csv', 'rU') as csvfile:
    deviceTokens = csv.reader(csvfile, delimiter=',')
    for device in deviceTokens:
        token = device[0].replace("/", "")
# Retrieve a single page and report the url and contents
def load_url(token):
    count = 0
    deviceType = "ios"
    pushToken = "pushtoken_" + token
    payload = {"deviceType": deviceType, "deviceToken": token, "channels": ["", pushToken]}
    r = requests.post(URL, data=json.dumps(payload), headers=headers)
    count = count + 1
    print "Count: " + str(count)
    print r.content
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # Start the load operations and mark each future with its token
    future_to_token = {executor.submit(load_url, token): token for token in deviceTokens}
    for future in concurrent.futures.as_completed(future_to_token):
        token = future_to_token[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (token, exc))
        else:
            print('%r page is %d bytes' % (token, len(data)))
The easy way to do this is with threads. The nearly-as-easy way is with gevent or a similar library (and grequests even ties gevent and requests together, so you don't have to figure out how to do so). The hard way is building an event loop (or, better, using something like Twisted or Tulip) and multiplexing the requests yourself.

Let's do it the easy way.
You don't want to run 100000 threads at once. Besides the fact that it would take hundreds of GB of stack space, and your CPU would spend more time context-switching than running actual code, the service only supports 20 connections at once. So, you want 20 threads.
So, how do you run 100000 tasks on 20 threads? With a thread pool executor (or a bare thread pool).
The concurrent.futures docs have an example which is almost identical to what you want to do, except that it does GETs instead of POSTs and uses urllib instead of requests. Just change the load_url function to something like the sketch below, and the example will work as-is.
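For instance, a minimal sketch of that change, reusing the URL, headers, and payload format from your own code (those values and field names are assumptions carried over from the question, not from the docs example), plus a small driver that reads the tokens from the CSV and hands them to 20 worker threads:

import concurrent.futures
import csv
import json
import requests

apikey = "YOUR_APP_ID"       # placeholder
restkey = "YOUR_REST_KEY"    # placeholder
URL = "https://api.web.com/1/installations/"
headers = {'content-type': 'application/json',
           'X-web-Application-Id': apikey,
           'X-web-REST-API-Key': restkey}

def load_url(token):
    # Build the same payload as the sequential version and POST it
    deviceToken = token.replace("/", "")
    payload = {"deviceType": "ios",
               "deviceToken": deviceToken,
               "channels": ["", "pushtoken_" + deviceToken]}
    r = requests.post(URL, data=json.dumps(payload), headers=headers)
    return r.content

# Read all tokens up front, then let the pool work through them 20 at a time
with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
    tokens = [row[0] for row in csv.reader(csvfile, delimiter=',')]

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    future_to_token = {executor.submit(load_url, t): t for t in tokens}
    for future in concurrent.futures.as_completed(future_to_token):
        token = future_to_token[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (token, exc))
        else:
            print('%r returned %d bytes' % (token, len(data)))

The executor never runs more than 20 POSTs at once; as soon as one finishes, the next queued token starts, which gives you the 20-at-a-time batching you described.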
Since you're using Python 2.x, you don't have the concurrent.futures module in the stdlib; you'll need the backport, futures.

In Python (at least CPython), only one thread at a time can do any CPU work. If your tasks spend a lot more time downloading over the network (I/O work) than building requests and parsing responses (CPU work), that's not a problem. But if that isn't true, you'll want to use processes instead of threads, which only requires replacing the ThreadPoolExecutor in the example with a ProcessPoolExecutor.
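For instance, a sketch of that swap, assuming the same load_url function and tokens list as in the sketch above (with processes, load_url has to stay a module-level function so it can be pickled):

from concurrent.futures import ProcessPoolExecutor

# Same pool-of-workers pattern, but each task runs in a separate process
with ProcessPoolExecutor(max_workers=20) as executor:
    results = list(executor.map(load_url, tokens))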
If you want to do this entirely in the 2.7 stdlib, it's nearly as trivial with the thread and process pools built into the multiprocessing module. See Using a Pool of Workers and the Process Pools API, then see multiprocessing.dummy if you want to use threads instead of processes.
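For instance, a minimal sketch using multiprocessing.dummy, again assuming the load_url function and tokens list from the sketch above (dummy.Pool has the same API as multiprocessing.Pool but is backed by threads):

from multiprocessing.dummy import Pool as ThreadPool

pool = ThreadPool(20)                  # 20 worker threads, matching the API limit
results = pool.map(load_url, tokens)   # blocks until every POST has finished
pool.close()
pool.join()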
Could be overkill, but you may like to have a look at Celery (see the Celery Tutorial).
tasks.py could be:
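For instance, a sketch assuming a local Redis broker and reusing the Parse URL, headers, and payload format from the question (the broker URL and the key placeholders are assumptions):

import json
import requests
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')  # any Celery-supported broker works

URL = "https://api.parse.com/1/installations/"
headers = {'content-type': 'application/json',
           'X-Parse-Application-Id': 'YOUR_APP_ID',
           'X-Parse-REST-API-Key': 'YOUR_REST_KEY'}

@app.task
def add_push_token(token):
    # One task per device token: build the payload and POST it
    deviceToken = token.replace("/", "")
    payload = {"deviceType": "ios",
               "deviceToken": deviceToken,
               "channels": ["", "pushtoken_" + deviceToken]}
    r = requests.post(URL, data=json.dumps(payload), headers=headers)
    return r.status_code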
Start the Celery worker with:
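For instance (the concurrency is set to 20 here to match the API limit):

celery -A tasks worker --loglevel=info --concurrency=20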
Then in another script:
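For instance, a sketch of the enqueueing script, assuming the CSV path from the question and the tasks.py sketched above:

import csv
from tasks import add_push_token

with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
    for row in csv.reader(csvfile, delimiter=','):
        # .delay() only queues the job; the worker started above performs the POSTs
        add_push_token.delay(row[0])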
Note: the code above is only a sample; you may have to modify it for your requirements.