Why is my Golang Channel Write Blocking Forever?
I've been attempting to take a swing at concurrency in Golang by refactoring one of my command-line utilities over the past few days, but I'm stuck.

Here's the original code (master branch).

Here's the branch with concurrency (x_concurrent branch).

When I execute the concurrent code with go run jira_open_comment_emailer.go, the defer wg.Done() never executes if the JIRA issue is added to the channel here, which causes my wg.Wait() to hang forever.

The idea is that I have a large number of JIRA issues, and I want to spin off a goroutine for each one to see if it has a comment I need to respond to. If it does, I want to add it to some structure (I chose a channel after some research) that I can read from like a queue later to build up an email reminder.

Here's the relevant section of the code:

// Given an issue, determine if it has an open comment
// Returns true if there is an open comment on the issue, otherwise false
func getAndProcessComments(issue Issue, channel chan<- Issue, wg *sync.WaitGroup) {
    // Decrement the wait counter when the function returns
    defer wg.Done()

    needsReply := false

    // Loop over the comments in the issue
    for _, comment := range issue.Fields.Comment.Comments {
        commentMatched, err := regexp.MatchString("~"+config.JIRAUsername, comment.Body)
        checkError("Failed to regex match against comment body", err)

        if commentMatched {
            needsReply = true
        }

        if comment.Author.Name == config.JIRAUsername {
            needsReply = false
        }
    }

    // Only add the issue to the channel if it needs a reply
    if needsReply == true {
        // This never allows the deferred wg.Done() to execute?
        channel <- issue
    }
}

func main() {
    start := time.Now()

    // This retrieves all issues in a search from JIRA
    allIssues := getFullIssueList()

    // Initialize a wait group
    var wg sync.WaitGroup

    // Set the number of waits to the number of issues to process
    wg.Add(len(allIssues))

    // Create a channel to store issues that need a reply
    channel := make(chan Issue)

    for _, issue := range allIssues {
        go getAndProcessComments(issue, channel, &wg)
    }

    // Block until all of my goroutines have processed their issues.
    wg.Wait()

    // Only send an email if the channel has one or more issues
    if len(channel) > 0 {
        sendEmail(channel)
    }

    fmt.Printf("Script ran in %s", time.Since(start))
}
Nickels answered 25/5, 2016 at 14:12 Comment(4)
You have len(channel) all over the place, but that channel has no length because it's un-buffered. You need to receive from the channel for any sends to complete (and in general, making decisions based on the length of a buffered channel is a mistake, since concurrent operations can race to change that value) Fukien
So, if I'm doing all of my writes to the channel, waiting for them to complete, and then reading from the channel... that can never happen because the sends will never actually complete and trigger the defer wg.Done()? How would you tackle implementing this concurrency, in general? Also, I'm not sure that you're correct on the len(channel), since the godocs state that it returns the current number of elements in the channel, not the capacity like cap(channel) would. golang.org/pkg/builtin/#len Nickels
len(channel) returns the current number of items in a "buffered" channel, but since channels are usually used concurrently, the result of len is "stale" as soon as you read it. One would generally have concurrent goroutines sending and receiving from the channel. I would advise going through the Concurrency section in the Tour Of Go again to get a better grasp of how channels work. Fukien
@Nickels yes, the first channel send will block until someone reads it, which never happens. The simplest thing you can do is to start a goroutine that reads from the other end of the channel before writing to it. As for len and cap, consider that len(c) is always <= cap(c). Crepuscule
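The point about len and cap in the comments above can be checked with a small sketch. The helper name lenAndCap is made up for illustration; len, cap and make are the standard builtins.

```go
package main

import "fmt"

// lenAndCap is a hypothetical helper that reports the number of queued
// elements and the buffer capacity of a channel.
func lenAndCap(ch chan int) (int, int) {
	return len(ch), cap(ch)
}

func main() {
	unbuffered := make(chan int)  // no buffer: len and cap are always 0
	buffered := make(chan int, 3) // room for three queued values
	buffered <- 1
	buffered <- 2

	l, c := lenAndCap(unbuffered)
	fmt.Println("unbuffered:", l, c) // prints: unbuffered: 0 0

	l, c = lenAndCap(buffered)
	fmt.Println("buffered:", l, c) // prints: buffered: 2 3
}
```

Because an unbuffered channel never holds elements, a check such as len(channel) > 0 on it is always false, regardless of how many goroutines are blocked trying to send.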
H
32

The goroutines block on sending to the unbuffered channel. A minimal change that unblocks the goroutines is to create a buffered channel with capacity for all issues:

channel := make(chan Issue, len(allIssues))

and close the channel after the call to wg.Wait().
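Put together, the change might look like the following sketch. The Issue type, the needsReply check and the collectBuffered function are simplified stand-ins invented for this example, not the asker's real code; only the channel and WaitGroup handling mirrors the fix described above.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Issue is a hypothetical stand-in for the asker's JIRA issue type.
type Issue struct {
	Key      string
	Comments []string
}

// needsReply is a simplified placeholder for the real comment check.
func needsReply(issue Issue) bool {
	for _, c := range issue.Comments {
		if strings.Contains(c, "~me") {
			return true
		}
	}
	return false
}

// collectBuffered checks every issue in its own goroutine and returns
// the ones that need a reply.
func collectBuffered(allIssues []Issue) []Issue {
	var wg sync.WaitGroup
	wg.Add(len(allIssues))

	// The buffer holds every issue, so no send can ever block.
	channel := make(chan Issue, len(allIssues))

	for _, issue := range allIssues {
		go func(issue Issue) {
			defer wg.Done()
			if needsReply(issue) {
				channel <- issue
			}
		}(issue)
	}

	wg.Wait()      // all sends have completed
	close(channel) // lets the range loop below terminate

	var replies []Issue
	for issue := range channel {
		replies = append(replies, issue)
	}
	return replies
}

func main() {
	issues := []Issue{
		{Key: "A-1", Comments: []string{"ping ~me"}},
		{Key: "A-2", Comments: []string{"nothing to see"}},
		{Key: "A-3", Comments: []string{"~me please look"}},
	}
	replies := collectBuffered(issues)
	fmt.Printf("%d issue(s) need a reply\n", len(replies)) // prints: 2 issue(s) need a reply
}
```

Closing the channel matters only to the reader: without it, a range over the channel would block once the buffered items are drained.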

Humism answered 25/5, 2016 at 15:5 Comment(6)
But it kind of defeats the purpose of a channel as a pipe between concurrent blocks.... Subtrahend
@Subtrahend There's nothing wrong with using a channel as a queue of items. Burning
true, it saves you the overhead of passing a mutexed slice around. Subtrahend
Yup, a slice with mutex requires more code and is unlikely to perform significantly better in this scenario. Burning
yes, buffered channels make great fifo buffers/queues, but in this case there's no reason to wait for the buffer to fill when each issue is already "queued" up in its own goroutine. The goal was to improve concurrency here, not add another queue to the pipeline. Fukien
Excellent, thank you. The only additional change I had to make was to close(channel) after I was done with all of my writes. Nickels
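For comparison, the alternative raised in the comments (keep the channel unbuffered and receive while the workers are still sending) could be sketched roughly like this. As before, Issue, needsReply and collectUnbuffered are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Issue is a hypothetical stand-in for the asker's JIRA issue type.
type Issue struct {
	Key      string
	Comments []string
}

// needsReply is a simplified placeholder for the real comment check.
func needsReply(issue Issue) bool {
	for _, c := range issue.Comments {
		if strings.Contains(c, "~me") {
			return true
		}
	}
	return false
}

// collectUnbuffered receives results while the workers are still running.
func collectUnbuffered(allIssues []Issue) []Issue {
	var wg sync.WaitGroup
	wg.Add(len(allIssues))

	channel := make(chan Issue) // unbuffered: each send waits for a receive

	for _, issue := range allIssues {
		go func(issue Issue) {
			defer wg.Done()
			if needsReply(issue) {
				channel <- issue
			}
		}(issue)
	}

	// Wait in a separate goroutine so the caller is free to receive.
	go func() {
		wg.Wait()
		close(channel)
	}()

	// Receiving here is what allows the blocked sends to complete.
	var replies []Issue
	for issue := range channel {
		replies = append(replies, issue)
	}
	return replies
}

func main() {
	issues := []Issue{
		{Key: "A-1", Comments: []string{"ping ~me"}},
		{Key: "A-2", Comments: []string{"nothing to see"}},
	}
	replies := collectUnbuffered(issues)
	fmt.Printf("%d issue(s) need a reply\n", len(replies)) // prints: 1 issue(s) need a reply
}
```

The trade-off is a little more structure in exchange for not sizing a buffer up front; for a one-shot command-line tool either approach should work.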
J
0

It's important to be aware that sending or receiving with an uninitialized (nil) channel will block forever (see Closed channel vs nil channel).

So what may have happened is that you've either failed to initialize the channel or, after initialization, the channel variable has somehow been reset with a nil assignment (likely due to a bug elsewhere in the code).

In summary: Double check that your channel is not nil

log.Printf("About to use channel %v", mychannel)
Jillian answered 1/8, 2024 at 17:52 Comment(0)
