Run POST requests concurrently in Python


I am trying to upload 100,000 data points to a web service backend. Running them one at a time would take about 12 hours. The service supports 20 simultaneous API calls. How can I run these POST requests concurrently to speed up the import?

def AddPushTokens():

    import requests
    import csv
    import json

    count = 0

    apikey = "12345"
    restkey = "12345"
    URL = "https://api.web.com/1/install/"
    headers = {'content-type': 'application/json',
               'Application-Id': apikey,
               'REST-API-Key': restkey}

    with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
        deviceTokens = csv.reader(csvfile, delimiter=',')

        for token in deviceTokens:
            deviceToken = token[0].replace("/", "")
            deviceType = "ios"
            pushToken = "pushtoken_" + deviceToken
            payload = {"deviceType": deviceType,
                       "deviceToken": deviceToken,
                       "channels": ["", pushToken]}
            r = requests.post(URL, data=json.dumps(payload), headers=headers)

            count = count + 1
            print "Count: " + str(count)
            print r.content

Edit: I am trying to use concurrent.futures. What I'm confused about is how to set this up so it pulls each token from the CSV and passes it to load_url. I also want to make sure it runs the first 20 requests, then picks up at 21 and runs the next set of 20.

import concurrent.futures
import csv
import json
import requests

URLS = ['https://api.web.com/1/installations/'] * 20  # one copy of the endpoint per worker


apikey="12345"
restkey="12345"
URL="https://api.web.com/1/installations/"
headers={'content-type': 'application/json','X-web-Application-Id': apikey,'X-web-REST-API-Key':restkey}


with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
    deviceTokens = csv.reader(csvfile, delimiter=',')

    for token in deviceTokens:
        deviceToken = token[0].replace("/", "")
        deviceType = "ios"
        pushToken = "pushtoken_" + deviceToken
        payload = {"deviceType": deviceType,
                   "deviceToken": deviceToken,
                   "channels": ["", pushToken]}
        r = requests.post(URL, data=json.dumps(payload), headers=headers)


# Retrieve a single page and report the url and contents
def load_url(token):

    URL = 'https://api.web.com/1/installations/'

    deviceToken = token[0].replace("/", "")
    deviceType = "ios"
    pushToken = "pushtoken_" + deviceToken
    payload = {"deviceType": deviceType,
               "deviceToken": deviceToken,
               "channels": ["", pushToken]}
    r = requests.post(URL, data=json.dumps(payload), headers=headers)

    count = count + 1
    print "Count: " + str(count)
    print r.content

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Edit: Updated based on the comments below

import concurrent.futures
import requests
import csv
import json

apikey="ldy0eSCqPz9PsyOLAt35M2b0XrfDZT1NBW69Z7Bw"
restkey="587XASjEYdQwH2UHruA1yeZfT0oX7uAUJ8kWTmE3"
URL="https://api.parse.com/1/installations/"
headers={'content-type': 'application/json','X-Parse-Application-Id': apikey,'X-Parse-REST-API-Key':restkey}

with open('/Users/jgurwin/Desktop/push/push-new.csv', 'rU') as csvfile:
    deviceTokens = csv.reader(csvfile, delimiter=',')
    # Read every token up front so the pool, not the CSV loop, drives the work
    tokens = [device[0].replace("/", "") for device in deviceTokens]

# Build the payload for one token, POST it, and return the response body
def load_url(token):
    pushToken = "pushtoken_" + token
    payload = {"deviceType": "ios",
               "deviceToken": token,
               "channels": ["", pushToken]}
    r = requests.post(URL, data=json.dumps(payload), headers=headers)
    return r.content

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # Start the upload operations and mark each future with its token
    future_to_token = {executor.submit(load_url, token): token for token in tokens}
    for future in concurrent.futures.as_completed(future_to_token):
        token = future_to_token[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (token, exc))
        else:
            print('%r uploaded, response is %d bytes' % (token, len(data)))

2 Answers

Answer 1:

The easy way to do this is with threads. The nearly-as-easy way is with gevent or a similar library (and grequests even ties gevent and requests together so you don't have to figure out how to do so). The hard way is building an event loop (or, better, using something like Twisted or Tulip) and multiplexing the requests yourself.
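
For illustration, a minimal grequests sketch (an assumption-laden example, not code from the thread: it presumes URL and headers as defined in the question, a tokens list of device-token strings, and pip install grequests):

import json
import grequests  # ties gevent and requests together

# Build one pending POST per device token (not yet sent)
def make_request(token):
    payload = {"deviceType": "ios", "deviceToken": token,
               "channels": ["", "pushtoken_" + token]}
    return grequests.post(URL, data=json.dumps(payload), headers=headers)

# size=20 keeps at most 20 requests in flight at a time
for r in grequests.map([make_request(t) for t in tokens], size=20):
    print(r.status_code if r is not None else 'request failed')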

Let's do it the easy way.

You don't want to run 100,000 threads at once. Besides the fact that they would take hundreds of GB of stack space and your CPU would spend more time context-switching than running actual code, the service only supports 20 connections at once. So you want 20 threads.

So how do you run 100,000 tasks on 20 threads? With a thread pool executor (or a bare thread pool).

The concurrent.futures docs have an example which is almost identical to what you want to do, except doing GETs instead of POSTs and using urllib instead of requests. Just change the load_url function to something like this:

def load_url(token):
    deviceToken = token[0].replace("/", "")
    # … your original code here …
    r = requests.post(URL, data=json.dumps(payload), headers=headers)
    return r.content

… and the example will work as-is.
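
Concretely, a sketch of the wiring (assuming the load_url above plus the URL, headers, and CSV path from the question; tokens are read up front so the pool drives the work):

import concurrent.futures
import csv

with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
    tokens = list(csv.reader(csvfile, delimiter=','))  # read every row first

# The pool starts the first 20 POSTs; as each one finishes, its thread picks
# up the next token automatically -- no manual batching into groups of 20.
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    future_to_token = {executor.submit(load_url, t): t for t in tokens}
    for future in concurrent.futures.as_completed(future_to_token):
        token = future_to_token[future]
        try:
            print(future.result())
        except Exception as exc:
            print('%r generated an exception: %s' % (token, exc))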

Since you're using Python 2.x, you don't have the concurrent.futures module in the stdlib; you'll need the backport, futures.
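
On PyPI the backport is published simply as futures, and it installs under the same concurrent.futures name, so the import line does not change:

$ pip install futures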


In Python (at least CPython), only one thread at a time can do any CPU work. If your tasks spend a lot more time downloading over the network (I/O work) than building requests and parsing responses (CPU work), that's not a problem. But if that isn't true, you'll want to use processes instead of threads. Which only requires replacing the ThreadPoolExecutor in the example with a ProcessPoolExecutor.
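
The swap is a one-liner. A sketch, assuming load_url and tokens are defined at module top level (worker processes need to pickle the function):

# Worker processes sidestep the GIL for CPU-bound work; the API is identical
with concurrent.futures.ProcessPoolExecutor(max_workers=20) as executor:
    future_to_token = {executor.submit(load_url, t): t for t in tokens}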


If you want to do this entirely in the 2.7 stdlib, it's nearly as easy with the thread and process pools built into multiprocessing. See Using a pool of workers and the Process Pools API in the docs, then see multiprocessing.dummy if you want threads instead of processes.
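
For example, a minimal stdlib thread-pool sketch, reusing the load_url and tokens from above (an assumption; any one-argument function and iterable work):

from multiprocessing.dummy import Pool  # thread pool with the multiprocessing.Pool API

pool = Pool(20)  # 20 worker threads, matching the API's connection limit
for content in pool.imap_unordered(load_url, tokens):
    print(content)
pool.close()
pool.join()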

Answer 2:

It could be overkill, but you may want to have a look at Celery.

Tutorial

tasks.py could be:

from celery import Celery
import requests

app = Celery('tasks', broker='amqp://guest@localhost//')

apikey = "12345"
restkey = "12345"

URL = "https://api.web.com/1/install/"
headers = {'content-type': 'application/json',
           'Application-Id': apikey,
           'REST-API-Key': restkey}

f = open('upload_data.log', 'a+')

@app.task
def upload_data(data, count):
    r = requests.post(URL, data=data, headers=headers)
    f.write("Count: %d\n%s\n\n" % (count, r.content))

Start the Celery worker (-c 20 caps concurrency at 20 tasks at a time, matching the API limit):

$ celery -A tasks worker --loglevel=info -c 20

Then in another script:

import tasks

def AddPushTokens():

    import csv
    import json

    count = 0

    with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
        deviceTokens = csv.reader(csvfile, delimiter=',')

        for token in deviceTokens:
            deviceToken = token[0].replace("/", "")
            deviceType = "ios"
            pushToken = "pushtoken_" + deviceToken
            payload = {"deviceType": deviceType,
                       "deviceToken": deviceToken,
                       "channels": ["", pushToken]}
            # Queue the upload; the 20 worker processes do the actual POSTs
            tasks.upload_data.delay(json.dumps(payload), count)

            count = count + 1
NOTE: The above code is a sample; you may have to modify it for your requirements.
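
One possible refinement: delay() returns an AsyncResult, so if the Celery app is configured with a result backend (the tasks.py above defines only a broker), the caller can wait for a task to finish:

result = tasks.upload_data.delay(json.dumps(payload), count)
result.get(timeout=60)  # blocks up to 60s until the worker finishes this task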
