How to wait for all goroutines to finish without using time.Sleep?
Asked Answered
178

This code selects all XML files in the same folder as the invoked executable and asynchronously processes each one in the callback method (in the example below, just the file name is printed out).

How do I avoid using the sleep call to keep the main function from exiting? I have trouble wrapping my head around channels (I assume that's what it takes to synchronize the results), so any help is appreciated!

package main

import (
    "fmt"
    "io/ioutil"
    "path"
    "path/filepath"
    "os"
    "runtime"
    "time"
)

func eachFile(extension string, callback func(file string)) {
    exeDir := filepath.Dir(os.Args[0])
    files, _ := ioutil.ReadDir(exeDir)
    for _, f := range files {
            fileName := f.Name()
            if extension == path.Ext(fileName) {
                go callback(fileName)
            }
    }
}


func main() {
    maxProcs := runtime.NumCPU()
    runtime.GOMAXPROCS(maxProcs)

    eachFile(".xml", func(fileName string) {
                // Custom logic goes in here
                fmt.Println(fileName)
            })

    // This is what I want to get rid of
    time.Sleep(100 * time.Millisecond)
}
Enschede answered 13/8, 2013 at 11:22 Comment(0)
273

You can use sync.WaitGroup. Quoting the linked example:

package main

import (
        "net/http"
        "sync"
)

func main() {
        var wg sync.WaitGroup
        var urls = []string{
                "http://www.golang.org/",
                "http://www.google.com/",
                "http://www.somestupidname.com/",
        }
        for _, url := range urls {
                // Increment the WaitGroup counter.
                wg.Add(1)
                // Launch a goroutine to fetch the URL.
                go func(url string) {
                        // Decrement the counter when the goroutine completes.
                        defer wg.Done()
                        // Fetch the URL.
                        http.Get(url)
                }(url)
        }
        // Wait for all HTTP fetches to complete.
        wg.Wait()
}
Rhapsodist answered 13/8, 2013 at 11:25 Comment(6)
Any reason you have to do wg.Add(1) outside the goroutine? Can we do it inside, just before the defer wg.Done()?Smetana
sat, yes, there's a reason, it's described in sync.WaitGroup.Add docs: Note that calls with positive delta must happen before the call to Wait, or else Wait may wait for too small a group. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. See the WaitGroup example.Bipack
Adapting this code caused me a long debugging session because my goroutine was a named function and passing in the WaitGroup as a value will copy it and make wg.Done() ineffective. While this could be fixed by passing a pointer &wg, a better way to prevent such errors is to declare the WaitGroup variable as a pointer in the first place: wg := new(sync.WaitGroup) instead of var wg sync.WaitGroup.Keratogenous
I guess it is valid to write wg.Add(len(urls)) just above the for _, url := range urls line; I believe that is better, as you use Add only once.Maurice
@RobertJackWill: Good note! BTW, this is covered in the docs: "A WaitGroup must not be copied after first use." Too bad Go doesn't have a way of enforcing this. Actually, however, go vet does detect this case and warns with "func passes lock by value: sync.WaitGroup contains sync.noCopy".Abnormality
Please note that if you use WaitGroup in methods you have to pass it via pointer.Jonme
89

WaitGroups are definitely the canonical way to do this. Just for the sake of completeness, though, here's the solution that was commonly used before WaitGroups were introduced. The basic idea is to use a channel to say "I'm done," and have the main goroutine wait until each spawned routine has reported its completion.

func main() {
    c := make(chan struct{}) // We don't need any data to be passed, so use an empty struct
    for i := 0; i < 100; i++ {
        go func() {
            doSomething()
            c <- struct{}{} // signal that the routine has completed
        }()
    }

    // Since we spawned 100 routines, receive 100 messages.
    for i := 0; i < 100; i++ {
        <- c
    }
}
Bordelaise answered 13/8, 2013 at 17:7 Comment(7)
Nice to see a solution with plain channels. An added bonus: if doSomething() returns some result, then you can put that on the channel, and you can collect and process the results in the second for loop (as soon as they are ready)Maniemanifest
It only works if you already know the number of goroutines you would like to start. What if you are writing some kind of HTML crawler and start goroutines in a recursive manner for every link on the page?Gastroscope
You'll need to keep track of this somehow regardless. With WaitGroups it's a bit easier because each time you spawn a new goroutine, you can first do wg.Add(1) and thus it'll keep track of them. With channels it would be somewhat harder.Bordelaise
c will block since all goroutines will try to access it, and it's unbufferedBustos
If by "block," you mean that the program will deadlock, that's not true. You can try running it yourself. The reason is that the only goroutines that write to c are different from the main goroutine, which reads from c. Thus, the main goroutine is always available to read a value off the channel, which will happen when one of the goroutines is available to write a value to the channel. You're right that if this code didn't spawn goroutines but instead ran everything in a single goroutine, it would deadlock.Bordelaise
@Gastroscope you could use an int channel and send 1 to the channel before starting each goroutine, and have each goroutine defer sending -1 to the channel. Then have the main loop sum the input on the channel until the sum gets back to 0.Trilateral
"The Go Programming Language", 1.6 Fetching URLs ConcurrentlyDanzig
19

sync.WaitGroup can help you here.

package main

import (
    "fmt"
    "sync"
    "time"
)


func wait(seconds int, wg *sync.WaitGroup) {
    defer wg.Done()

    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println("Slept", seconds, "seconds ..")
}


func main() {
    var wg sync.WaitGroup

    for i := 0; i <= 5; i++ {
        wg.Add(1)   
        go wait(i, &wg)
    }
    wg.Wait()
}
Escent answered 29/3, 2018 at 12:5 Comment(0)
4

Although sync.WaitGroup (wg) is the canonical way forward, it does require you to make at least some of your wg.Add calls before you wg.Wait for all of them to complete. This may not be feasible for things like a web crawler, where you don't know the number of recursive calls beforehand and it takes a while to retrieve the data that drives the wg.Add calls. After all, you need to load and parse the first page before you know the size of the first batch of child pages.

I wrote a solution using channels, avoiding WaitGroup, in my solution to the Tour of Go web crawler exercise. Each time one or more goroutines are started, you send the number to the children channel. Each time a goroutine is about to complete, you send a 1 to the done channel. When the sum of children equals the sum of done, we are done.

My only remaining concern is the hard-coded size of the results channel, but that is a (current) Go limitation.


// recursionController is a data structure with three channels to control our Crawl recursion.
// Tried to use sync.WaitGroup in a previous version, but I was unhappy with the mandatory sleep.
// The idea is to have three channels, counting the outstanding calls (children), completed calls 
// (done) and results (results).  Once outstanding calls == completed calls we are done (if you are
// sufficiently careful to signal any new children before closing your current one, as you may be the last one).
//
type recursionController struct {
    results  chan string
    children chan int
    done     chan int
}

// instead of instantiating one instance, as we did above, use a more idiomatic Go solution
func NewRecursionController() recursionController {
    // we buffer results to 1000, so we cannot crawl more pages than that.  
    return recursionController{make(chan string, 1000), make(chan int), make(chan int)}
}

// recursionController.Add: convenience function to add children to controller (similar to waitGroup)
func (rc recursionController) Add(children int) {
    rc.children <- children
}

// recursionController.Done: convenience function to remove a child from controller (similar to waitGroup)
func (rc recursionController) Done() {
    rc.done <- 1
}

// recursionController.Wait will wait until all children are done
func (rc recursionController) Wait() {
    fmt.Println("Controller waiting...")
    var children, done int
    for {
        select {
        case childrenDelta := <-rc.children:
            children += childrenDelta
            // fmt.Printf("children found %v total %v\n", childrenDelta, children)
        case <-rc.done:
            done += 1
            // fmt.Println("done found", done)
        default:
            if done > 0 && children == done {
                fmt.Printf("Controller exiting, done = %v, children =  %v\n", done, children)
                close(rc.results)
                return
            }
        }
    }
}

Full source code for the solution

Sedulous answered 16/1, 2019 at 9:7 Comment(0)
4

Here is a solution that employs WaitGroup.

First, define 2 utility methods:

package util

import (
    "sync"
)

var allNodesWaitGroup sync.WaitGroup

func GoNode(f func()) {
    allNodesWaitGroup.Add(1)
    go func() {
        defer allNodesWaitGroup.Done()
        f()
    }()
}

func WaitForAllNodes() {
    allNodesWaitGroup.Wait()
}

Then, replace the invocation of callback:

go callback(fileName)

With a call to your utility function:

util.GoNode(func() { callback(fileName) })

As a last step, add this line at the end of your main, instead of the sleep. This makes sure the main goroutine waits for all the other goroutines to finish before the program can stop.

func main() {
  // ...
  util.WaitForAllNodes()
}
Herbivorous answered 10/7, 2019 at 9:10 Comment(0)
0

sync.WaitGroup has several disadvantages:

  1. error handling must be implemented manually
  2. goroutine execution results must be collected manually

errgroup.Group solves the problem with error handling:

ts := time.Now()
var urls = []string{
    "http://www.golang.org",
    "http://www.google.com",
    "http://www.invalid.link",
}
wg := new(errgroup.Group)

for _, url := range urls {
    url := url
    wg.Go(func() error {
        resp, err := http.Get(url)
        if err == nil {
            fmt.Println(url, resp.Status, time.Since(ts))
        }
        return err
    })
}
err := wg.Wait()
fmt.Println(err, time.Since(ts))
// http://www.google.com 200 OK 67.23525ms
// http://www.golang.org 200 OK 872.941833ms
// Get "http://www.invalid.link": dial tcp: lookup www.invalid.link: no such host 873.001375ms

Nice, but it doesn't solve the problem of collecting return values.
You may also notice that the error is not returned immediately, but only after all the parallel goroutines have finished.

However, to work conveniently with parallelism in Go, it is better to use specialized libraries...

Pipers solves the problems described above and provides some useful methods, such as .Concurrency(n) and .Context(ctx)

import "github.com/kozhurkin/pipers"

func main() {
    ts := time.Now()
    urls := []string{
        "http://www.golang.org",
        "http://www.google.com",
        "http://stackoverflow.com/404",
    }

    pp := pipers.FromArgs(urls, func(i int, url string) (int, error) {
        resp, err := http.Get(url)
        if err != nil {
            return 0, err
        }
        return resp.StatusCode, nil
    })

    results, err := pp.Resolve()

    fmt.Println(results, err, time.Since(ts))
    // [200 200 404] <nil> 899.973292ms
}
Groth answered 2/4, 2024 at 1:57 Comment(0)
