How to wait until buffered channel (semaphore) is empty?

5

19

I have a slice of integers, which are manipulated concurrently:

ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

I'm using a buffered channel as a semaphore to put an upper bound on the number of concurrently running goroutines:

sem := make(chan struct{}, 2)

for _, i := range ints {
  // acquire semaphore
  sem <- struct{}{}

  // start long running go routine
  go func(id int, sem chan struct{}) {
    // do something

    // release semaphore
    <- sem
  }(i, sem)
}

The code above works well until the last one or two integers are reached: the program ends before those last goroutines have finished.

Question: how do I wait for the buffered channel to drain?

Flashbulb answered 29/9, 2016 at 17:25 Comment(1)
You gotta use a mutex or something. The buffered channel blocks when it's full, but there is no language feature to block until it's empty. - Orban
26

You can't use a semaphore (a channel in this case) in that manner. There's no guarantee it won't be empty at some point while you are still processing values and dispatching more goroutines. That's not a concern in this case specifically, since you're dispatching work synchronously, but because there's no race-free way to check a channel's length, there's no primitive to wait for a channel's length to reach 0.

Use a sync.WaitGroup to wait for all goroutines to complete:

sem := make(chan struct{}, 2)

var wg sync.WaitGroup

for _, i := range ints {
    wg.Add(1)
    // acquire semaphore
    sem <- struct{}{}
    // start long running go routine
    go func(id int) {
        defer wg.Done()
        // do something
        // release semaphore
        <-sem
    }(i)
}

wg.Wait()
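
For reference, here is the same pattern as a complete program. This is only a sketch: `processAll`, its `work` callback, and the peak-concurrency bookkeeping are names and additions made up for illustration, and the sleep stands in for the real work.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// processAll (hypothetical helper) runs work(id) for every id with at most
// limit goroutines in flight, and returns only after all of them are done.
// It reports the highest number of goroutines seen running at once.
func processAll(ids []int, limit int, work func(id int)) int {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup

	// Bookkeeping used only to observe the concurrency bound.
	var mu sync.Mutex
	running, peak := 0, 0

	for _, id := range ids {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot; blocks while limit goroutines are running
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot (runs before wg.Done)

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			work(id)

			mu.Lock()
			running--
			mu.Unlock()
		}(id)
	}

	wg.Wait() // block until every goroutine has called Done
	return peak
}

func main() {
	ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	peak := processAll(ints, 2, func(id int) {
		time.Sleep(10 * time.Millisecond) // placeholder for the real work
	})
	fmt.Println("all goroutines finished; peak concurrency:", peak)
}
```

The semaphore only limits how many goroutines run at once; the WaitGroup is what keeps the caller from returning early. The reported peak never exceeds the limit (2 here).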
Mongrel answered 29/9, 2016 at 17:29 Comment(1)
Thanks, I also thought about using a WaitGroup. This feels like the right way! - Flashbulb
5

Use a "worker pool" to process your data. It is cheaper than starting a goroutine for each int, allocating memory for the variables inside it, and so on.

ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

ch := make(chan int)

var wg sync.WaitGroup

// run worker pool
for i := 2; i > 0; i-- {
    wg.Add(1)

    go func() {
        defer wg.Done()

        for id := range ch {
            // do something
            fmt.Println(id)
        }
    }()
}

// send ints to workers
for _, i := range ints {
    ch <- i
}

close(ch)

wg.Wait()
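
If the workers need to hand results back instead of printing them, one possible variation is to add a second, buffered channel. This is only a sketch: `squareAll` and the squaring step are placeholders invented for illustration, not part of the answer above.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// squareAll (hypothetical helper) distributes ints over a fixed number of
// workers and collects one result per input. Squaring stands in for real work.
func squareAll(ints []int, workers int) []int {
	jobs := make(chan int)
	// Buffered to len(ints) so a worker never blocks when sending a result.
	results := make(chan int, len(ints))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs { // loop ends when jobs is closed and drained
				results <- n * n
			}
		}()
	}

	for _, n := range ints {
		jobs <- n
	}
	close(jobs) // tell the workers there is no more input

	wg.Wait()      // all workers have exited
	close(results) // safe now: nobody will send again

	out := make([]int, 0, len(ints))
	for r := range results {
		out = append(out, r)
	}
	sort.Ints(out) // completion order is not deterministic, so sort for stable output
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3}, 2)) // prints [1 4 9]
}
```

Closing `results` only after `wg.Wait()` returns matters here: closing it earlier could make a worker panic on a send to a closed channel.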
Jitterbug answered 6/5, 2017 at 8:20 Comment(0)
0

Clearly, nothing is waiting for your goroutines to complete, so the program ends before the last two goroutines have finished. You can use a sync.WaitGroup to wait for all your goroutines to complete before the program ends. This post explains it well: https://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/

Sitdown answered 29/9, 2016 at 17:35 Comment(1)
Thank you, I was looking for how to avoid that, and a WaitGroup works perfectly. JimB's answer clarified this. - Flashbulb
0

You can wait for your "sub-goroutines" from the current goroutine with a for loop that fills the semaphore to capacity:

semLimit := 2
sem := make(chan struct{}, semLimit)

for _, i := range ints {
  // acquire semaphore
  sem <- struct{}{}

  // start long running go routine
  go func(id int, sem chan struct{}) {
    // do something

    // release semaphore
    <- sem
  }(i, sem)
}

// wait: once all semLimit slots can be acquired here, no goroutine still holds one
for i := 0; i < semLimit; i++ {
  sem <- struct{}{}
}
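
To see why filling the semaphore works as a wait, here is a small self-contained sketch. The names `runAndDrain` and `work` are made up for illustration, and the sleep is a placeholder for real work. Every running goroutine keeps one slot occupied until it finishes, so the final loop can only complete once every slot has been handed back.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// runAndDrain (hypothetical helper) starts one goroutine per id, bounded by
// limit, and then waits by acquiring every slot of the semaphore itself.
func runAndDrain(ids []int, limit int, work func(id int)) {
	sem := make(chan struct{}, limit)

	for _, id := range ids {
		sem <- struct{}{} // acquire a slot
		go func(id int) {
			defer func() { <-sem }() // release the slot after work returns
			work(id)
		}(id)
	}

	// Wait: each send below needs a free slot. When all cap(sem) sends have
	// succeeded, the channel is full of our own tokens, so no goroutine can
	// still be holding one.
	for i := 0; i < cap(sem); i++ {
		sem <- struct{}{}
	}
}

func main() {
	ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	var done int64

	runAndDrain(ints, 2, func(id int) {
		time.Sleep(10 * time.Millisecond) // placeholder for the real work
		atomic.AddInt64(&done, 1)
	})

	fmt.Println("completed:", atomic.LoadInt64(&done)) // prints completed: 10
}
```

One caveat with this approach: the semaphore is left full afterwards, so it has to be drained or recreated before it can be reused.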

Optionally, it is also possible to write a minimalistic "semaphored waitgroup" and avoid importing sync:

semLimit := 2
// mini semaphored waitgroup 
wg := make(chan struct{}, semLimit)
// mini methods
wgAdd := func(){ wg<-struct{}{} }
wgDone := func(){ <-wg }
wgWait := func(){ for i := 0; i < semLimit; i++ { wgAdd() } }

for _, i := range ints {
  // acquire semaphore
  wgAdd()

  // start long running go routine
  go func(id int) {
    // do something

    // release semaphore
    wgDone()
  }(i)
}

// wait semaphore
wgWait()
Comminate answered 30/12, 2019 at 9:18 Comment(0)
-1

Here is a working example. The for loop at the end forces the program to wait until the jobs are done:

package main
import "time"

func w(n int, e chan error) {
   // do something
   println(n)
   time.Sleep(time.Second)
   // release semaphore
   <-e
}

func main() {
   a := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
   e := make(chan error, 2)
   for _, n := range a {
      // acquire semaphore
      e <- nil
      // start long running go routine
      go w(n, e)
   }
   for n := cap(e); n > 0; n-- {
      e <- nil
   }
}
Renata answered 18/4, 2021 at 2:19 Comment(0)
