periodically flushing channel in golang

2020-08-17 18:26发布

I need to periodically flush contents of a channel. I did this with len() and I am wondering if there is some better way to do this.

http://play.golang.org/p/YzaI_2c_-F

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    commch := make(chan int, 100)
    go fillchan(commch)
    drainchan(commch)
}

func fillchan(commch chan int) {
    for {
        select {
        case <-time.Tick(30 * time.Millisecond):
            commch <- rand.Int()
        }
    }
}

func drainchan(commch chan int) {
    for {
        chanlen := len(commch) // get number of entries in channel
        time.Sleep(1 * time.Second)
        for i := 0; i <= chanlen; i++ { //flush them based on chanlen
            fmt.Printf("chan len: %s num: %s\n", chanlen, <-commch)
        }
    }
}

EDIT 1: seems like this is better way to do this http://play.golang.org/p/4Kp8VwO4yl

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    commch := make(chan int, 1000)
    go fillchan(commch)
    for {
        select {
        case <-time.Tick(1000 * time.Millisecond):
            drainchan(commch)
        }
    }
}

func fillchan(commch chan int) {
    for {
        select {
        case <-time.Tick(300 * time.Millisecond):
            commch <- rand.Int()
        }
    }
}

func drainchan(commch chan int) {
    for {
        select {
        case e := <-commch:
            fmt.Printf("%s\n",e)
        default:
            return
        }
    }
}

EDIT 2: removed select, prevented memory leak with time.Tick http://play.golang.org/p/WybAhRE3u4

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    commch := make(chan int, 1000)
    go fillchan(commch)
    for _ = range time.Tick(1000 * time.Millisecond) {
        drainchan(commch)
    }
}

func fillchan(commch chan int) {
    for _ = range time.Tick(300 * time.Millisecond) {
        commch <- rand.Int()
    }
}

func drainchan(commch chan int) {
    for {
        select {
        case e := <-commch:
            fmt.Printf("%s\n", e)
        default:
            return
        }
    }
}

标签: go
3条回答
Bombasti
2楼-- · 2020-08-17 18:41

When I come across this issue, this is how I handle it.

func (s *ReadStream) drain() {
    go func() {
        for b := range s.chan {
            blackhole(b)
        }
    }()
}

func blackhole(b []byte) {}

In situations where your select may be blocked context.Context seems like the wrong choice:

for {
    select {
    case <-ctx.Done():
        return
    default:
        send<-getData()
    }
}

If send is full, we are at the mercy of an outside goroutine before we can receive the done signal. This is ok if you're sure the consumers are going to read until the channel closes, but if those consumers can experience error conditions and return, hope is all you can do. In this specific situation, I'm a fan of replacing the context with an internal quit chan and waitgroup, and then provide a public Kill() method. As long as I'm absolutely sure I'm ok throwing away the data, of course.

func (s *ReadStream) Kill() {
    s.quit<-struct{}{}
    s.drain()   // ensure goroutine sees the cancel
    s.wg.Wait() // wait for goroutine to see the cancel
    s.close()
}
查看更多
神经病院院长
3楼-- · 2020-08-17 18:44

The BatchingChannel from https://github.com/eapache/channels does what you need I think.

查看更多
Explosion°爆炸
4楼-- · 2020-08-17 18:56

The need to flush away the contents of a channel would be unusual. Channels don't provide this feature - but you can make a goroutine that will behave that way (...if you really do want to).

Typically, you would be thinking more about a goroutine that inputs on one channel and outputs on another; both channels carry the same data type. You could in principle model all buffered channels this way; to its clients, the goroutine behaves like an ordinary buffered channel because it passes on what it receives.

Add a third channel into the goroutine, combined with a select between it and the input. This will allow you to trigger the emptying of the buffer without race conditions creeping in. Simple.

Now there are three channels connected to the goroutine - two inputs and an output. So, when you design the things that will use it, you can reason about what the semantics of flushing that data are.

A relative springs to mind. Consider a goroutine with one input and one output channel. It provides an overwriting buffer of fixed size, i.e. one that is always ready to read from its input channel, even when the output channel is blocked. This will also need a select with a default case, but no third channel is needed. Overwriting buffers have a clear use case: when channels and goroutines are wired into loops, deadlock can be quite likely. Overwriting buffers come in handy as one candidate solution for deadlocks because some data is useless when it's late - for example, you could for example throw away mice events in a GUI when the application is too busy to respond to them.

查看更多
登录 后发表回答