How to correctly use sync.Cond?

I'm having trouble figuring out how to correctly use sync.Cond. From what I can tell, a race condition exists between locking the Locker and invoking the condition's Wait method. This example adds an artificial delay between the two lines in the main goroutine to simulate the race condition:

package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        c.Broadcast()
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
}

[Run on the Go Playground]

This fails with a fatal error once the main goroutine reaches Wait:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208, 0x1)
    /usr/local/go/src/runtime/sema.go:241 +0x2e0
sync.(*Cond).Wait(0x10330200, 0x0)
    /usr/local/go/src/sync/cond.go:63 +0xe0
main.main()
    /tmp/sandbox301865429/main.go:17 +0x1a0

What am I doing wrong? How do I avoid this apparent race condition? Is there a better synchronization construct I should be using?


Edit: I realize I should have better explained the problem I'm trying to solve here. I have a long-running goroutine that downloads a large file and a number of other goroutines that need access to the HTTP headers when they are available. This problem is harder than it sounds.

I can't use channels since only one goroutine would then receive the value. And some of the other goroutines would be trying to retrieve the headers long after they are already available.

The downloader goroutine could simply store the HTTP headers in a variable and use a mutex to safeguard access to them. However, this doesn't provide a way for the other goroutines to "wait" for them to become available.

I had thought that both a sync.Mutex and sync.Cond together could accomplish this goal but it appears that this is not possible.

Disgraceful answered 26/4, 2016 at 6:43 Comment(0)
D
1

I finally discovered a way to do this and it doesn't involve sync.Cond at all - just the mutex.

type Task struct {
    m       sync.Mutex
    headers http.Header
}

func NewTask() *Task {
    t := &Task{}
    t.m.Lock()
    go func() {
        defer t.m.Unlock()
        // ...do stuff...
    }()
    return t
}

func (t *Task) WaitFor() http.Header {
    t.m.Lock()
    defer t.m.Unlock()
    return t.headers
}

How does this work?

The mutex is locked at the beginning of the task, ensuring that anything calling WaitFor() will block. Once the headers are available and the mutex unlocked by the goroutine, each call to WaitFor() will execute one at a time. All future calls (even after the goroutine ends) will have no problem locking the mutex, since it will always be left unlocked.

Disgraceful answered 6/5, 2016 at 4:37 Comment(1)
For this purpose, consider using sync.RWMutex instead. – Bacolod
F
50

The OP answered his own question but did not directly address the original one, so I am going to show how to correctly use sync.Cond.

You do not really need sync.Cond if you have one goroutine for each write and read; a single sync.Mutex would suffice to communicate between them. sync.Cond can be useful in situations where multiple readers wait for a shared resource to become available.

package main

import (
    "fmt"
    "sync"
)

var sharedRsc = make(map[string]interface{})

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        // this goroutine waits for changes to sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc1"])
        c.L.Unlock()
        wg.Done()
    }()

    go func() {
        // this goroutine also waits for changes to sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc2"])
        c.L.Unlock()
        wg.Done()
    }()

    // the main goroutine writes changes to sharedRsc
    c.L.Lock()
    sharedRsc["rsc1"] = "foo"
    sharedRsc["rsc2"] = "bar"
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}

Playground

Having said that, using channels is still the recommended way to pass data around if the situation permits.

Note: sync.WaitGroup here is only used to wait for the goroutines to complete their executions.

Fresnel answered 13/3, 2017 at 20:25 Comment(5)
Not sure those for loops are really needed. I know the documentation suggests something like that, but it seems unnecessary. An if should suffice, since sharedRsc is changed while holding the lock and, as stated by the documentation, Wait only resumes after a call to Broadcast or Signal. – Faubourg
@Faubourg In this example, sure. In general it's good practice since, in a Broadcast() scenario, the first goroutine wakes and changes state, and then the others resume (with the shared state now altered). I agree with you, and you are absolutely correct in cases where no waiting goroutine can affect the dependent condition(s). – Caesarea
I think your for loops are not correct. It should be for sharedRsc["rsc1"] == nil for the first loop and for sharedRsc["rsc2"] == nil for the second loop. Use this to reproduce: fn := func(key, value string) func() { return func() { c.L.Lock(); sharedRsc[key] = value; c.Broadcast(); c.L.Unlock() } }; go fn("rsc1", "foo")(); go fn("rsc2", "bar")() – Darryldarryn
Isn't the OP doing exactly the same thing in their answer without using Cond? That is, how is this answer different from e.g. go.dev/play/p/iC5FeWC1M-T ? – Fathometer
"sync.Cond could useful in situations where multiple readers wait for the shared resources to be available." That's exactly what the OP's use case is, and they seemed to manage without Cond just fine. What am I missing? – Fathometer
R
22

You need to make sure that c.Broadcast is called after your call to c.Wait. The correct version of your program would be:

package main

import "sync"

func main() {
    m := &sync.Mutex{}
    c := sync.NewCond(m)
    m.Lock()
    go func() {
        m.Lock() // Wait for c.Wait()
        c.Broadcast()
        m.Unlock()
    }()
    c.Wait() // Unlocks m, waits, then locks m again
    m.Unlock()
}

https://play.golang.org/p/O1r8v8yW6h

Refrigerant answered 17/5, 2017 at 3:56 Comment(1)
c.Wait() locks m again before it returns, so I edited your example to add the m.Unlock() at the end for correctness; otherwise you'll be surprised to find that only one waiter ever wakes up. – Seabury
E
3
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    m.Lock() // main goroutine is the owner of the lock
    c := sync.NewCond(&m)
    go func() {
        m.Lock() // obtain the lock
        defer m.Unlock()
        fmt.Println("3. goroutine is owner of lock")
        time.Sleep(2 * time.Second) // long computation: because you are the owner, you can change the state variable(s)
        c.Broadcast()               // state has been changed, publish it to waiting goroutines
        fmt.Println("4. goroutine will release lock soon (deferred Unlock)")
    }()
    fmt.Println("1. main goroutine is owner of lock")
    time.Sleep(1 * time.Second) // initialization
    fmt.Println("2. main goroutine still holds the lock")
    c.Wait() // Wait temporarily releases the mutex while waiting, giving other goroutines the opportunity to change the state.
    // Because you don't know whether the new state is the one you are waiting for, Wait is usually called in a loop.
    m.Unlock()
    fmt.Println("Done")
}

http://play.golang.org/p/fBBwoL7_pm

Evonneevonymus answered 26/4, 2016 at 8:14 Comment(5)
What if it isn't possible to lock the mutex before launching the goroutine? For example, there may be other goroutines calling Wait(). – Disgraceful
Then it is possible that when Broadcast is called, no other goroutine will be notified. That is also fine. What neither of us mentioned is that a condition is usually connected with some state. Wait means: I cannot continue while the system is in this state, so wait. Broadcast means: the state has changed, and everyone who has been waiting should check whether they can continue. Please describe more precisely what is computed in both goroutines and why they have to communicate with each other. – Evonneevonymus
Sorry, I should have gone into more detail in the original question. I've added an edit that describes precisely what I'm trying to do. – Disgraceful
I'm trying to understand this, and to me it looks like it introduces a potential race condition. c.Wait() releases the mutex and then starts to wait for a notification. In theory, after c.Wait() releases the mutex but before it adds itself to the notification list, the goroutine can lock the mutex and run Broadcast(); after that, c.Wait() adds itself to the notification list and waits forever for a broadcast. Of course, in the example above this will almost never happen, because the time.Sleep before the broadcast gives c.Wait plenty of time to add itself to the notification list before Broadcast. – Caretaker
Wow, never mind, this got fixed in Go 1.7; I was looking at old code: github.com/golang/go/issues/14064 – Caretaker
C
2

This can be done with channels pretty easily and the code will be clean. Below is the example. Hope this helps!

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    done := make(chan struct{})
    var wg sync.WaitGroup
    // fork required number of goroutines
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            <-done
            fmt.Println("read the http headers from here")
        }()
    }
    time.Sleep(1 * time.Second) // download your large file here
    fmt.Println("Unblocking goroutines...")
    close(done) // this will unblock all the goroutines
    wg.Wait()
}
Claro answered 12/8, 2022 at 15:37 Comment(0)
A
1

It looks like your c.Wait is waiting for a Broadcast that will never happen with your time intervals. With

time.Sleep(3 * time.Second) //Broadcast after any Wait for it
c.Broadcast()

your snippet seems to work: http://play.golang.org/p/OE8aP4i6gY. Or am I missing something that you are trying to achieve?

Aesir answered 26/4, 2016 at 10:27 Comment(0)
K
1

Yes, you can use one channel to pass the header to multiple goroutines.

headerChan := make(chan http.Header)

go func() { // This routine can be started many times
    header := <-headerChan  // Wait for header
    // Do things with the header
}()

// Feed the header to all waiting go routines
for more := true; more; {
    select {
    case headerChan <- r.Header:
    default: more = false
    }
}
Kra answered 29/1, 2019 at 10:33 Comment(1)
It's an interesting example, but I think in practice it will be hard to make it reliable. – Dactyl
C
0

The problem in your code was that your signal was emitted only once, and the receiving goroutine was not ready for it, so the signal was missed. You should broadcast in a loop.

package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        for range time.Tick(time.Millisecond) {
            c.Broadcast()
        }
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
    m.Unlock()

    //do stuff
}
Corpulence answered 21/6, 2023 at 7:7 Comment(0)
O
-1

The excellent book "Concurrency in Go" provides the following easy solution, which leverages the fact that closing a channel releases all goroutines waiting to receive from it.

package main

import (
    "fmt"
    "time"
)

func main() {
    httpHeaders := []string{}
    headerChan := make(chan interface{})
    // Each consumer blocks until headerChan is closed, then reads the headers.
    consumerFunc := func(id int, stream <-chan interface{}, funcHeaders *[]string) {
        <-stream
        fmt.Println("Consumer", id, "got headers:", *funcHeaders)
    }
    for i := 0; i < 3; i++ {
        go consumerFunc(i, headerChan, &httpHeaders)
    }
    fmt.Println("Getting headers...")
    time.Sleep(2 * time.Second)
    httpHeaders = append(httpHeaders, "test1")
    fmt.Println("Publishing headers...")
    close(headerChan)
    time.Sleep(5 * time.Second)
}

https://play.golang.org/p/cE3SiKWNRIt

Oxen answered 29/12, 2019 at 0:9 Comment(2)
I guess this is a neat example, but it doesn't answer the question. I still don't know how to use a condition after looking at this. – Baryta
If you read the full question, it is about how to communicate an event between a few goroutines, which this example solves. @Baryta – Oxen

© 2022 - 2024 — McMap. All rights reserved.