I need to periodically flush the contents of a channel. I did this with len(), and I am wondering whether there is a better way to do it.
http://play.golang.org/p/YzaI_2c_-F
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	commch := make(chan int, 100)
	go fillchan(commch)
	drainchan(commch)
}

func fillchan(commch chan int) {
	for {
		select {
		case <-time.Tick(30 * time.Millisecond):
			commch <- rand.Int()
		}
	}
}

func drainchan(commch chan int) {
	for {
		chanlen := len(commch) // get number of entries in channel
		time.Sleep(1 * time.Second)
		for i := 0; i < chanlen; i++ { // flush exactly chanlen entries
			fmt.Printf("chan len: %d num: %d\n", chanlen, <-commch)
		}
	}
}
EDIT 1: This seems like a better way to do it: http://play.golang.org/p/4Kp8VwO4yl
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	commch := make(chan int, 1000)
	go fillchan(commch)
	for {
		select {
		case <-time.Tick(1000 * time.Millisecond):
			drainchan(commch)
		}
	}
}

func fillchan(commch chan int) {
	for {
		select {
		case <-time.Tick(300 * time.Millisecond):
			commch <- rand.Int()
		}
	}
}

func drainchan(commch chan int) {
	for {
		select {
		case e := <-commch:
			fmt.Printf("%d\n", e)
		default: // channel is empty, stop draining
			return
		}
	}
}
EDIT 2: Removed the select and prevented the memory leak with time.Tick (calling it inside the loop created a new ticker on every iteration): http://play.golang.org/p/WybAhRE3u4
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	commch := make(chan int, 1000)
	go fillchan(commch)
	for range time.Tick(1000 * time.Millisecond) {
		drainchan(commch)
	}
}

func fillchan(commch chan int) {
	for range time.Tick(300 * time.Millisecond) {
		commch <- rand.Int()
	}
}

func drainchan(commch chan int) {
	for {
		select {
		case e := <-commch:
			fmt.Printf("%d\n", e)
		default: // channel is empty, stop draining
			return
		}
	}
}
When I come across this issue, this is how I handle it.
In situations where your select may be blocked, context.Context seems like the wrong choice:
If send is full, we are at the mercy of an outside goroutine before we can receive the done signal. This is OK if you're sure the consumers will read until the channel closes, but if those consumers can hit error conditions and return early, hope is all you have. In this specific situation, I'm a fan of replacing the context with an internal quit chan and a WaitGroup, and then providing a public Kill() method. As long as I'm absolutely sure I'm OK with throwing away the data, of course.
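The quit chan, WaitGroup and Kill() idea could look roughly like the sketch below. The names Producer, NewProducer and run are made up for illustration, and the producer simply emits a counter; only the shape of the shutdown path is the point here.

```go
package main

import (
	"fmt"
	"sync"
)

// Producer is a hypothetical type used only to illustrate the pattern.
type Producer struct {
	send chan int       // buffered output; fills up if consumers stop reading
	quit chan struct{}  // closed by Kill to tell the goroutine to stop
	once sync.Once      // makes Kill safe to call more than once
	wg   sync.WaitGroup // lets Kill wait for the goroutine to exit
}

// NewProducer starts the producing goroutine (hypothetical constructor).
func NewProducer(size int) *Producer {
	p := &Producer{
		send: make(chan int, size),
		quit: make(chan struct{}),
	}
	p.wg.Add(1)
	go p.run()
	return p
}

func (p *Producer) run() {
	defer p.wg.Done()
	for i := 0; ; i++ {
		select {
		case p.send <- i: // not ready while the buffer is full
		case <-p.quit: // becomes ready as soon as Kill closes quit
			return
		}
	}
}

// Kill stops the goroutine, waits for it to exit, and throws away
// whatever is still buffered. It returns how many values were dropped.
func (p *Producer) Kill() int {
	p.once.Do(func() { close(p.quit) })
	p.wg.Wait()
	dropped := 0
	for {
		select {
		case <-p.send:
			dropped++
		default: // buffer is empty
			return dropped
		}
	}
}

func main() {
	p := NewProducer(4)
	fmt.Println(<-p.send) // a consumer reads one value, then gives up
	fmt.Println("dropped:", p.Kill())
}
```

The number of dropped values depends on scheduling, so it varies between runs. The draining loop in Kill() is the same non-blocking select with a default case used in the question's drainchan.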
The BatchingChannel from https://github.com/eapache/channels does what you need, I think.
The need to flush away the contents of a channel would be unusual. Channels don't provide this feature, but you can make a goroutine that will behave that way (if you really do want to).
Typically, you would be thinking more about a goroutine that inputs on one channel and outputs on another; both channels carry the same data type. You could in principle model all buffered channels this way; to its clients, the goroutine behaves like an ordinary buffered channel because it passes on what it receives.
Add a third channel into the goroutine, combined with a select between it and the input. This will allow you to trigger the emptying of the buffer without race conditions creeping in. Simple.
Now there are three channels connected to the goroutine - two inputs and an output. So, when you design the things that will use it, you can reason about what the semantics of flushing that data are.
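As a rough sketch of that three-channel goroutine: flushableBuffer below is a hypothetical name, and the internal queue is left unbounded to keep the example short (a fixed-size version would stop receiving from the input while the queue is full).

```go
package main

import "fmt"

// flushableBuffer is a hypothetical helper. It passes values from in to out
// through an internal queue and discards the queue when flush fires.
func flushableBuffer(in <-chan int, flush <-chan struct{}, out chan<- int) {
	defer close(out)
	var queue []int
	for in != nil || len(queue) > 0 {
		// A nil channel is never ready, so the send case is only
		// enabled while there is something to send.
		var sendCh chan<- int
		var next int
		if len(queue) > 0 {
			sendCh = out
			next = queue[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input closed: stop receiving, deliver the rest
				continue
			}
			queue = append(queue, v)
		case sendCh <- next:
			queue = queue[1:]
		case <-flush:
			queue = nil // throw away everything buffered so far
		}
	}
}

func main() {
	in := make(chan int)
	flush := make(chan struct{})
	out := make(chan int)
	go flushableBuffer(in, flush, out)

	in <- 1
	in <- 2
	flush <- struct{}{} // 1 and 2 are discarded here
	in <- 3
	close(in)
	for v := range out {
		fmt.Println(v) // prints 3
	}
}
```

Because the buffer is owned by a single goroutine and every operation goes through one select, a flush can never interleave with a half-finished send or receive.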
A relative springs to mind. Consider a goroutine with one input and one output channel. It provides an overwriting buffer of fixed size, i.e. one that is always ready to read from its input channel, even when the output channel is blocked. This will also need a select with a default case, but no third channel is needed. Overwriting buffers have a clear use case: when channels and goroutines are wired into loops, deadlock can be quite likely. Overwriting buffers come in handy as one candidate solution for deadlocks because some data is useless when it's late. For example, you could throw away mouse events in a GUI when the application is too busy to respond to them.
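A minimal sketch of such an overwriting buffer follows. As a simplification, it uses the output channel's own buffer as the storage instead of a separate internal queue, so out must be created with a capacity of at least 1, and the goroutine must be the only sender on it. The name overwritingBuffer is made up for illustration.

```go
package main

import "fmt"

// overwritingBuffer is a hypothetical helper. It never blocks on in:
// when out is full, the oldest buffered value is dropped to make room.
// Assumes cap(out) >= 1 and that this goroutine is the only sender on out.
func overwritingBuffer(in <-chan int, out chan int) {
	defer close(out)
	for v := range in {
		select {
		case out <- v: // there was room
		default:
			// Buffer is full: discard the oldest value.
			select {
			case <-out:
			default: // a consumer freed a slot in the meantime
			}
			out <- v // cannot block, a slot is free and nobody else sends
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int, 3)
	done := make(chan struct{})
	go func() {
		overwritingBuffer(in, out)
		close(done)
	}()

	for i := 1; i <= 5; i++ {
		in <- i // nobody is reading out, yet this never deadlocks
	}
	close(in)
	<-done
	for v := range out {
		fmt.Println(v) // prints 3, 4, 5 (1 and 2 were overwritten)
	}
}
```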