How does select work when multiple channels are involved?
B

4

23

I found that when using select on multiple unbuffered channels, like

select {
case <-chana:
case <-chanb:
}

the number of times each case is chosen is not balanced between chana and chanb, even when both channels have data ready.

package main

import (
    "fmt"
    _ "net/http/pprof"
    "sync"
    "time"
)

func main() {
    chana := make(chan int)
    chanb := make(chan int)

    go func() {
        for i := 0; i < 1000; i++ {
            chana <- 100 * i
        }
    }()

    go func() {
        for i := 0; i < 1000; i++ {
            chanb <- i
        }
    }()

    time.Sleep(time.Microsecond * 300)

    acount := 0
    bcount := 0
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        for {
            select {
            case <-chana:
                acount++
            case <-chanb:
                bcount++
            }
            if acount == 1000 || bcount == 1000 {
                fmt.Println("finish one acount, bcount", acount, bcount)
                break
            }
        }
        wg.Done()
    }()

    wg.Wait()
}

When I run this demo and one of chana, chanb has been fully read, the other may still have anywhere from 1 to 999 values left to receive.

Is there any way to ensure the two are balanced?

found related topic
golang-channels-select-statement

Beane answered 5/12, 2017 at 3:51 Comment(3)
select does not provide any guarantees about which channel it will pick. select is actually the opposite of "balance". If you want to split evenly, read from one channel, then from the other, without select at all. – Everything
If any of your goroutines were actually doing any work you wouldn't have any problems with fairness. – Underscore
@Everything yes, I found it in runtime.selectgo: it does indeed check the cases in an order shuffled using fastrand. – Beane
S
27

The Go select statement is not biased toward any (ready) cases. Quoting from the spec:

If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.

If multiple communications can proceed, one is selected randomly. This is not a perfect random distribution, and the spec does not guarantee that, but it's random.
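
The random choice described above can be observed with a small sketch. It keeps two buffered channels ready at all times so that both cases can always proceed, then counts which case select picks. The helper name countSelections and the use of buffered channels are assumptions made for this illustration; they are not part of the question's code.

```go
package main

import "fmt"

// countSelections keeps two buffered channels ready at all times and
// records which case select picks on each of n iterations.
func countSelections(n int) (a, b int) {
	chana := make(chan int, 1)
	chanb := make(chan int, 1)
	for i := 0; i < n; i++ {
		// Top up both channels (non-blocking) so both cases can proceed.
		select {
		case chana <- i:
		default:
		}
		select {
		case chanb <- i:
		default:
		}
		// Both channels are full here, so the choice is made by select.
		select {
		case <-chana:
			a++
		case <-chanb:
			b++
		}
	}
	return a, b
}

func main() {
	a, b := countSelections(10000)
	fmt.Println("chana:", a, "chanb:", b)
}
```

The two counts should come out close to each other but will rarely be exactly equal, which matches "random, not balanced".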

What you experience is the result of the Go Playground having GOMAXPROCS=1 (which you can verify here) and the goroutine scheduler not being preemptive (Go 1.14 and later do preempt goroutines asynchronously). This means that by default goroutines are not executed in parallel. A goroutine is parked when it encounters a blocking operation (e.g. reading from the network, or attempting to receive from or send on a channel that is not ready), and another goroutine that is ready to run continues.

And since your code has no other blocking operations, goroutines may not get parked, so it may be that only one of your "producer" goroutines runs while the other does not get scheduled (ever).
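
To see which setting applies on a given machine, the current value can be queried at run time. This is a minimal sketch; currentProcs is a hypothetical helper name introduced only for this example.

```go
package main

import (
	"fmt"
	"runtime"
)

// currentProcs reports the current GOMAXPROCS value.
// Passing 0 queries the setting without changing it.
func currentProcs() int {
	return runtime.GOMAXPROCS(0)
}

func main() {
	fmt.Println("GOMAXPROCS:", currentProcs())
	fmt.Println("NumCPU:", runtime.NumCPU())
}
```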

Running your code on my local computer where GOMAXPROCS=4, I have very "realistic" results. Running it a few times, the output:

finish one acount, bcount 1000 901
finish one acount, bcount 1000 335
finish one acount, bcount 1000 872
finish one acount, bcount 427 1000

If you need to prioritize a single case, check out this answer: Force priority of go select statement
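
One common way to prioritize a case is to try the preferred channel first with a non-blocking receive and only then fall back to a blocking select over both. The sketch below illustrates that pattern; the function recvPriority and the channel names high and low are hypothetical, and this is not necessarily the exact code from the linked answer.

```go
package main

import "fmt"

// recvPriority receives one value, preferring high whenever it is ready.
// The second result reports whether the value came from high.
func recvPriority(high, low <-chan int) (int, bool) {
	// First attempt: take from high without blocking.
	select {
	case v := <-high:
		return v, true
	default:
	}
	// Nothing was ready on high: block until either channel delivers.
	select {
	case v := <-high:
		return v, true
	case v := <-low:
		return v, false
	}
}

func main() {
	high := make(chan int, 3)
	low := make(chan int, 3)
	for i := 0; i < 3; i++ {
		high <- i
		low <- 100 + i
	}
	for i := 0; i < 6; i++ {
		v, fromHigh := recvPriority(high, low)
		fmt.Println(v, fromHigh)
	}
}
```

With both channels pre-filled as in main, the three values from high are received before any value from low.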

The default behavior of select does not guarantee equal priority, but on average it will be close to it. If you need guaranteed equal priority, then you should not use a single select. Instead, you can do a sequence of two non-blocking receives, one from each channel, which could look something like this:

for {
    select {
    case <-chana:
        acount++
    default:
    }
    select {
    case <-chanb:
        bcount++
    default:
    }
    if acount == 1000 || bcount == 1000 {
        fmt.Println("finish one acount, bcount", acount, bcount)
        break
    }
}

The two non-blocking receives above will drain the two channels at equal speed (with equal priority) if both supply values. If one of them does not, the other is still received from continuously, without being delayed or blocked.

One thing to note is that if neither channel has a value ready, this is basically a "busy" loop and hence consumes CPU. To avoid this, we can detect that neither channel was ready and then use a select statement with both receives, which blocks until one of them is ready, without wasting any CPU resources:

for {
    received := 0
    select {
    case <-chana:
        acount++
        received++
    default:
    }
    select {
    case <-chanb:
        bcount++
        received++
    default:
    }

    if received == 0 {
        select {
        case <-chana:
            acount++
        case <-chanb:
            bcount++
        }
    }

    if acount == 1000 || bcount == 1000 {
        fmt.Println("finish one acount, bcount", acount, bcount)
        break
    }
}

For more details about goroutine scheduling, see these questions:

Number of threads used by Go runtime

Goroutines 8kb and windows OS thread 1 mb

Why does it not create many threads when many goroutines are blocked in writing file in golang?

Solarium answered 5/12, 2017 at 8:16 Comment(5)
Wouldn't your example with equal priority waste more CPU time doing empty iterations of the for{} loop the sparser the data going through the channels is? Basically, one goroutine would always use one CPU to 1) check chana - empty 2) select default 3) check chanb - empty 4) select default 5) evaluate if - false, over and over, instead of sharing that CPU with other goroutines to do useful work? – Germayne
@Germayne Nice catch! You're right, thanks for pointing that out. I added a solution for that case as well. See edited answer. – Solarium
@Solarium if I select from 2 channels ch1, ch2 and both have elements to read at the same time, the goroutine will pick one case to execute, say ch2. My question is: will the available element from ch1 be pulled out or not? – Chadwick
@MạnhQuyếtNguyễn No, it won't be, only the chosen communication op is executed. – Solarium
@Solarium "may be only one of your "producer" goroutines will run, and the other may not get scheduled (ever)." If I time.Sleep for long enough, why wouldn't both goroutines run? In the Go Ticker example (which also runs on the Playground), it clocks exactly 10s even though it's a different goroutine. – Coriander
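
The point raised in the comments above, that only the chosen communication is executed, can be checked with a short sketch. It uses buffered channels named ch1 and ch2 (as in the comment) so that both are ready before select runs; selectOnce is a hypothetical helper introduced for this illustration.

```go
package main

import "fmt"

// selectOnce makes both channels ready, performs a single select, and
// reports how many values remain buffered in each channel afterwards.
func selectOnce() (len1, len2 int) {
	ch1 := make(chan int, 1)
	ch2 := make(chan int, 1)
	ch1 <- 1
	ch2 <- 2
	// Both cases can proceed; exactly one receive is executed.
	select {
	case <-ch1:
	case <-ch2:
	}
	return len(ch1), len(ch2)
}

func main() {
	len1, len2 := selectOnce()
	fmt.Println("left in ch1:", len1, "left in ch2:", len2)
}
```

Exactly one of the two channels still holds its value after the select; which one varies from run to run.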
F
3

As mentioned in the comment, if you want to ensure balance, you can just forgo using select altogether in the reading goroutine and rely on the synchronisation provided by unbuffered channels:

go func() {
    for {
        <-chana
        acount++
        <-chanb
        bcount++

        if acount == 1000 || bcount == 1000 {
            fmt.Println("finish one acount, bcount", acount, bcount)
            break
        }
    }
    wg.Done()
}()
Fen answered 5/12, 2017 at 4:41 Comment(0)
D
0

Edited: You can balance from the supply side too, but @icza's answer seems to be a better option to me than this, and it also explains the scheduling that was causing this in the first place. Surprisingly, it was one-sided even on my (virtual) machine.

Here's something which can balance two routines from the supply side (somehow doesn't seem to work on Playground).

package main

import (
    "fmt"
    _ "net/http/pprof"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    chana := make(chan int)
    chanb := make(chan int)
    var balanceSwitch int32

    go func() {
        for i := 0; i < 1000; i++ {
            for atomic.LoadInt32(&balanceSwitch) != 0 {
                fmt.Println("Holding R1")
                time.Sleep(time.Nanosecond * 1)
            }
            chana <- 100 * i
            fmt.Println("R1: Sent i", i)
            atomic.StoreInt32(&balanceSwitch, 1)

        }
    }()

    go func() {
        for i := 0; i < 1000; i++ {

            for atomic.LoadInt32(&balanceSwitch) != 1 {
                fmt.Println("Holding R2")
                time.Sleep(time.Nanosecond * 1)
            }
            chanb <- i
            fmt.Println("R2: Sent i", i)
            atomic.StoreInt32(&balanceSwitch, 0)

        }
    }()

    time.Sleep(time.Microsecond * 300)

    acount := 0
    bcount := 0
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        for {
            select {
            case <-chana:
                acount++
            case <-chanb:
                bcount++
            }
            fmt.Println("Acount Bcount", acount, bcount)
            if acount == 1000 || bcount == 1000 {
                fmt.Println("finish one acount, bcount", acount, bcount)
                break
            }
        }
        wg.Done()
    }()

    wg.Wait()
}

By changing the values used in atomic.LoadInt32(&balanceSwitch) != XX and atomic.StoreInt32(&balanceSwitch, X), or by using other mechanisms, you can extend this to any number of goroutines. It may not be the best thing to do, but if that's a requirement then you may have to consider such options. Hope this helps.
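
As one possible "other mechanism", here is a sketch that swaps the atomic switch for a turn token passed between producers over channels, extended to three producers. The names runRoundRobin, chans and turn are hypothetical, and this is a different technique from the atomic-based code above, not a drop-in equivalent.

```go
package main

import "fmt"

// runRoundRobin starts three producers that take turns sending, and
// returns the final counts plus the largest gap ever seen between them.
func runRoundRobin(perProducer int) (counts [3]int, maxGap int) {
	const n = 3
	var chans [n]chan int
	var turn [n]chan struct{}
	for i := range chans {
		chans[i] = make(chan int)
		// Buffered so that passing the single token never blocks.
		turn[i] = make(chan struct{}, 1)
	}
	for i := 0; i < n; i++ {
		go func(id int) {
			for k := 0; k < perProducer; k++ {
				<-turn[id]                   // wait for our turn
				chans[id] <- k               // hand one value to the consumer
				turn[(id+1)%n] <- struct{}{} // pass the turn on
			}
		}(i)
	}
	turn[0] <- struct{}{} // producer 0 goes first

	for received := 0; received < n*perProducer; received++ {
		select {
		case <-chans[0]:
			counts[0]++
		case <-chans[1]:
			counts[1]++
		case <-chans[2]:
			counts[2]++
		}
		// Track the spread between the least and most served producer.
		lo, hi := counts[0], counts[0]
		for _, c := range counts {
			if c < lo {
				lo = c
			}
			if c > hi {
				hi = c
			}
		}
		if hi-lo > maxGap {
			maxGap = hi - lo
		}
	}
	return counts, maxGap
}

func main() {
	counts, maxGap := runRoundRobin(1000)
	fmt.Println("counts:", counts, "max gap:", maxGap)
}
```

Because only the producer holding the token can send, the consumer's select never has more than one ready case, so the counts cannot drift apart by more than one.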

Destine answered 5/12, 2017 at 6:18 Comment(3)
Virtual machines often have 1 VCPU assigned which results in GOMAXPROCS defaulting to 1, which would explain the "one-sided" effect you experienced. You can verify it with the fmt.Println(runtime.GOMAXPROCS(0)) code. – Solarium
Ok. I tried with above code. Actually, I have 2 VCPU's allocated. It does look one-sided still on my system. These are the numbers for acount, bcount : 19 1000, 243 1000, 1000 1, 1000 38, 1000 635, 262 1000, 1 1000, 1000 51, 1 1000, 1000 98, 1 1000, 245 1000, 1 1000 – Destine
Since there are 3 goroutines working in the example (2 producers and 1 consumer), as long as available CPUs are less than 3, the side effect can be explained with the same reasoning. In my example I had 4 CPUs, so this was not experienced. – Solarium
P
0

Seems like every other commenter missed the actual bug here.

The reason this is not balanced is because it literally cannot ever be balanced with the code above. It's a SINGLE thread so the for loop can only ever process either chana OR chanb on each pass through the loop. SO: one of the chans will ALWAYS reach 1000 first.

The if statement is using ||, meaning when EITHER gets to 1000, it will stop.

THE EASY BUGFIX HERE: change || to && in the if statement
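
A minimal sketch of that change, with the question's consumer loop wrapped in a hypothetical helper named drainBoth so the final counts can be inspected:

```go
package main

import "fmt"

// drainBoth receives until BOTH producers have delivered n values.
func drainBoth(n int) (acount, bcount int) {
	chana := make(chan int)
	chanb := make(chan int)

	go func() {
		for i := 0; i < n; i++ {
			chana <- 100 * i
		}
	}()
	go func() {
		for i := 0; i < n; i++ {
			chanb <- i
		}
	}()

	for {
		select {
		case <-chana:
			acount++
		case <-chanb:
			bcount++
		}
		// && instead of ||: stop only when both channels are drained.
		if acount == n && bcount == n {
			break
		}
	}
	return acount, bcount
}

func main() {
	acount, bcount := drainBoth(1000)
	fmt.Println("finish both acount, bcount", acount, bcount)
}
```

Note that this makes the final totals equal; it does not change the order in which select picks between the two channels along the way.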

Peckham answered 14/6, 2022 at 17:59 Comment(0)
