Using goroutines to process values and gather results into a slice
Asked Answered
A

3

21

I'm recently exploring Go and how goroutines work confuse me.

I tried to port code I had written before into Go using goroutines but got a fatal error: all goroutines are asleep - deadlock! error.

What I'm trying to do is use goroutines to process items in a list, then gather the processed values into a new list. But I'm having problems in the "gathering" part.

Code:

sampleChan := make(chan sample)
var wg sync.WaitGroup

// Read from contents list
for i, line := range contents {
    wg.Add(1)
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
wg.Wait()

// Read from sampleChan and put into a slice
var sampleList []sample
for s := range sampleChan {
    sampleList = append(sampleList, s)
}
close(sampleChan)

What's the right way to gather results from goroutines?

I know slices are not threadsafe so I can't have each goroutine just append to the slice.

Architecture answered 2/9, 2017 at 5:41 Comment(3)
How is newSample implemented?Asberry
@Asberry check the answer I have explained the answer with an exmple similar to your use caseStratopause
@Asberry newSample reads a string, chops it up, makes type conversions to int, float64... and sends a new struct sample to the channelArchitecture
O
23

Your code is almost correct. There's a couple of problems: first, you're waiting for all the workers to finish before collecting the results, and second your for loop terminates when the channel is closed, but the channel is closed only after the for loop terminates.

You can fix the code by asynchronously closing the channel when the workers are finished:

for i, line := range contents {
    wg.Add(1)
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}

go func() {
    wg.Wait()
    close(sampleChan)
}()

for s := range sampleChan {
  ..
}

As a note of style (and following https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions), it'd be preferable if newSample was a simple, synchronous function that didn't take the waitgroup and channel, and simply generated its result. Then the worker code would look like:

for i, line := range contents {
    wg.Add(1)
    go func(line string) {
        defer wg.Done()
        sampleChan <- newSample(line, *replicatePtr, *timePtr)
    }(line)
}

This keeps your concurrency primitives all together, which apart from simplifiying newSample and making it easier to test, it allows you to see what's going on with the concurrency, and visually check that wg.Done() is always called. And if you want to refactor the code to for example use a fixed number of workers, then your changes will all be local.

Obstruent answered 2/9, 2017 at 6:36 Comment(5)
Thanks. So the deadlock happens because the Wait() was not asynchronous, and not because the channel was unbuffered? I'm just trying to wrap my head around this so that I can make use of concurrency betterArchitecture
Making the channel large enough also works, although it uses extra storage for the channel.Obstruent
Also just to clarify, how does for s := range sampleChan still work even when after Wait() is triggered? I though that closing an unbuffered channel will flush its contents with it. Thanks for the helpArchitecture
close() is like a special sort of write to the channel -- it doesn't flush the contents.Obstruent
@PaulHankin I didn't know that about closeing a channel. Thanks for the tidbit!Saideman
S
7

There are two problems

  1. Using unbuffered channels: Unbuffered channels block receivers until data is available on the channel and senders until a receiver is available.That caused the error
  2. Not closing the channel before range: As you never close the ch channel, the range loop will never finish.

You have to use a buffered channel and close the channel before range

Code

package main

import (
    "fmt"
    "sync"
)

func double(line int, ch chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    ch <- line * 2

}

func main() {
    contents := []int{1, 2, 3, 4, 5}
    sampleChan := make(chan int,len(contents))
    var wg sync.WaitGroup
    // Read from contents list
    for _, line := range contents {
        wg.Add(1)
        go double(line, sampleChan, &wg)
    }
    wg.Wait()
    close(sampleChan)
    // Read from sampleChan and put into a slice
    var sampleList []int

    for s := range sampleChan {
        sampleList = append(sampleList, s)
    }

    fmt.Println(sampleList)
}

Play link : https://play.golang.org/p/k03vt3hd3P

EDIT: Another approach for better performance would be to run producer and consumer at concurrently

Modified code

package main

import (
    "fmt"
    "sync"
)

func doubleLines(lines []int, wg *sync.WaitGroup, sampleChan chan int) {
    defer wg.Done()

    defer close(sampleChan)
    var w sync.WaitGroup
    for _, line := range lines {
        w.Add(1)
        go double(&w, line, sampleChan)
    }
    w.Wait()
}

func double(wg *sync.WaitGroup, line int, ch chan int) {
    defer wg.Done()
    ch <- line * 2
}

func collectResult(wg *sync.WaitGroup, channel chan int, sampleList *[]int) {
    defer wg.Done()
    for s := range channel {
        *sampleList = append(*sampleList, s)
    }

}

func main() {
    contents := []int{0,1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
    sampleChan := make(chan int, 1)
    var sampleList []int

    var wg sync.WaitGroup

    wg.Add(1)
    go doubleLines(contents, &wg, sampleChan)
    wg.Add(1)
    go collectResult(&wg, sampleChan, &sampleList)
    wg.Wait()
    fmt.Println(sampleList)
}

play link: https://play.golang.org/p/VAe7Qll3iVM

Stratopause answered 2/9, 2017 at 6:0 Comment(4)
Thanks. This works for my case. But what if the length of the original slice is huge, will there be a problem in creating a huge buffer?Architecture
You can create channels with upto 2147483647 buffer size. But I would rather run the consumer function parallelly with the producers if possibleStratopause
Hi, I just tried your last example (the one with collectResult function) in playground, It seems it is failing the collect the double of last element 19. I am kind of beginner and still trying to understand things, example seems good to my eyes, If you can explain why it is failing ( or not failing ) , that would be great. Thanks !Prepense
@Prepense thanks for noticing It was because the main was not waiting for collectResult to process everythingStratopause
C
2

Assuming that there is one result for each input, then the channel or wait group are sufficient to solve the problem. There's no need for both.

Option 1: Eliminate the wait group. Receive one result for each input.

sampleChan := make(chan sample)

// Read from contents list
for i, line := range contents {
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan)
}

// Read from sampleChan and put into a slice
var sampleList []sample
for range contents {
    sampleList = append(sampleList, <-sampleChan)
}

Option 2: Eliminate the channel. Allocate a slice of sufficient size and write results from the goroutine to the slice. Modify newSample to return the result instead of sending the result to a channel. With this approach, the order of the results in sampleList matches the order of the input in contents.

var wg sync.WaitGroup
sampleList := make([]sample, len(contents))

for i, line := range contents {
    go func(i int, line string) {
       sampleList[i] = newSample(line, *replicatePtr, *timePt)
    }(i, line)
}
wg.Wait()
fmt.Println(sampleList)
Cooperative answered 17/6, 2024 at 11:42 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.