Reconnect TCP on EOF in Go

2019-05-01 11:34发布

问题:

I have the following:

    //In an init func
    if logStashHost != "" {
        lsconn, err = net.Dial("tcp", logStashHost)
    }
    ...
    ToLogStash(rec, lsconn)

Then Two functions:

func ReadLogStash(conn net.Conn) {
    buffer := make([]byte, 256)
    for {
        _, err := conn.Read(buffer)
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(buffer)
        }
    }
}

func ToLogStash(r *logrow.Record, conn net.Conn) {
    b, err := json.Marshal(r)
    if err != nil {
        fmt.Println(err)
        return
    }
    _, err = fmt.Fprintln(conn, string(b))
    if err != nil {
        fmt.Println(err)
    }
}

Where ReadLogStash is a running goroutine. If the other side closes, I get EOF. What would be a good implementation in ReadLogStash to have it attempt to reestablish the connection every X seconds when it gets an EOF?

回答1:

Go has channels for synchronization and communication, use them!

Make your connection in a loop, and have it wait for some sort of message to come back on a channel.

...
errCh := make(chan error)
for {
    lsconn, err = net.Dial("tcp", logStashHost)
    // check error!
    go ReadLogStash(lsconn, errCh)
    err = <-errCh
    if err != nil {
        // bad error
        break
    }
    // sleep to backoff on retries?
}
...

func ReadLogStash(conn net.Conn, errCh chan error) {
    _, err := io.Copy(os.Stderr, conn)
    if err != nil {
        fmt.Println(err)
    }
    // a nil error from io.Copy means you reached EOF.
    errCh <- err
}

Unless you have more functionality in ReadLogStash, you can probably just use io.Copy inline, and forget the entire function, but this pattern may come in useful for you anyway.



回答2:

Here is what I ended up going with, a channel was the right direction:

if logStashHost != "" {
    lsc = make(chan *logrow.Record)
    go ToLogStash(lsc, logStashHost)
}
...
if lsc != nil {
   lsc <- rec
}
...
func ToLogStash(c chan *logrow.Record, logStashHost string) {
    var lsconn net.Conn
    var enc *json.Encoder
    var err error
    connect := func() {
        for {
            lsconn, err = net.Dial("tcp", logStashHost)
            if err == nil {
                enc = json.NewEncoder(lsconn)
                break
            }
            log.Println(err)
            time.Sleep(time.Second)
        }
    }
    connect()
    for r := range c {
        err = enc.Encode(r)
        if err != nil {
            lsconn.Close()
            log.Println(err)
            connect()
        }
    }
}


标签: tcp go