Python multiprocessing Pool.apply_async with share

2020-06-27 10:54发布

问题:

For my college project I am trying to develop a Python-based traffic generator. I have created 2 CentOS machines on VMware and I am using 1 as my client and 1 as my server machine. I have used the IP aliasing technique to increase the number of clients and servers using just a single client/server machine. Up to now I have created 50 IP aliases on my client machine and 10 IP aliases on my server machine. I am also using the multiprocessing module to generate traffic concurrently from all 50 clients to all 10 servers. I have also developed a few profiles (1kb, 10kb, 50kb, 100kb, 500kb, 1mb) on my server (in the /var/www/html directory, since I am using Apache Server) and I am using urllib2 to send requests to these profiles from my client machine. I am using httplib+urllib2 to first bind to any one of the source alias IPs and then send the request from this IP using urllib2. Here, to increase my number of TCP connections, I am trying to use multiprocessing.Pool.apply_async. But I am getting the error 'RuntimeError: Synchronized objects should only be shared between processes through inheritance' while running my scripts. After a bit of debugging I found that this error is caused by the use of multiprocessing.Value. But I want to share some variables between my processes and I also want to increase my number of TCP connections. What other module (other than multiprocessing.Value) can be used here to share some common variables? Or else, is there any other solution for this query?

'''
Traffic Generator Script:

 Here I have used IP Aliasing to create multiple clients on single vm machine. 
 Same I have done on server side to create multiple servers. I have around 50 clients and 10 servers
'''
import multiprocessing
import urllib2
import random
import myurllist    #list of all destination urls for all 10 servers
import time
import socbindtry   #script that binds various virtual/aliased client ips to the script
# Shared state used by every send_request* function below.
# The manager hosts the list; workers reach it through a proxy.
m=multiprocessing.Manager()
# Each worker appends the seconds elapsed since its run started.
response_time=m.list()    #some shared variables
# Signed-int counter ('i'), starting at 0, incremented when a worker hits a URLError.
# NOTE(review): this is a multiprocessing.Value, not a manager proxy; the
# question attributes the "Synchronized objects should only be shared between
# processes through inheritance" RuntimeError to this object.
error_count=multiprocessing.Value('i',0)
def send_request3():    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
def send_request4():    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
#50 such functions are defined here for 50 clients
def func():
    pool=multiprocessing.Pool(processes=750)
    for i in range(5):
        pool.apply_async(send_request3)
        pool.apply_async(send_request4)
        pool.apply_async(send_request5)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return
start=float(time.time())
func()
end=float(time.time())-start
print end

回答1:

As the error message states, you can't pass a multiprocessing.Value via pickle. However, you can use a multiprocessing.Manager().Value:

import multiprocessing
import urllib2
import random
import myurllist    #list of all destination urls for all 10 servers
import time
import socbindtry   #script that binds various virtual/aliased client ips to the script

def send_request3(response_time, error_count):    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        with error_count.get_lock():
            error_count.value += 1

def send_request4(response_time, error_count):    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        with error_count.get_lock():
            error_count.value += 1

#50 such functions are defined here for 50 clients

def func(response_time, error_count):
    pool=multiprocessing.Pool(processes=2*multiprocessing.cpu_count())
    args = (response_time, error_count)
    for i in range(5):
        pool.apply_async(send_request3, args=args)
        pool.apply_async(send_request4, args=args)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return

if __name__ == "__main__":
    m=multiprocessing.Manager()
    response_time=m.list()    #some shared variables
    error_count=m.Value('i',0)

    start=float(time.time())
    func(response_time, error_count)
    end=float(time.time())-start
    print end

A few other notes here:

  1. Using a Pool with 750 processes is not a good idea. Unless you're using a server with hundreds of CPU cores, that's going to overwhelm your machine. It'd be faster and put less strain on your machine to use significantly fewer processes. Something more like 2 * multiprocessing.cpu_count().
  2. As a best practice, you should explicitly pass all the shared arguments you need to use to the child processes, rather than using global variables. This increases the chances that the code will work on Windows.
  3. It looks like all your send_request* functions do almost the exact same thing. Why not just make one function and use a variable to decide which socbindtry.BindableHTTPHandler to use? You would avoid a ton of code duplication by doing this.
  4. The way you're incrementing error_count is not process/thread-safe, and is susceptible to race conditions. You need to protect the increment with a lock (as I did in the example code above).


回答2:

Possibly this is because Python's multiprocessing behaves differently on Windows and Linux. (I honestly don't know how multiprocessing works in VMs, as is the case here.)

This might work;

import multiprocessing
import random
import time
import urllib2

import myurllist    #list of all destination urls for all 10 servers
import socbindtry   #script that binds various virtual/aliased client ips to the script

def send_request3(response_time, error_count):    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
def send_request4(response_time, error_count):    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
#50 such functions are defined here for 50 clients
def func():
    m=multiprocessing.Manager()
    response_time=m.list()    #some shared variables
    error_count=multiprocessing.Value('i',0)

    pool=multiprocessing.Pool(processes=750)
    for i in range(5):
        pool.apply_async(send_request3, [response_time, error_count])
        pool.apply_async(send_request4, [response_time, error_count])
        # pool.apply_async(send_request5)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return


start=float(time.time())
func()
end=float(time.time())-start
print end