How to correctly use sync.Cond?

Posted 2020-05-21 11:22

I'm having trouble figuring out how to correctly use sync.Cond. From what I can tell, a race condition exists between locking the Locker and invoking the condition's Wait method. This example adds an artificial delay between the two lines in the main goroutine to simulate the race condition:

package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        c.Broadcast()
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
}


This fails immediately with a fatal runtime error (a detected deadlock rather than a panic):

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208, 0x1)
    /usr/local/go/src/runtime/sema.go:241 +0x2e0
sync.(*Cond).Wait(0x10330200, 0x0)
    /usr/local/go/src/sync/cond.go:63 +0xe0
main.main()
    /tmp/sandbox301865429/main.go:17 +0x1a0

What am I doing wrong? How do I avoid this apparent race condition? Is there a better synchronization construct I should be using?


Edit: I realize I should have better explained the problem I'm trying to solve here. I have a long-running goroutine that downloads a large file and a number of other goroutines that need access to the HTTP headers when they are available. This problem is harder than it sounds.

I can't use channels since only one goroutine would then receive the value. And some of the other goroutines would be trying to retrieve the headers long after they are already available.

The downloader goroutine could simply store the HTTP headers in a variable and use a mutex to safeguard access to them. However, this doesn't provide a way for the other goroutines to "wait" for them to become available.

I had thought that a sync.Mutex and a sync.Cond together could accomplish this, but that does not appear to be possible.

8 answers
我欲成王,谁敢阻挡
#2 · 2020-05-21 12:17

Here's a practical example with two goroutines. They start one after another, but the second one waits on a condition that the first one broadcasts before it proceeds:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    lock := sync.Mutex{}
    cond := sync.NewCond(&lock)
    ready := false // the condition being waited on, guarded by lock

    waitGroup := sync.WaitGroup{}
    waitGroup.Add(2)

    go func() {
        defer waitGroup.Done()

        fmt.Println("First go routine has started and waits for 1 second before broadcasting condition")

        time.Sleep(1 * time.Second)

        fmt.Println("First go routine broadcasts condition")

        // Change the shared state under the lock, then wake all waiters.
        lock.Lock()
        ready = true
        lock.Unlock()
        cond.Broadcast()
    }()

    go func() {
        defer waitGroup.Done()

        fmt.Println("Second go routine has started and is waiting on condition")

        // Wait must be called with the lock held. Checking ready in a loop
        // means a broadcast that has already happened is not missed.
        lock.Lock()
        for !ready {
            cond.Wait()
        }
        lock.Unlock()

        fmt.Println("Second go routine unlocked by condition broadcast")
    }()

    fmt.Println("Main go routine starts waiting")

    waitGroup.Wait()

    fmt.Println("Main go routine ends")
}

The output may vary slightly, since the second goroutine can start before the first one and vice versa:

Main go routine starts waiting
Second go routine has started and is waiting on condition
First go routine has started and waits for 1 second before broadcasting condition
First go routine broadcasts condition
Second go routine unlocked by condition broadcast
Main go routine ends

https://gist.github.com/fracasula/21565ea1cf0c15726ca38736031edc70

一夜七次
#3 · 2020-05-21 12:17

In the excellent book "Concurrency in Go", the author gives the following simple solution. It relies on the fact that closing a channel releases every goroutine waiting to receive from it.

package main

import (
    "fmt"
    "time"
)

func main() {
    httpHeaders := []string{}
    headerChan := make(chan struct{}) // closed once the headers are available

    // Each consumer blocks until headerChan is closed, then reads the headers.
    consumerFunc := func(id int, stream <-chan struct{}, funcHeaders *[]string) {
        <-stream
        fmt.Println("Consumer", id, "got headers:", *funcHeaders)
    }
    for i := 0; i < 3; i++ {
        go consumerFunc(i, headerChan, &httpHeaders)
    }

    fmt.Println("Getting headers...")
    time.Sleep(2 * time.Second)
    httpHeaders = append(httpHeaders, "test1")

    fmt.Println("Publishing headers...")
    close(headerChan) // wakes every waiting consumer at once

    // Crude wait so the consumers can print before main exits.
    time.Sleep(5 * time.Second)
}

https://play.golang.org/p/cE3SiKWNRIt
