golang + redis concurrency scheduler performance issue

2019-08-29 16:28发布

I wrote a simple concurrency scheduler, but it seems to have a performance issue at high levels of concurrency.

Here is the code (scheduler + concurrent rate limiter test):

package main

import (


// Scheduler pairs a buffered request channel with a fixed number of worker
// goroutines; NewScheduler fills in the fields and Start launches the workers.
type Scheduler struct {
    // reqChan carries queued requests; Enqueue sends on it and
    // processRequest receives from it.
    reqChan chan interface{}

    // maxRoutine is the number of worker goroutines Start launches.
    maxRoutine int

    // chanSize is the buffer capacity used when reqChan is created.
    chanSize int

    // wg is presumably used to track in-flight requests for Wait — the code
    // that touches it is not shown in this excerpt; TODO confirm.
    wg sync.WaitGroup

    // process is the callback supplied to NewScheduler for handling a request.
    process func(interface{})

// NewScheduler returns a Scheduler configured with the given worker count,
// queue capacity and process callback. No goroutines are started here.
//
// A maxRoutine of 0 falls back to 10 workers and a chanSize of 0 falls back
// to a buffer of 100.
// NOTE(review): negative values are stored as-is; a negative chanSize makes
// the make call below panic.
func NewScheduler(maxRoutine int, chanSize int, process func(interface{})) *Scheduler {
    s := &Scheduler{}
    // Zero means "not specified": use the default worker count.
    if maxRoutine == 0 {
        s.maxRoutine = 10
    } else {
        s.maxRoutine = maxRoutine

    // Zero means "not specified": use the default queue capacity.
    if chanSize == 0 {
        s.chanSize = 100
    } else {
        s.chanSize = chanSize

    // The queue is buffered with the (possibly defaulted) capacity.
    s.reqChan = make(chan interface{}, s.chanSize)
    s.process = process
    return s

func (s *Scheduler) Start() {
    // start process
    for i := 0; i < s.maxRoutine; i++ {
        go s.processRequest()

func (s *Scheduler) processRequest() {
    for {
        select {
        case req := <-s.reqChan:

func (s *Scheduler) Enqueue(req interface{}) {
    select {
    case s.reqChan <- req:

func (s *Scheduler) Wait() {

const script = `
local required_permits = tonumber(ARGV[2]);

local next_free_micros = redis.call('hget',KEYS[1],'next_free_micros');
if(next_free_micros == false) then
    next_free_micros = 0;
    next_free_micros = tonumber(next_free_micros);

local time = redis.call('time');
local now_micros = tonumber(time[1])*1000000 + tonumber(time[2]);

try aquire
if(ARGV[3] ~= nil) then
    local micros_to_wait = next_free_micros - now_micros;
    if(micros_to_wait > tonumber(ARGV[3])) then
        return micros_to_wait;

local stored_permits = redis.call('hget',KEYS[1],'stored_permits');
if(stored_permits == false) then
    stored_permits = 0;
    stored_permits = tonumber(stored_permits);

local stable_interval_micros = 1000000/tonumber(ARGV[1]);
local max_stored_permits = tonumber(ARGV[1]);

if(now_micros > next_free_micros) then
    local new_stored_permits = stored_permits + (now_micros - next_free_micros) / stable_interval_micros;
    if(max_stored_permits < new_stored_permits) then
        stored_permits = max_stored_permits;
        stored_permits = new_stored_permits;
    next_free_micros = now_micros;

local moment_available = next_free_micros;
local stored_permits_to_spend = 0;
if(stored_permits < required_permits) then
    stored_permits_to_spend = stored_permits;
    stored_permits_to_spend = required_permits;
local fresh_permits = required_permits - stored_permits_to_spend;
local wait_micros = fresh_permits * stable_interval_micros;

redis.call('hset',KEYS[1],'stored_permits',stored_permits - stored_permits_to_spend);
redis.call('hset',KEYS[1],'next_free_micros',next_free_micros + wait_micros);

return moment_available - now_micros;

var (
    rlScript *redis.Script

func init() {
    rlScript = redis.NewScript(1, script)

// take borrows a connection from pool and runs rlScript against key, passing
// qps and requires as the script's first and second arguments. It returns the
// script's reply (a microsecond delta, per the script's return statements),
// or 0 and the error if the connection is already in an error state or the
// script call fails. The connection is handed back via the deferred Close.
//
// Only two arguments are sent, so ARGV[3] is presumably nil inside the script
// and its timeout branch is skipped for calls made through this function.
func take(key string, qps, requires int, pool *redis.Pool) (int64, error) {
    c := pool.Get()
    defer c.Close()

    // This err is shadowed by the if-scoped err just below; the := on the
    // rlScript.Do line reuses this outer variable.
    var err error
    if err := c.Err(); err != nil {
        return 0, err

    reply, err := rlScript.Do(c, key, qps, requires)
    if err != nil {
        return 0, err
    // NOTE(review): unchecked type assertion; this panics if the reply is not
    // an int64.
    return reply.(int64), nil

// NewRedisPool builds a redis.Pool whose connections are created by dial over
// TCP to address, authenticating with password when it is non-empty.
// NOTE(review): MaxActive is not set here. The library's Pool documentation
// states that a zero MaxActive places no limit on the number of connections,
// so the pool can open as many connections as there are concurrent callers.
func NewRedisPool(address, password string) *redis.Pool {
    pool := &redis.Pool{
        MaxIdle:     50,
        IdleTimeout: 240 * time.Second,
        // The borrow check sends PING and reports its error; t is ignored, so
        // the check does not depend on how recently the connection was used.
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            _, err := c.Do("PING")
            return err
        Dial: func() (redis.Conn, error) {
            return dial("tcp", address, password)
    return pool

// dial opens a connection to address over network and, when password is
// non-empty, sends AUTH with it before returning the connection. Any dial or
// AUTH error is returned with a nil connection.
// NOTE(review): if AUTH fails the error is returned without closing c, so the
// just-opened connection is left open.
func dial(network, address, password string) (redis.Conn, error) {
    c, err := redis.Dial(network, address)
    if err != nil {
        return nil, err
    // An empty password means no authentication step is performed.
    if password != "" {
        if _, err := c.Do("AUTH", password); err != nil {
            return nil, err
    return c, err

func main() {
    var cpuprofile = flag.String("cpuprofile", "", "write cpu profile `file`")
    var memprofile = flag.String("memprofile", "", "write memory profile to `file`")
    if *cpuprofile != "" {
        f, err := os.Create(*cpuprofile)
        if err != nil {
            log.Fatal("could not create CPU profile: ", err)
        if err := pprof.StartCPUProfile(f); err != nil {
            log.Fatal("could not start CPU profile: ", err)
        defer pprof.StopCPUProfile()


    if *memprofile != "" {
        f, err := os.Create(*memprofile)
        if err != nil {
            log.Fatal("could not create memory profile: ", err)
        runtime.GC() // get up-to-date statistics
        if err := pprof.WriteHeapProfile(f); err != nil {
            log.Fatal("could not write memory profile: ", err)

func test() {
    pool := NewRedisPool("", "")

    s1 := NewScheduler(10000, 1000000, func(r interface{}) {
        take("xxx", 1000000, 1, pool)

    start := time.Now()
    for i := 0; i < 100000; i++ {

The problem is that at 10000 routines the program sometimes gets stuck even though no command is being sent to redis (checked with "redis-cli monitor"); my system's max open-files limit is set to 20000.

I did some profiling and saw a lot of "syscall.Syscall". Could someone give any advice? Is there something wrong with my scheduler?

2楼-- · 2019-08-29 17:17

At a surface level, the only thing I have questions about is the ordering of incrementing the wait group and enqueuing the work:

func (s *Scheduler) Enqueue(req interface{}) {
    select {
    case s.reqChan <- req:

I don't think the above will cause much of a problem in practice with this large a workload, but I think it may be a logical race condition. At lower levels of concurrency and smaller work sizes, it may enqueue a message, context switch to a goroutine that starts work on that message, and only THEN add the work to the wait group.

Next, are you sure the process method is thread-safe? I'd assume so based on the redis Go documentation; does running with go run -race produce any output?

At some point it's completely reasonable and expected for performance to drop off. I would recommend running performance tests to see where latency and throughput start to drop off:

maybe a pool of 10, 100, 500, 1000, 2500, 5000, 10000, or whatever makes sense. IMO it looks like there are 3 important variables to tune:

  • Worker pool size
  • Work Queue Buffer Size
  • Redis MaxActive

The biggest thing that jumps out is that it looks like redis.Pool is configured to allow an unbounded number of connections:

 pool := &redis.Pool{
        MaxIdle:     50,
        IdleTimeout: 240 * time.Second,
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            _, err := c.Do("PING")
            return err
        Dial: func() (redis.Conn, error) {
            return dial("tcp", address, password)

// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
MaxActive int

I would personally try to understand where and when performance starts to drop off with respect to the size of your worker pool. This might make it easier to understand what your program is constrained by.

3楼-- · 2019-08-29 17:28

My test results show that as the number of routines increases, the execution time per routine per take call increases nearly exponentially.

It appears to be a redis-related problem; here is the reply from the redis library community:

The problem is what you suspected: the pool connection lock, which, if your requests are small / quick, will be forcing the serialisation of your requests.

You should note that redis is single threaded so you should be able to obtain peak performance with just a single connection. This isn't quite true due to the round trip delays from client to server but in this type of use case a limited number of processors is likely the best approach.

I have some ideas on how we could improve pool.Get() / conn.Close() but in your case tuning the number of routines would be the best approach.

登录 后发表回答