How Can I Effectively 'Max Out' Concurrent HTTP Requests?

Posted 2019-01-22 16:49

Question:

I'm currently trying a bit of an experiment with Go. Here's what I'm attempting to do:

I've got a REST API service running, and I'd like to query a specific URL over and over again in as many Goroutines as possible, to see how performant these responses are (by viewing my REST API server logs). I'd like to send off a total of 1 million HTTP requests before quitting the program -- executing as many concurrently as my computer will allow.

I'm aware that there are tools which are meant to do this, but I'm primarily interested in how to maximize my HTTP concurrency in Go with goroutines.

Here's my code:

package main

import (
    "fmt"
    "net/http"
    "runtime"
    "time"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    transport := &http.Transport{}

    for i := 0; i < 1000000; i++ {
        go func() {
            req, _ := http.NewRequest("GET", "http://myapi.com", nil)
            req.Header.Set("User-Agent", "custom-agent")
            req.SetBasicAuth("xxx", "xxx")
            resp, err := transport.RoundTrip(req)
            if err != nil {
                panic("HTTP request failed.")
            }
            defer resp.Body.Close()

            if resp.StatusCode != 302 {
                panic("Unexpected response returned.")
            }

            location := resp.Header.Get("Location")
            if location == "" {
                panic("No location header returned.")
            }
            fmt.Println("Location Header Value:", location)
        }()
    }

    time.Sleep(60 * time.Second)
}

What I'm expecting this code to do is:

  • Start 1,000,000 goroutines, each one making HTTP requests to my API service.
  • Run the goroutines concurrently across all of my CPUs (since I used the runtime package to increase the GOMAXPROCS setting).
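To check the second expectation, you can ask the runtime which setting it is actually using. This is a small sketch, and schedulerInfo is a hypothetical helper name. Passing 0 to runtime.GOMAXPROCS queries the value without changing it. GOMAXPROCS only bounds how many goroutines run Go code in parallel; goroutines parked while waiting on network I/O do not occupy a slot, so it is not what caps the number of in-flight requests. On Go 1.5 and later the default is already the number of CPUs (very recent releases may lower it under container CPU limits), so the explicit call is redundant there.

```go
package main

import (
	"fmt"
	"runtime"
)

// schedulerInfo reports the current GOMAXPROCS value (an argument of 0 only
// queries it) alongside the number of logical CPUs visible to the process.
func schedulerInfo() (maxProcs, numCPU int) {
	return runtime.GOMAXPROCS(0), runtime.NumCPU()
}

func main() {
	maxProcs, numCPU := schedulerInfo()
	fmt.Printf("GOMAXPROCS=%d NumCPU=%d\n", maxProcs, numCPU)
}
```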

What happens, however, is that I get the following errors (too many to paste, so I'm only including a bit of the output):

goroutine 16680 [IO wait]:
net.runtime_pollWait(0xcb1d878, 0x77, 0x0)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/runtime/netpoll.goc:116 +0x6a
net.(*pollDesc).Wait(0xc212a86ca0, 0x77, 0x55d0c0, 0x24)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_poll_runtime.go:81 +0x34
net.(*pollDesc).WaitWrite(0xc212a86ca0, 0x24, 0x55d0c0)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_poll_runtime.go:90 +0x30
net.(*netFD).connect(0xc212a86c40, 0x0, 0x0, 0xb4c97e8, 0xc212a84500, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_unix.go:86 +0x166
net.(*netFD).dial(0xc212a86c40, 0xb4c87d8, 0x0, 0xb4c87d8, 0xc212a878d0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/sock_posix.go:121 +0x2fd
net.socket(0x2402c0, 0x3, 0x2, 0x1, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/sock_posix.go:91 +0x40b
net.internetSocket(0x2402c0, 0x3, 0xb4c87d8, 0x0, 0xb4c87d8, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/ipsock_posix.go:136 +0x161
net.dialTCP(0x2402c0, 0x3, 0x0, 0xc212a878d0, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/tcpsock_posix.go:155 +0xef
net.dialSingle(0x2402c0, 0x3, 0xc210d161e0, 0x15, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:225 +0x3d8
net.func·015(0x0, 0x0, 0x0, 0x2402c0, 0x3, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:158 +0xde
net.dial(0x2402c0, 0x3, 0xb4c8748, 0xc212a878d0, 0xafbbcd8, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_unix.go:40 +0x45
net.(*Dialer).Dial(0xafbbd78, 0x2402c0, 0x3, 0xc210d161e0, 0x15, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:165 +0x3e0
net.Dial(0x2402c0, 0x3, 0xc210d161e0, 0x15, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:138 +0x75
net/http.(*Transport).dial(0xc210057280, 0x2402c0, 0x3, 0xc210d161e0, 0x15, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:401 +0xd4
net/http.(*Transport).dialConn(0xc210057280, 0xc2112efa80, 0x0, 0x0, 0x0)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:444 +0x6e
net/http.func·014()
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:419 +0x3e
created by net/http.(*Transport).getConn
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:421 +0x11a

I'm running this script on a Mac OS X 10.9.2 laptop with 16GB of RAM and a 2.6GHz Intel Core i5 processor.

What can I do to 'flood' my laptop with as many concurrent HTTP requests as possible?

Answer 1:

As Rob Napier suggested, you're almost certainly hitting file descriptor limits.

EDIT: Improved, concurrent version:

This program creates a pool of max worker goroutines (max is set by the -concurrent flag), which pull requests off a request channel, process them, and send the results on a response channel. A dispatcher queues the requests, workerPool starts the goroutines, each worker handles one request at a time until the request channel is closed and drained, and the consumer reads the response channel until the number of responses (successful or not) equals the number of requests.

package main

import (
    "flag"
    "fmt"
    "log"
    "net/http"
    "runtime"
    "time"
)

var (
    reqs int
    max  int
)

func init() {
    flag.IntVar(&reqs, "reqs", 1000000, "Total requests")
    flag.IntVar(&max, "concurrent", 200, "Maximum concurrent requests")
}

type Response struct {
    *http.Response
    err error
}

// Dispatcher
func dispatcher(reqChan chan *http.Request) {
    defer close(reqChan)
    for i := 0; i < reqs; i++ {
        req, err := http.NewRequest("GET", "http://localhost/", nil)
        if err != nil {
            // A nil request would make RoundTrip panic, so stop here.
            log.Fatal(err)
        }
        reqChan <- req
    }
}

// Worker Pool
func workerPool(reqChan chan *http.Request, respChan chan Response) {
    t := &http.Transport{}
    for i := 0; i < max; i++ {
        go worker(t, reqChan, respChan)
    }
}

// Worker
func worker(t *http.Transport, reqChan chan *http.Request, respChan chan Response) {
    for req := range reqChan {
        resp, err := t.RoundTrip(req)
        r := Response{resp, err}
        respChan <- r
    }
}

// Consumer: counts every response (including errors) and sums body sizes.
func consumer(respChan chan Response) (int64, int64) {
    var (
        conns int64
        size  int64
    )
    for conns < int64(reqs) {
        r := <-respChan
        if r.err != nil {
            log.Println(r.err)
        } else {
            // ContentLength is -1 when the server does not report it.
            if r.ContentLength > 0 {
                size += r.ContentLength
            }
            if err := r.Body.Close(); err != nil {
                log.Println(err)
            }
        }
        conns++
    }
    return conns, size
}

func main() {
    flag.Parse()
    runtime.GOMAXPROCS(runtime.NumCPU())
    reqChan := make(chan *http.Request)
    respChan := make(chan Response)
    start := time.Now()
    go dispatcher(reqChan)
    go workerPool(reqChan, respChan)
    conns, size := consumer(respChan)
    took := time.Since(start)
    // Average wall-clock time per request.
    average := took / time.Duration(conns)
    fmt.Printf("Connections:\t%d\nConcurrent:\t%d\nTotal size:\t%d bytes\nTotal time:\t%s\nAverage time:\t%s\n", conns, max, size, took, average)
}

Produces:

Connections: 1000000
Concurrent: 200
Total size: 15000000 bytes
Total time: 36m39.6778103s
Average time: 2.199677ms

WARNING: This very rapidly hits system resource limits. On my laptop, anything more than 206 concurrent workers caused my local test web server to crash!


ORIGINAL ANSWER: The program below uses a buffered chan bool as a semaphore channel, which limits the number of concurrent requests. You can tweak this number and the total number of requests to stress test your system and determine its limits.

package main

import (
    "fmt"
    "net/http"
    "runtime"
    "time"
)

type Resp struct {
    *http.Response
    err error
}

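// makeResponses issues reqs GET requests. The buffered sem channel acts as a
// counting semaphore: at most cap(sem) requests are in flight at once.
func makeResponses(reqs int, rc chan Resp, sem chan bool) {
    defer close(rc)
    // One shared Transport, so idle connections can be reused between requests.
    transport := &http.Transport{}
    for i := 0; i < reqs; i++ {
        sem <- true // acquire a slot; blocks while cap(sem) requests are running
        go func() {
            defer func() { <-sem }() // release the slot when this request is done
            req, err := http.NewRequest("GET", "http://localhost/", nil)
            if err != nil {
                rc <- Resp{nil, err}
                return
            }
            resp, err := transport.RoundTrip(req)
            rc <- Resp{resp, err}
        }()
    }
    // Filling every slot only succeeds once all in-flight requests have
    // released theirs, so it is then safe to close rc.
    for i := 0; i < cap(sem); i++ {
        sem <- true
    }
}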

func getResponses(rc chan Resp) int {
    conns := 0
    // range stops when makeResponses closes rc.
    for r := range rc {
        conns++
        if r.err != nil {
            fmt.Println(r.err)
            continue
        }
        // Do something with response
        if err := r.Body.Close(); err != nil {
            fmt.Println(err)
        }
    }
    return conns
}

func main() {
    reqs := 100000
    maxConcurrent := 1000
    runtime.GOMAXPROCS(runtime.NumCPU())
    rc := make(chan Resp)
    sem := make(chan bool, maxConcurrent)
    start := time.Now()
    go makeResponses(reqs, rc, sem)
    conns := getResponses(rc)
    end := time.Since(start)
    fmt.Printf("Connections: %d\nTotal time: %s\n", conns, end)
}

This will print something like:

Connections: 100000
Total time: 6m8.2554629s

This test was done on a local web server that returned a total response size of 85 bytes per request, so it's not a realistic result. Also, I'm doing no processing on the response, except to close its body.

At a maximum of 1000 concurrent requests, it took my laptop just over 6 minutes to do 100,000 requests, so I'm guessing a million would take over an hour. Tweaking the maxConcurrent variable should help you home in on the maximum performance for your system.



Answer 2:

You're almost certainly running into a file descriptor limit. The default limit is 2560 (the old limit was 256, but I think they raised it tenfold at some point). I'm fairly certain the highest you can set it is 10,000.
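To see which limit your process is actually running into, you can query it with getrlimit. This is a minimal Unix-only sketch (it will not build on Windows), and fdLimits is a hypothetical helper. Each open TCP connection holds one file descriptor, so the soft limit is roughly an upper bound on simultaneous connections per process. Note that Go 1.19 and later raise the soft limit to the hard limit at startup for programs that import os (fmt does), so the value printed may be higher than what ulimit -n reports in your shell.

```go
package main

import (
	"fmt"
	"syscall"
)

// fdLimits returns the soft (current) and hard (maximum) limits on open file
// descriptors for this process.
func fdLimits() (soft, hard uint64, err error) {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		return 0, 0, err
	}
	return rl.Cur, rl.Max, nil
}

func main() {
	soft, hard, err := fdLimits()
	if err != nil {
		fmt.Println("getrlimit failed:", err)
		return
	}
	fmt.Printf("open-file limit: soft=%d hard=%d\n", soft, hard)
}
```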

I don't know if you'll ever be able to get a million simultaneous connections out of one machine this way. You may want to try a hybrid of processes and goroutines: 10k processes at 1000 goroutines per process, but I would not be surprised if you run into the systemwide limits anyway.

If the goal is simply to hit the API as hard as you can from one host (and one network card), I believe you're going to need to rate limit with a buffered channel semaphore, so that you're never making more than several thousand connections at the same time.