An alternative to using a mutex is to communicate listener changes over a channel.
A full example in this style follows. The interesting code is in FanOuter.
package main
import (
"fmt"
"time"
)
// Message is the payload that FanOuter forwards to every listener.
type Message int

// ListenerUpdate is a request, sent to FanOuter over its update channel,
// to add or remove a listener.
type ListenerUpdate struct {
	// Add selects the operation: true registers Listener, false removes it.
	Add bool
	// Listener is the channel on which the listener receives messages.
	Listener chan Message
}

// FanOuter maintains a set of listeners and forwards every message received
// on msgc to each of them. Updates received on listc add (Add == true) or
// remove (Add == false) a listener. Updates whose Listener is nil are ignored.
//
// The listener set is only ever touched by the goroutine running FanOuter,
// which is why no mutex is needed: changes arrive over listc and are applied
// between message deliveries.
//
// Delivery is synchronous. Each message is sent to the registered listeners
// one after another, in unspecified (map iteration) order, so a listener that
// is not receiving blocks delivery to the others and delays the processing of
// listener updates.
//
// FanOuter returns when msgc is closed. Closing listc only stops further
// listener updates; messages keep flowing to the listeners registered so far.
// FanOuter never closes a listener channel.
func FanOuter(msgc chan Message, listc chan ListenerUpdate) {
	listeners := make(map[chan Message]struct{})
	for {
		select {
		case m, ok := <-msgc:
			if !ok {
				// The message source is finished. Without this check a
				// closed msgc would yield zero-valued messages forever and
				// every listener would be flooded with spurious zeros.
				return
			}
			for l := range listeners {
				l <- m
			}
		case lup, ok := <-listc:
			if !ok {
				// A nil channel is never ready in a select, so this
				// disables the case instead of spinning on the zero-valued
				// updates a closed channel produces.
				listc = nil
				continue
			}
			switch {
			case lup.Listener == nil:
				// A send on a nil channel blocks forever; registering one
				// would stall delivery to every other listener.
			case lup.Add:
				listeners[lup.Listener] = struct{}{}
			default:
				delete(listeners, lup.Listener)
			}
		}
	}
}
// main runs the demo: it starts FanOuter, gradually registers ten printing
// listeners and then gradually unregisters them, while concurrently sending
// one message per second to the fan-outer.
func main() {
	const (
		listenerCount  = 10
		updateInterval = 300 * time.Millisecond
	)

	messages := make(chan Message)
	updates := make(chan ListenerUpdate)
	go FanOuter(messages, updates)

	// listen prints the listener's id together with every message it receives.
	listen := func(id int, in chan Message) {
		for {
			fmt.Printf("%d received %d\n", id, <-in)
		}
	}

	// Register the listeners one at a time, pausing after each, then
	// unregister them in the same order at the same pace.
	go func() {
		var registered []chan Message
		for id := 0; id < listenerCount; id++ {
			in := make(chan Message)
			registered = append(registered, in)
			go listen(id, in)
			updates <- ListenerUpdate{Add: true, Listener: in}
			time.Sleep(updateInterval)
		}
		for _, in := range registered {
			updates <- ListenerUpdate{Add: false, Listener: in}
			time.Sleep(updateInterval)
		}
	}()

	// Send one message per second; main returns after the tenth send and
	// its trailing pause.
	for n := 0; n < 10; n++ {
		fmt.Println("About to send ", n)
		messages <- Message(n)
		time.Sleep(time.Second)
	}
}