Best way to implement global counters for highly c

Posted 2020-02-16 06:59

What is the best way to implement global counters for a highly concurrent application? In my case I may have 10K-20K goroutines performing "work", and I want to count the number and types of items that the goroutines are working on collectively...

The "classic" synchronous coding style would look like:

var work_counter int32 // atomic.AddInt32 requires an int32, not an int

func GoWorkerRoutine() {
    for {
        // do work
        atomic.AddInt32(&work_counter,1)
    }    
}

Now this gets more complicated because I want to track the "type" of work being done, so really I'd need something like this:

var work_counter = make(map[string]int) // writing to a nil map would panic
var work_mux sync.Mutex

func GoWorkerRoutine() {
    for {
        // do work
        work_mux.Lock()
        work_counter["type1"]++
        work_mux.Unlock()
    }    
}

It seems like there should be a "go" optimized way using channels or something similar to this:

var work_counter int
var work_chan chan int // make() called somewhere else (buffered)

// started somewhere else
func GoCounterRoutine() {
    // only this goroutine touches work_counter, so no lock is needed;
    // the loop ends when work_chan is closed
    for c := range work_chan {
        work_counter += c
    }
}

func GoWorkerRoutine() {
    for {
        // do work
        work_chan <- 1
    }    
}

This last example is still missing the map, but that's easy enough to add. Will this style provide better performance than just a simple atomic increment? I can't tell if this is more or less complicated when we're talking about concurrent access to a global value versus something that may block on I/O to complete...

Thoughts are appreciated.

Update 5/28/2013:

I tested a couple of implementations, and the results were not what I expected. Here's my counter source code:

package helpers

import (
)

type CounterIncrementStruct struct {
    bucket string
    value int
}

type CounterQueryStruct struct {
    bucket string
    channel chan int
}

var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int

func CounterInitialize() {
    counter = make(map[string]int)
    counterIncrementChan = make(chan CounterIncrementStruct,0)
    counterQueryChan = make(chan CounterQueryStruct,100)
    counterListChan = make(chan chan map[string]int,100)
    go goCounterWriter()
}

func goCounterWriter() {
    for {
        select {
            case ci := <- counterIncrementChan:
                if len(ci.bucket)==0 { return }
                counter[ci.bucket]+=ci.value
                break
            case cq := <- counterQueryChan:
                val,found:=counter[cq.bucket]
                if found {
                    cq.channel <- val
                } else {
                    cq.channel <- -1    
                }
                break
            case cl := <- counterListChan:
                nm := make(map[string]int)
                for k, v := range counter {
                    nm[k] = v
                }
                cl <- nm
                break
        }
    }
}

func CounterIncrement(bucket string, counter int) {
    if len(bucket)==0 || counter==0 { return }
    counterIncrementChan <- CounterIncrementStruct{bucket,counter}
}

func CounterQuery(bucket string) int {
    if len(bucket)==0 { return -1 }
    reply := make(chan int)
    counterQueryChan <- CounterQueryStruct{bucket,reply}
    return <- reply
}

func CounterList() map[string]int {
    reply := make(chan map[string]int)
    counterListChan <- reply
    return <- reply
}

It uses channels for both writes and reads which seems logical.

Here are my test cases:

func bcRoutine(b *testing.B,e chan bool) {
    for i := 0; i < b.N; i++ {
        CounterIncrement("abc123",5)
        CounterIncrement("def456",5)
        CounterIncrement("ghi789",5)
        CounterIncrement("abc123",5)
        CounterIncrement("def456",5)
        CounterIncrement("ghi789",5)
    }
    e<-true
}

func BenchmarkChannels(b *testing.B) {
    b.StopTimer()
    CounterInitialize()
    e:=make(chan bool)
    b.StartTimer()

    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)

    <-e
    <-e
    <-e
    <-e
    <-e

}

var mux sync.Mutex
var m map[string]int
func bmIncrement(bucket string, value int) {
    mux.Lock()
    m[bucket]+=value
    mux.Unlock()
}

func bmRoutine(b *testing.B,e chan bool) {
    for i := 0; i < b.N; i++ {
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
    }
    e<-true
}

func BenchmarkMutex(b *testing.B) {
    b.StopTimer()
    m=make(map[string]int)
    e:=make(chan bool)
    b.StartTimer()

    for i := 0; i < b.N; i++ {
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
    }

    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)

    <-e
    <-e
    <-e
    <-e
    <-e

}

I implemented a simple benchmark with just a mutex around the map (just testing writes), and benchmarked both with 5 goroutines running in parallel. Here are the results:

$ go test --bench=. helpers
PASS
BenchmarkChannels         100000             15560 ns/op
BenchmarkMutex   1000000              2669 ns/op
ok      helpers 4.452s

I would not have expected the mutex to be that much faster...

Further thoughts?

7 answers
聊天终结者
#2 · 2020-02-16 07:09

If you're trying to synchronize a pool of workers (e.g. allow n goroutines to crunch away at some amount of work) then channels are a very good way to go about it, but if all you actually need is a counter (e.g. page views) then they are overkill. The sync and sync/atomic packages are there to help.

import "sync/atomic"

type count32 int32

func (c *count32) increment() int32 {
    return atomic.AddInt32((*int32)(c), 1)
}

func (c *count32) get() int32 {
    return atomic.LoadInt32((*int32)(c))
}

Go Playground Example
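
Since the question also asks about counting per work "type", here is a minimal sketch of how the atomic approach might be extended, assuming the set of types is known up front. The names TypedCounters, NewTypedCounters, Add and Get are made up for illustration. The map is filled once and never written again, so concurrent lookups need no lock; only the int64 values are updated, and those updates are atomic.

```go
package main

import "sync/atomic"

// TypedCounters holds one atomic counter per work type (hypothetical helper).
// The map is populated once in NewTypedCounters and never modified afterwards,
// so concurrent lookups are safe without a mutex.
type TypedCounters struct {
	counts map[string]*int64
}

// NewTypedCounters registers every type that will ever be counted.
func NewTypedCounters(types ...string) *TypedCounters {
	tc := &TypedCounters{counts: make(map[string]*int64, len(types))}
	for _, t := range types {
		tc.counts[t] = new(int64)
	}
	return tc
}

// Add atomically adds delta to the counter for kind.
// It reports false if kind was never registered.
func (tc *TypedCounters) Add(kind string, delta int64) bool {
	p, ok := tc.counts[kind]
	if !ok {
		return false
	}
	atomic.AddInt64(p, delta)
	return true
}

// Get returns the current count for kind, or -1 if kind was never registered.
func (tc *TypedCounters) Get(kind string) int64 {
	p, ok := tc.counts[kind]
	if !ok {
		return -1
	}
	return atomic.LoadInt64(p)
}
```

If types can appear at runtime, this sketch no longer applies as written, and a mutex around the map (as in the question's second example) is the simpler choice.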

不美不萌又怎样
#3 · 2020-02-16 07:21

Don't be afraid of using mutexes and locks just because you think they're "not proper Go". In your second example it's absolutely clear what's going on, and that counts for a lot. You will have to try it yourself to see how contended that mutex is, and whether adding complication will increase performance.

If you do need increased performance, perhaps sharding is the best way to go: http://play.golang.org/p/uLirjskGeN

The downside is that your counts will only be as up-to-date as your sharding decides. There may also be performance hits from calling time.Since() so much, but, as always, measure it first :)
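
The playground code isn't reproduced here, so the following is only a rough sketch of one way sharding could look, not the linked implementation. Each worker keeps its own unlocked map and merges it into the shared, mutex-protected totals every flushEvery increments. All names (SharedCounts, LocalCounts, flushEvery) are invented for this example, and it flushes by count rather than by elapsed time, which sidesteps the time.Since() calls mentioned above.

```go
package main

import "sync"

// SharedCounts is the global view; it is only locked when a shard flushes
// or when a reader takes a snapshot.
type SharedCounts struct {
	mu     sync.Mutex
	totals map[string]int
}

func NewSharedCounts() *SharedCounts {
	return &SharedCounts{totals: make(map[string]int)}
}

// Snapshot returns a copy of the totals flushed so far.
// Increments still pending in a shard are not included.
func (s *SharedCounts) Snapshot() map[string]int {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(map[string]int, len(s.totals))
	for k, v := range s.totals {
		out[k] = v
	}
	return out
}

// LocalCounts is a shard owned by exactly one goroutine, so Add needs no lock.
type LocalCounts struct {
	shared     *SharedCounts
	pending    map[string]int
	adds       int
	flushEvery int
}

// NewLocal creates a shard that merges into s after every flushEvery calls to Add.
func (s *SharedCounts) NewLocal(flushEvery int) *LocalCounts {
	return &LocalCounts{shared: s, pending: make(map[string]int), flushEvery: flushEvery}
}

// Add records delta locally and flushes once enough increments have piled up.
func (l *LocalCounts) Add(kind string, delta int) {
	l.pending[kind] += delta
	l.adds++
	if l.adds >= l.flushEvery {
		l.Flush()
	}
}

// Flush merges the pending counts into the shared totals and resets the shard.
func (l *LocalCounts) Flush() {
	l.shared.mu.Lock()
	for k, v := range l.pending {
		l.shared.totals[k] += v
		delete(l.pending, k) // deleting during range is safe in Go
	}
	l.shared.mu.Unlock()
	l.adds = 0
}
```

A worker would call shared.NewLocal(n) once, Add in its loop, and Flush before exiting. A larger n means less lock traffic but staler totals.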

该账号已被封号
#4 · 2020-02-16 07:21

The other answer using sync/atomic is suited for things like page counters, but not for submitting unique identifiers to an external API. To do that, you need an "increment-and-return" operation, which can be implemented as a CAS loop.

Here's a CAS loop around an int32 to generate unique message IDs:

import "sync/atomic"

type UniqueID struct {
    counter int32
}

func (c *UniqueID) Get() int32 {
    for {
        val := atomic.LoadInt32(&c.counter)
        if atomic.CompareAndSwapInt32(&c.counter, val, val+1) {
            return val
        }
    }
}

To use it, simply do:

requestID := client.msgID.Get()
// assuming form is a url.Values, Set takes string values, so convert first
form.Set("id", strconv.Itoa(int(requestID)))

This has an advantage over channels in that it doesn't require as many extra idle resources - existing goroutines are used as they ask for IDs rather than using one goroutine for every counter your program needs.

TODO: Benchmark against channels. I'm going to guess that channels are worse in the no-contention case and better in the high-contention case, as they have queuing while this code simply spins attempting to win the race.
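
For comparison, atomic.AddInt32 returns the updated value, so the same increment-and-return behaviour can also be had without an explicit loop. A minimal sketch follows; the SeqID type and its Next method are hypothetical names, not part of the answer's code. Subtracting 1 makes it return the value before the increment, matching what Get above returns.

```go
package main

import "sync/atomic"

// SeqID hands out unique, increasing IDs starting at 0 (hypothetical type).
type SeqID struct {
	counter int32
}

// Next atomically increments the counter and returns the value it held
// before the increment. atomic.AddInt32 returns the new value, hence the -1.
func (s *SeqID) Next() int32 {
	return atomic.AddInt32(&s.counter, 1) - 1
}
```

Each caller gets a distinct value because the add and the read of its result are a single atomic operation.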

乱世女痞
#5 · 2020-02-16 07:31

Old question but I just stumbled upon this and it may help: https://github.com/uber-go/atomic

Basically, the engineers at Uber have built a few nice utility functions on top of the sync/atomic package.

I haven't tested this in production yet, but the codebase is very small and the implementation of most functions is quite stock standard.

Definitely preferred over using channels or basic mutexes.

祖国的老花朵
#6 · 2020-02-16 07:35

Don't use sync/atomic - from the linked page

Package atomic provides low-level atomic memory primitives useful for implementing synchronization algorithms. These functions require great care to be used correctly. Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package

Last time I had to do this I benchmarked something which looked like your second example with a mutex and something which looked like your third example with a channel. The channels code won when things got really busy, but make sure you make the channel buffer big.
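
As a sketch of what "channel with a big buffer" might look like for per-type counts (my own illustration, not the code that was benchmarked): one goroutine owns the map, workers send increments over a buffered channel, and Close drains the channel and hands back the totals. ChanCounter, increment and the buffer parameter are assumed names.

```go
package main

// increment is one counting event sent by a worker.
type increment struct {
	kind  string
	delta int
}

// ChanCounter serialises all map access through a single goroutine.
type ChanCounter struct {
	in   chan increment
	done chan map[string]int
}

// NewChanCounter starts the counting goroutine. A larger buffer lets workers
// run ahead of the counter instead of blocking on every send.
func NewChanCounter(buffer int) *ChanCounter {
	c := &ChanCounter{
		in:   make(chan increment, buffer),
		done: make(chan map[string]int, 1),
	}
	go func() {
		totals := make(map[string]int) // owned by this goroutine only
		for inc := range c.in {
			totals[inc.kind] += inc.delta
		}
		c.done <- totals // sent once the channel is closed and drained
	}()
	return c
}

// Add queues an increment; it blocks only when the buffer is full.
func (c *ChanCounter) Add(kind string, delta int) {
	c.in <- increment{kind, delta}
}

// Close stops the counter and returns the final totals.
// Add must not be called after Close.
func (c *ChanCounter) Close() map[string]int {
	close(c.in)
	return <-c.done
}
```

Whether this beats the mutex version depends on contention and buffer size, so it is worth benchmarking both under a realistic load.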

唯我独甜
#7 · 2020-02-16 07:35

The last one was close:

package main

import "fmt"

func main() {
    ch := make(chan int, 3)
    go GoCounterRoutine(ch)
    go GoWorkerRoutine(1, ch)
    // not run as a goroutine because main() would just end
    GoWorkerRoutine(2, ch)

}

// started somewhere else
func GoCounterRoutine(ch chan int) {
    counter := 0
    for {
        ch <- counter
        counter += 1
    }
}

func GoWorkerRoutine(n int, ch chan int) {
    // seq is declared by the range clause; a separate unused "var seq int" would not compile
    for seq := range ch {
        // do work:
        fmt.Println(n, seq)
    }
}

This introduces a single point of failure: if the counter goroutine dies, everything is lost. This may not be a problem if all goroutines are executed on one computer, but may become a problem if they are scattered over the network. To make the counter immune to failures of single nodes in the cluster, special algorithms have to be used.
