I am trying to figure out how to use RethinkDB changefeeds with golang.
My specific question is how to stop a goroutine that listens for changes to the
database. See, for example, the function getData()
below. I run this from a handler
function by calling go getData(c)
. Whenever the database updates, the record is
passed to the channel c
, which is then passed to the handler function and sent to the client
using SSE technology. My question is: when the client disconnects, I know how to stop and exit
the handler function; however the goroutine that is running the getData()
function, keeps running. What can
I do to close it? One solution I can think of based on other answers on stackoverflow is to
send a signal to close the goroutine on another channel and use a select statement to handle this
signal. For example, I can replace
for cur.Next(&rec) {
c <- rec
}
in the function definition below with:
for cur.Next(&rec) {
select {
case <- closesignal:
return
default:
c <- rec
}
}
where, closesignal
is another channel that is given as a third argument to getData()
and
a message is sent on this channel by the handler when the client disconnects.
The problem with this approach is: what if the result of the specific rethinkdb query
never updates. In that case, the for cur.Next(&rec)
loop will not be entered and the closesignal
will not be used.
Would this goroutine then keep running? If so, how do I stop this goroutine?
The getData()
function
func getData(session *r.Session, c chan interface{}) {
var rec interface{}
changesOpts := r.ChangesOpts{
IncludeInitial: true,
}
cur, err := r.DB(DBNAME).Table("test").Changes(changesOpts).Run(session)
if err != nil {
log.Println(err)
return
}
defer cur.Close()
defer func() {
fmt.Println("exiting getData goroutine()...")
}()
for cur.Next(&rec) {
c <- rec
}
}
You should stop the changefeed via context. You can pass your context to the
RunOpts
. This will close your changefeed immediately.You can stop a goroutine that is listening to a changefeed by closing the cursor. For example this code will listen to a changefeed for 10 seconds before closing: