I wrote a simple concurrency scheduler, but it seems to have a performance issue at high levels of concurrency.
Here is the code (scheduler + concurrent rate-limiter test):
package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "runtime"
    "runtime/pprof"
    "sync"
    "time"

    "github.com/gomodule/redigo/redis"
)

// a scheduler is composed of a load function and a process function
type Scheduler struct {
    // query channel
    reqChan chan interface{}
    // max number of worker goroutines
    maxRoutine int
    // channel buffer size
    chanSize int
    wg       sync.WaitGroup
    // query process function
    process func(interface{})
}

func NewScheduler(maxRoutine int, chanSize int, process func(interface{})) *Scheduler {
    s := &Scheduler{}
    if maxRoutine == 0 {
        s.maxRoutine = 10
    } else {
        s.maxRoutine = maxRoutine
    }
    if chanSize == 0 {
        s.chanSize = 100
    } else {
        s.chanSize = chanSize
    }
    s.reqChan = make(chan interface{}, s.chanSize)
    s.process = process
    return s
}

func (s *Scheduler) Start() {
    // start the worker goroutines
    for i := 0; i < s.maxRoutine; i++ {
        go s.processRequest()
    }
}

func (s *Scheduler) processRequest() {
    for {
        select {
        case req := <-s.reqChan:
            s.process(req)
            s.wg.Done()
        }
    }
}

func (s *Scheduler) Enqueue(req interface{}) {
    select {
    case s.reqChan <- req:
        s.wg.Add(1)
    }
}

func (s *Scheduler) Wait() {
    s.wg.Wait()
}
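(Side note on the worker loop: the single-case select is equivalent to a plain channel receive, and the goroutines never exit. A minimal sketch of an alternative worker, assuming you are willing to close reqChan once everything has been enqueued; processRequestUntilClosed is a hypothetical name, not part of the original code:)

// processRequestUntilClosed drains reqChan until it is closed, then the goroutine exits.
func (s *Scheduler) processRequestUntilClosed() {
    for req := range s.reqChan {
        s.process(req)
        s.wg.Done()
    }
}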
const script = `
local required_permits = tonumber(ARGV[2]);

local next_free_micros = redis.call('hget', KEYS[1], 'next_free_micros');
if (next_free_micros == false) then
    next_free_micros = 0;
else
    next_free_micros = tonumber(next_free_micros);
end;

local time = redis.call('time');
local now_micros = tonumber(time[1]) * 1000000 + tonumber(time[2]);

--[[
try acquire
--]]
if (ARGV[3] ~= nil) then
    local micros_to_wait = next_free_micros - now_micros;
    if (micros_to_wait > tonumber(ARGV[3])) then
        return micros_to_wait;
    end
end

local stored_permits = redis.call('hget', KEYS[1], 'stored_permits');
if (stored_permits == false) then
    stored_permits = 0;
else
    stored_permits = tonumber(stored_permits);
end

local stable_interval_micros = 1000000 / tonumber(ARGV[1]);
local max_stored_permits = tonumber(ARGV[1]);

if (now_micros > next_free_micros) then
    local new_stored_permits = stored_permits + (now_micros - next_free_micros) / stable_interval_micros;
    if (max_stored_permits < new_stored_permits) then
        stored_permits = max_stored_permits;
    else
        stored_permits = new_stored_permits;
    end
    next_free_micros = now_micros;
end

local moment_available = next_free_micros;
local stored_permits_to_spend = 0;
if (stored_permits < required_permits) then
    stored_permits_to_spend = stored_permits;
else
    stored_permits_to_spend = required_permits;
end

local fresh_permits = required_permits - stored_permits_to_spend;
local wait_micros = fresh_permits * stable_interval_micros;

redis.replicate_commands();
redis.call('hset', KEYS[1], 'stored_permits', stored_permits - stored_permits_to_spend);
redis.call('hset', KEYS[1], 'next_free_micros', next_free_micros + wait_micros);
redis.call('expire', KEYS[1], 10);

return moment_available - now_micros;
`
var (
    rlScript *redis.Script
)

func init() {
    rlScript = redis.NewScript(1, script)
}

func take(key string, qps, requires int, pool *redis.Pool) (int64, error) {
    c := pool.Get()
    defer c.Close()

    if err := c.Err(); err != nil {
        return 0, err
    }

    reply, err := rlScript.Do(c, key, qps, requires)
    if err != nil {
        return 0, err
    }
    return reply.(int64), nil
}
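(The script returns how many microseconds the caller should wait before its permits become available, but the test below discards that value. A minimal sketch of a caller that honors it, assuming the int64 from take is microseconds to wait; takeAndWait is a hypothetical helper reusing the imports above, not part of the original code:)

// takeAndWait acquires `requires` permits and sleeps until they are available.
func takeAndWait(key string, qps, requires int, pool *redis.Pool) error {
    waitMicros, err := take(key, qps, requires, pool)
    if err != nil {
        return err
    }
    if waitMicros > 0 {
        time.Sleep(time.Duration(waitMicros) * time.Microsecond)
    }
    return nil
}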
func NewRedisPool(address, password string) *redis.Pool {
    pool := &redis.Pool{
        MaxIdle:     50,
        IdleTimeout: 240 * time.Second,
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            _, err := c.Do("PING")
            return err
        },
        Dial: func() (redis.Conn, error) {
            return dial("tcp", address, password)
        },
    }
    return pool
}

func dial(network, address, password string) (redis.Conn, error) {
    c, err := redis.Dial(network, address)
    if err != nil {
        return nil, err
    }
    if password != "" {
        if _, err := c.Do("AUTH", password); err != nil {
            c.Close()
            return nil, err
        }
    }
    return c, err
}
func main() {
    var cpuprofile = flag.String("cpuprofile", "", "write cpu profile `file`")
    var memprofile = flag.String("memprofile", "", "write memory profile to `file`")
    flag.Parse()

    if *cpuprofile != "" {
        f, err := os.Create(*cpuprofile)
        if err != nil {
            log.Fatal("could not create CPU profile: ", err)
        }
        if err := pprof.StartCPUProfile(f); err != nil {
            log.Fatal("could not start CPU profile: ", err)
        }
        defer pprof.StopCPUProfile()
    }

    test()

    if *memprofile != "" {
        f, err := os.Create(*memprofile)
        if err != nil {
            log.Fatal("could not create memory profile: ", err)
        }
        runtime.GC() // get up-to-date statistics
        if err := pprof.WriteHeapProfile(f); err != nil {
            log.Fatal("could not write memory profile: ", err)
        }
        f.Close()
    }
}

func test() {
    pool := NewRedisPool("127.0.0.1:6379", "")
    s1 := NewScheduler(10000, 1000000, func(r interface{}) {
        take("xxx", 1000000, 1, pool)
    })
    s1.Start()

    start := time.Now()
    for i := 0; i < 100000; i++ {
        s1.Enqueue(i)
    }
    fmt.Println(time.Since(start))

    s1.Wait()
    fmt.Println(time.Since(start))
}
The problem is that at 10000 goroutines the program sometimes gets stuck, even though no command is being sent to Redis (checked with "redis-cli monitor"), and my system's max open-files limit is set to 20000.
I did some profiling and saw a lot of time in "syscall.Syscall". Could someone give any advice? Is there something wrong with my scheduler?
At a surface level the only thing I have questions about is the ordering of incrementing the wait group and enqueuing the work: in Enqueue, the send on s.reqChan happens before s.wg.Add(1).
I don't think the above will cause much of a problem in practice with a workload this large, but I think it may be a logical race condition. At lower levels of concurrency and smaller work sizes, it may enqueue a message, context switch to a goroutine that starts (and finishes) work on that message, and only THEN add that work to the wait group, so Done can run before the matching Add. Reordering them avoids this, as shown in the sketch below.
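(A minimal sketch of that reordering, reusing the Scheduler type from the question; note that the original single-case select around the send is equivalent to a plain send, so nothing else changes:)

// Enqueue registers the work before handing it to a worker, so wg.Done can
// never run before the matching wg.Add.
func (s *Scheduler) Enqueue(req interface{}) {
    s.wg.Add(1)
    s.reqChan <- req
}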
Next, are you sure the process function is thread-safe? I'd assume so based on the redigo documentation; does running with go run -race produce any output?
At some point it's completely reasonable and expected for performance to drop off. I would recommend running performance tests to see where latency and throughput start to drop off:
maybe a worker pool of 10, 100, 500, 1000, 2500, 5000, 10000, or whatever makes sense. IMO it looks like there are 3 important variables to tune, starting with MaxActive.
The biggest thing that jumps out is that redis.Pool is configured to allow an unbounded number of connections: MaxActive is never set, and when it is zero there is no limit on the number of connections the pool will open.
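(A minimal sketch of a bounded pool, assuming redigo (github.com/gomodule/redigo/redis); NewBoundedRedisPool and the specific numbers are placeholders to tune, not recommendations:)

func NewBoundedRedisPool(address string) *redis.Pool {
    return &redis.Pool{
        MaxIdle:     50,
        MaxActive:   100,  // cap the number of concurrent connections to Redis
        Wait:        true, // make Get() block instead of failing once the cap is hit
        IdleTimeout: 240 * time.Second,
        Dial: func() (redis.Conn, error) {
            return redis.Dial("tcp", address)
        },
    }
}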
I would personally try to understand where and when performance starts to drop off with respect to the size of your worker pool. This might make it easier to understand what your program is constrained by.
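(One rough way to run that sweep, sketched by reusing the Scheduler and take from the question; the pool sizes, key, and request count are arbitrary placeholders:)

func sweep(pool *redis.Pool) {
    for _, workers := range []int{10, 100, 500, 1000, 2500, 5000, 10000} {
        // Note: with the scheduler as written, worker goroutines never exit,
        // so they accumulate across iterations of this loop.
        s := NewScheduler(workers, 1000000, func(r interface{}) {
            take("xxx", 1000000, 1, pool)
        })
        s.Start()

        start := time.Now()
        for i := 0; i < 100000; i++ {
            s.Enqueue(i)
        }
        s.Wait()
        fmt.Printf("workers=%d elapsed=%v\n", workers, time.Since(start))
    }
}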
My test results show that as the number of goroutines increases, the execution time per goroutine per take call grows nearly exponentially.
It appears to be a problem on the Redis side; here is the reply from the Redis library community: