Waiting on a sync.Cond with a timeout

Is it possible in some easy way to do the equivalent of Java's

wait(long timeMillis)

which waits on a monitor (mutex+cond, roughly) for a specified amount of time, and returns if it is not signalled?

I can't find anything in the docs or by googling around, and although it's of course possible to play some games with making a WaitGroup and having a timer goroutine pop, that seems tedious/annoying/inefficient just to get this simple functionality (which is, by the way, directly supported by any underlying system threading library I've ever encountered).

Edit: Yes, we have all read http://www.golang-book.com/10/index.htm as well as https://blog.golang.org/pipelines. Again, creating more goroutines is a "bad" (non-performant) solution to this, and channels are also not well suited to it. Imagine as a use case a typical concurrent server Join() method... (Please do not tell me to invert the control and use a Listener pattern instead. You don't always have the luxury of changing the API you are working with...) The kind of workaround I mean is sketched below.
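
A rough sketch of that workaround (illustrative code, not from the original post; note the waiter cannot tell a timeout apart from a real signal):

package main

import (
    "sync"
    "time"
)

// waitWithTimeout parks on cond, but a side goroutine broadcasts after d
// so the Wait cannot block forever. This costs an extra goroutine and
// timer per wait - exactly the overhead complained about above.
func waitWithTimeout(cond *sync.Cond, d time.Duration) {
    stop := make(chan struct{})
    go func() {
        select {
        case <-time.After(d):
            cond.Broadcast() // spurious wakeup on timeout
        case <-stop: // the wait ended normally; stand down
        }
    }()
    cond.Wait() // caller must hold cond.L
    close(stop)
}

func main() {
    cond := sync.NewCond(&sync.Mutex{})
    cond.L.Lock()
    waitWithTimeout(cond, 100*time.Millisecond) // returns after ~100ms
    cond.L.Unlock()
}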

Poppycock answered 28/4, 2015 at 15:32 Comment(12)
Worth reading: sync: add WaitTimeout method to CondNaomanaomi
Are you using App Engine?Artimas
In particular, on the link Tim Cooper gave, note Ian Lance Taylor's comment that "Condition variables are generally not the right thing to use in Go". Don't try to write Go the way you'd write Java.Niedersachsen
@TimCooper - Interesting, thanks for the link. But what I see there is "this is the wrong way to deal with your use case" and <conspicuous absence of any reasonable alternative>... Both channels and "side-timers" (a goroutine with a timer) won't work for the OP in that thread (or me). And they just shut him down and closed his ticket. =/Poppycock
@DaveC - It's not a Java thing, but a fairly standard idiom when doing systems threading / concurrency where you have a mutex/cond pair, as the sync package provides. Again, can you provide an alternative that works and is "Go-like"?Poppycock
@BadZen, read up on concurrency and communication in Go. The preferred Go approach is summarized by "Do not communicate by sharing memory; instead, share memory by communicating."Niedersachsen
@DaveC - I have read that, before posting. It's a nice maxim, but how does it apply to my example or the OP's in the link above? Neither is sharing any kind of memory. IPC or inter-task signaling is a very different sort of facility from a condition variable. Yes, either facility can provide the other, but both 'emulations' are non-optimal - the forward direction because (as the OP notes) many objects must be created to do it, so it is not performant in very tight loops, and the reverse direction because of the principle outlined in your link, which is a general one, not just for Go...Poppycock
By the way, if the actual answer to my question is "that cannot be done, the Go Czars do not approve of your timed condition wait", then that is definitely a valid answer to my question here - don't want to get hung up on SE philosophy, etc, ITT. (Been doing this stuff for decades before Go was invented...)Poppycock
"How does it apply to my example": You didn't give an example! You asked for a specific feature/function without any context whatsoever, so I assumed it was likely (based on the poor level of questions here that contain "how to do <language X's Y> in Go?") the context itself was likely a design that is better done a different way in Go. Without any context as to why you feel you need a condition variable it's impossible to say more.Niedersachsen
@DaveC - refer to Tim's link which contains a full use case, if you want to be concrete. I said "my example", but I meant "that guy in Tim's link's example". Also note my question was not "hey Internet why might I need a condition variable" (Ben-Ari has a good text on concurrent programming and design if you really need to know that answer), but "is there a timed condition wait in Go that works like Java's monitor wait, or an easy way to accomplish the same?" Please stay OT.Poppycock
And, let me additionally draw your attention to: talks.golang.org/2012/concurrency.slide#54Poppycock
How tight is your loop ?Telemachus
R
3

Since Go 1.21 there is a way to do it with new context.AfterFunc. There is even example with this:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    waitOnCond := func(ctx context.Context, cond *sync.Cond, conditionMet func() bool) error {
        stopf := context.AfterFunc(ctx, func() {
            // We need to acquire cond.L here to be sure that the Broadcast
            // below won't occur before the call to Wait, which would result
            // in a missed signal (and deadlock).
            cond.L.Lock()
            defer cond.L.Unlock()

            // If multiple goroutines are waiting on cond simultaneously,
            // we need to make sure we wake up exactly this one.
            // That means that we need to Broadcast to all of the goroutines,
            // which will wake them all up.
            //
            // If there are N concurrent calls to waitOnCond, each of the goroutines
            // will spuriously wake up O(N) other goroutines that aren't ready yet,
            // so this will cause the overall CPU cost to be O(N²).
            cond.Broadcast()
        })
        defer stopf()

        // Since the wakeups are using Broadcast instead of Signal, this call to
        // Wait may unblock due to some other goroutine's context becoming done,
        // so to be sure that ctx is actually done we need to check it in a loop.
        for !conditionMet() {
            cond.Wait()
            if ctx.Err() != nil {
                return ctx.Err()
            }
        }

        return nil
    }

    cond := sync.NewCond(new(sync.Mutex))

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
            defer cancel()

            cond.L.Lock()
            defer cond.L.Unlock()

            err := waitOnCond(ctx, cond, func() bool { return false })
            fmt.Println(err)
        }()
    }
    wg.Wait()
}
Reflexive answered 19/11, 2023 at 18:40 Comment(0)
W
11

You can implement a condition variable that supports Broadcast only (no Signal), with a channel. Here is a quick Gist of it: https://gist.github.com/zviadm/c234426882bfc8acba88f3503edaaa36#file-cond2-go

You can also just utilize this technique of replacing a channel and closing the old one within your code. The code in Gist is using unsafe.Pointer and atomic operations to allow calling 'Broadcast' without acquiring the main sync.Locker. However in your own code, more often than not, you should be Broadcasting from within the acquire lock anyways so you don't need to do any of the unsafe/atomic stuff.

While this method works, you might want to also checkout: https://godoc.org/golang.org/x/sync/semaphore. If you make a weighted semaphore with a limit of 1, that will also give you all the abilities you need and it will also be fair.

Whittier answered 12/1, 2018 at 3:14 Comment(2)
Can you explain the last bit on using weighted semaphore?Inerney
In particular, how do you keep track of how many waiters to release?Inerney
R
3

Since Go 1.21 there is a way to do it with the new context.AfterFunc. The context package documentation even includes an example of exactly this:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    waitOnCond := func(ctx context.Context, cond *sync.Cond, conditionMet func() bool) error {
        stopf := context.AfterFunc(ctx, func() {
            // We need to acquire cond.L here to be sure that the Broadcast
            // below won't occur before the call to Wait, which would result
            // in a missed signal (and deadlock).
            cond.L.Lock()
            defer cond.L.Unlock()

            // If multiple goroutines are waiting on cond simultaneously,
            // we need to make sure we wake up exactly this one.
            // That means that we need to Broadcast to all of the goroutines,
            // which will wake them all up.
            //
            // If there are N concurrent calls to waitOnCond, each of the goroutines
            // will spuriously wake up O(N) other goroutines that aren't ready yet,
            // so this will cause the overall CPU cost to be O(N²).
            cond.Broadcast()
        })
        defer stopf()

        // Since the wakeups are using Broadcast instead of Signal, this call to
        // Wait may unblock due to some other goroutine's context becoming done,
        // so to be sure that ctx is actually done we need to check it in a loop.
        for !conditionMet() {
            cond.Wait()
            if ctx.Err() != nil {
                return ctx.Err()
            }
        }

        return nil
    }

    cond := sync.NewCond(new(sync.Mutex))

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
            defer cancel()

            cond.L.Lock()
            defer cond.L.Unlock()

            err := waitOnCond(ctx, cond, func() bool { return false })
            fmt.Println(err)
        }()
    }
    wg.Wait()
}
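
As written, conditionMet never becomes true, so each of the four goroutines prints the context error ("context deadline exceeded") once its 1 ms timeout fires.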
Reflexive answered 19/11, 2023 at 18:40 Comment(0)
D
2

No. There is no easy way to do this, and based on that thread they aren't going to add one (though perhaps discussing it with them may get you somewhere).

But there's always a hard way. Two options:

  1. Roll your own Cond that has this capability. (see https://golang.org/src/sync/cond.go)
  2. Use OS-level capabilities via a syscall. (maybe a futex?)

The challenge here - and the reason why it's not trivial - is that goroutines aren't threads. Go has its own custom scheduler. Creating your own Cond will involve tinkering with parts of the runtime that aren't really meant to be tinkered with. (But, like I said, it's possible.)

Sorry if that's limiting. Most of Go is pretty straightforward - you can often jump down into the lower layer without too much trouble. But the scheduler is not like that. It's magic.

The magic works for most things, and they added the stuff in sync to cover some known cases where it doesn't. If you feel like you found another, maybe you can convince them to add it. (but it's not just a matter of reproducing an API from another programming language, or exposing an underlying API)

Donnydonnybrook answered 1/5, 2015 at 5:30 Comment(4)
Caleb - The same trick I used to work around the blocking TCP Accept() call in this question: #29948997 will absolutely work for condition variables as well (the listener close gets replaced with a condition signal to cycle the blocking goroutine). I'll write up a code example later to answer this question if no one else does by the time I get to it. (I guess that's "roll your own Cond", though, on second read!)Poppycock
(I'm actually seeing the need for a package that wraps all the "traditional" blocking functions in the API that aren't properly selectable in channels as futures...)Poppycock
You said in your question that we couldn't use a goroutine or channel. You also didn't provide an example to work with.Donnydonnybrook
Yeah, you're right. I actually wasn't aware of non-blocking select syntax / default when I wrote that, which seems much less "heavy" - and also not as desperate to just get some solutions together as I am now...Poppycock
K
2

https://gitlab.com/jonas.jasas/condchan makes it possible to time out on a wait. See this example:

package main

import (
    "fmt"
    "sync"
    "time"
    "gitlab.com/jonas.jasas/condchan"
)

func main() {
    cc := condchan.New(&sync.Mutex{})
    timeoutChan := time.After(time.Second)

    cc.L.Lock()
    // Passing func that gets channel c that signals when
    // Signal or Broadcast is called on CondChan
    cc.Select(func(c <-chan struct{}) { // Waiting with select
        select {
        case <-c: // Never ending wait
        case <-timeoutChan:
            fmt.Println("Hooray! Just escaped from eternal wait.")
        }
    })
    cc.L.Unlock()
}
Kuhlmann answered 28/1, 2019 at 16:34 Comment(0)
H
1

I sketched out a couple of possible alternatives in my GopherCon talk this year (see https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view). The “Condition Variables” section starts at slide 37, but this particular pattern is covered in more detail in the backup slides (101-105).

As zviadm notes, one option (https://play.golang.org/p/tWVvXOs87HX) is to close a channel.

Another option (https://play.golang.org/p/uRwV_i0v13T) is to have each waiter allocate a 1-buffered channel, and have the broadcaster send a token into each waiter's buffer to broadcast (a rough sketch of this follows after the third option below).

If the event is a persistent condition, such as “the queue is empty”, a third option (https://play.golang.org/p/uvx8vFSQ2f0) is to use a 1-buffered channel and have each receiver refill the buffer as long as the condition persists.
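
For concreteness, here is a rough sketch of the second option above (hypothetical names, not the talk's code). The buffer of 1 means the broadcaster's send never blocks and a wakeup is never lost, even if the waiter hasn't reached its select yet:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Broadcaster struct {
    mu      sync.Mutex
    waiters []chan struct{}
}

// Register returns a channel the caller can select on alongside a timer.
func (b *Broadcaster) Register() <-chan struct{} {
    ch := make(chan struct{}, 1)
    b.mu.Lock()
    b.waiters = append(b.waiters, ch)
    b.mu.Unlock()
    return ch
}

// Broadcast wakes every registered waiter exactly once.
func (b *Broadcaster) Broadcast() {
    b.mu.Lock()
    defer b.mu.Unlock()
    for _, ch := range b.waiters {
        ch <- struct{}{} // never blocks: each buffer holds one token
    }
    b.waiters = nil
}

func main() {
    var b Broadcaster
    ch := b.Register()
    go func() { time.Sleep(10 * time.Millisecond); b.Broadcast() }()
    select {
    case <-ch:
        fmt.Println("signalled")
    case <-time.After(time.Second):
        fmt.Println("timed out")
    }
}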

Hereford answered 6/9, 2018 at 15:50 Comment(0)
I
-2

I ran into the same issue and it turned out to be pretty easy to solve using a channel.

  • A signal is a send on a channel.
  • A wait is simply waiting for a message on a channel.
  • A wait with timeout is just a select on a timer and the message.
  • A broadcast is a loop sending messages until there's no one left who listens.

As with any condition variable, it's required to hold the mutex when you wait and highly recommended to hold it when you're signaling.

I wrote an implementation that follows the Cond protocol and adds a WaitOrTimeout. It returns true if successful, false if it timed out.

Here's my code along with some test cases! DISCLAIMER: This seems to work fine but hasn't been thoroughly tested. Also, fairness is not guaranteed: waiting goroutines are released in the order the scheduler sees fit, not necessarily first come, first served.

https://play.golang.org/p/K1veAOGbWZ

package main

import (
    "fmt"
    "sync"
    "time"
)

type TMOCond struct {
    L  sync.Locker
    ch chan bool
}

func NewTMOCond(l sync.Locker) *TMOCond {
    return &TMOCond{ch: make(chan bool), L: l}
}

func (t *TMOCond) Wait() {
    t.L.Unlock()
    <-t.ch
    t.L.Lock()
}

// WaitOrTimeout returns true if it was signalled and false if the
// timeout d expired first.
func (t *TMOCond) WaitOrTimeout(d time.Duration) bool {
    tmo := time.NewTimer(d)
    t.L.Unlock()
    var r bool
    select {
    case <-tmo.C:
        r = false
    case <-t.ch:
        r = true
    }
    // Stop the timer and drain its channel if it already fired, so the
    // timer's resources are released promptly.
    if !tmo.Stop() {
        select {
        case <-tmo.C:
        default:
        }
    }
    t.L.Lock()
    return r
}

func (t *TMOCond) Signal() {
    t.signal()
}

func (t *TMOCond) Broadcast() {
    for {
        // Stop when we run out of waiters
        //
        if !t.signal() {
            return
        }
    }
}

func (t *TMOCond) signal() bool {
    select {
    case t.ch <- true:
        return true
    default:
        return false
    }
}

// **** TEST CASES ****
func lockAndSignal(t *TMOCond) {
    t.L.Lock()
    t.Signal()
    t.L.Unlock()
}

func waitAndPrint(t *TMOCond, i int) {
    t.L.Lock()
    fmt.Println("Goroutine", i, "waiting...")
    ok := t.WaitOrTimeout(10 * time.Second)
    t.L.Unlock()
    fmt.Println("This is goroutine", i, "ok:", ok)
}

func main() {
    var m sync.Mutex
    t := NewTMOCond(&m)

    // Simple wait
    //
    t.L.Lock()
    go lockAndSignal(t)
    t.Wait()
    t.L.Unlock()
    fmt.Println("Simple wait finished.")

    // Wait that times out
    //
    t.L.Lock()
    ok := t.WaitOrTimeout(100 * time.Millisecond)
    t.L.Unlock()
    fmt.Println("Timeout wait finished. Timeout:", !ok)


    // Broadcast. All threads should finish.
    //
    for i := 0; i < 10; i++ {
        go waitAndPrint(t, i)
    }
    time.Sleep(1 * time.Second) 
    t.L.Lock()
    fmt.Println("About to signal")
    t.Broadcast()
    t.L.Unlock()
    time.Sleep(10 * time.Second)
}
Inane answered 18/1, 2017 at 23:51 Comment(1)
This code unlocks the lock before listening on the channel. That means a signal can be lost: Signal may run after WaitOrTimeout has unlocked but before it receives from the channel, so the non-blocking send in signal() hits its default case and the wakeup is dropped, and the waiter eventually returns false.Philander
