I've been trying to learn concurrency in Go by refactoring one of my command-line utilities over the past few days, but I'm stuck.
Here's the original code (master branch).
Here's the branch with concurrency (x_concurrent branch).
When I execute the concurrent code with go run jira_open_comment_emailer.go, the defer wg.Done() never executes if the JIRA issue is added to the channel here, which causes my wg.Wait() to hang forever.
The idea is that I have a large number of JIRA issues, and I want to spin off a goroutine for each one to see if it has a comment I need to respond to. If it does, I want to add it to some structure (I chose a channel after some research) that I can read from like a queue later to build up an email reminder.
Here's the relevant section of the code:
// Given an issue, determine if it has an open comment.
// Sends the issue on the channel if there is an open comment on it.
func getAndProcessComments(issue Issue, channel chan<- Issue, wg *sync.WaitGroup) {
	// Decrement the wait counter when the function returns
	defer wg.Done()

	needsReply := false

	// Loop over the comments in the issue
	for _, comment := range issue.Fields.Comment.Comments {
		commentMatched, err := regexp.MatchString("~"+config.JIRAUsername, comment.Body)
		checkError("Failed to regex match against comment body", err)

		if commentMatched {
			needsReply = true
		}

		if comment.Author.Name == config.JIRAUsername {
			needsReply = false
		}
	}

	// Only add the issue to the channel if it needs a reply
	if needsReply == true {
		// This never allows the deferred wg.Done() to execute?
		channel <- issue
	}
}

func main() {
	start := time.Now()

	// This retrieves all issues in a search from JIRA
	allIssues := getFullIssueList()

	// Initialize a wait group
	var wg sync.WaitGroup

	// Set the number of waits to the number of issues to process
	wg.Add(len(allIssues))

	// Create a channel to store issues that need a reply
	channel := make(chan Issue)

	for _, issue := range allIssues {
		go getAndProcessComments(issue, channel, &wg)
	}

	// Block until all of my goroutines have processed their issues.
	wg.Wait()

	// Only send an email if the channel has one or more issues
	if len(channel) > 0 {
		sendEmail(channel)
	}

	fmt.Printf("Script ran in %s", time.Since(start))
}
len(channel) all over the place, but that channel has no length because it's unbuffered. You need to receive from the channel for any sends to complete (and in general, making decisions based on the length of a buffered channel is a mistake, since concurrent operations can race to change that value). – Fukien
defer wg.Done()? How would you tackle implementing this concurrency, in general? Also, I'm not sure that you're correct on the len(channel), since the godocs state that it returns the current number of elements in the channel, not the capacity like cap(channel) would. golang.org/pkg/builtin/#len – Nickels
len(channel) returns the current number of items in a "buffered" channel, but since channels are usually used concurrently, the result of len is "stale" as soon as you read it. One would generally have concurrent goroutines sending and receiving from the channel. I would advise going through the Concurrency section in the Tour Of Go again to get a better grasp of how channels work. – Fukien
len(c) is always <= cap(c). – Crepuscule
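
To make the point from the comments concrete, here is a minimal sketch of one common way to restructure this, not necessarily how the utility should end up. It assumes simplified, hypothetical Issue and Comment types in place of the real JIRA structs, a hypothetical collectOpenIssues helper, and strings.Contains swapped in for the regexp.MatchString call in the question. The key difference is that wg.Wait() moves into its own goroutine that closes the channel, while the calling goroutine receives from the channel so that each send can complete.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// Comment and Issue are simplified, hypothetical stand-ins for the
// JIRA types used in the question.
type Comment struct {
	Author string
	Body   string
}

type Issue struct {
	Key      string
	Comments []Comment
}

// needsReply reports whether username was mentioned in a comment and has
// not commented since. strings.Contains is an assumption that replaces the
// regexp.MatchString call from the question.
func needsReply(issue Issue, username string) bool {
	open := false
	for _, c := range issue.Comments {
		if strings.Contains(c.Body, "~"+username) {
			open = true
		}
		if c.Author == username {
			open = false
		}
	}
	return open
}

// collectOpenIssues (hypothetical helper) checks each issue in its own
// goroutine and returns the ones that need a reply, sorted by key.
func collectOpenIssues(issues []Issue, username string) []Issue {
	results := make(chan Issue) // unbuffered: a send blocks until it is received

	var wg sync.WaitGroup
	wg.Add(len(issues))
	for _, issue := range issues {
		go func(issue Issue) {
			defer wg.Done()
			if needsReply(issue, username) {
				results <- issue
			}
		}(issue)
	}

	// Wait in a separate goroutine, then close the channel so that the
	// range loop below terminates once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Receiving here is what lets the blocked sends complete.
	var open []Issue
	for issue := range results {
		open = append(open, issue)
	}

	// Goroutines finish in no particular order, so sort for stable output.
	sort.Slice(open, func(i, j int) bool { return open[i].Key < open[j].Key })
	return open
}

func main() {
	issues := []Issue{
		{Key: "DEMO-1", Comments: []Comment{{Author: "bob", Body: "ping [~nick]"}}},
		{Key: "DEMO-2", Comments: []Comment{{Author: "bob", Body: "ping [~nick]"}, {Author: "nick", Body: "done"}}},
		{Key: "DEMO-3"},
	}
	for _, issue := range collectOpenIssues(issues, "nick") {
		fmt.Println(issue.Key)
	}
}
```

With the results collected into a slice, the check before sending the email can be an ordinary len on the slice rather than on the channel, which avoids the stale-length problem mentioned above.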