How to make a function thread-safe in golang

How to lock a function or the body of a function from being called by two threads in golang?

My use case is that I have a webserver calling a serial interface which can only have one caller at a time; two simultaneous calls will cancel each other out by creating noise for one another on the serial line.

Sundsvall answered 10/9, 2017 at 7:39 Comment(0)

The easiest way is to use a sync.Mutex:

package main

import (
    "fmt"
    "sync"
    "time"
)

var lock sync.Mutex

func main() {
    go importantFunction("foo")
    go importantFunction("bar")
    time.Sleep(3 * time.Second)
}

// importantFunction can only be executed by one goroutine at a time;
// any other caller blocks on lock.Lock() until the current call returns.
func importantFunction(name string) {
    lock.Lock()
    defer lock.Unlock()
    fmt.Println(name)
    time.Sleep(1 * time.Second)
}

Here you'll see that "foo" and "bar" are printed one second apart even though they run in separate goroutines.

Go playground: https://play.golang.org/p/mXKl42zRW8
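Applied to the question's serial-interface use case, the same idea can be wrapped in a small type so the mutex and the port handle live together. This is only a rough sketch: SerialPort and its Query method are hypothetical stand-ins for whatever serial library is actually in use.

// SerialPort is a hypothetical stand-in for the real serial-port handle.
type SerialPort struct{}

// Query is likewise a placeholder for the real write-then-read call.
func (p *SerialPort) Query(cmd string) (string, error) {
    return "reply to " + cmd, nil
}

// SerialGateway serializes access to a single serial interface.
type SerialGateway struct {
    mu   sync.Mutex
    port *SerialPort
}

// Query sends one command over the serial line and returns the reply.
// The mutex guarantees only one caller talks to the port at a time.
func (g *SerialGateway) Query(cmd string) (string, error) {
    g.mu.Lock()
    defer g.mu.Unlock()
    return g.port.Query(cmd)
}

Each HTTP handler can then call the gateway's Query method concurrently; the calls are executed on the serial line one at a time.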

Sundsvall answered 10/9, 2017 at 7:39 Comment(4)
Alternatively, defer lock.Unlock() just after lock.Lock() might be preferable, just in case anything inside the logic of importantFunction panics. That way the Unlock call is guaranteed to happen.Liebfraumilch
There is no guarantee that "first" will be printed before "second" as the first go routine may complete after the second. Although in this example it probably always will.Token
Nobody promised order, just that they will be a second apart ;-)Sundsvall
@Sundsvall Well you do write: "Here you'll see that "first" and "second" is printed". - Im just pointing out that it could be "second" followed by "first". I think that is important to note.Token

There are two approaches to implementing non-reentrant functions:

  • Blocking: the first caller runs the function; subsequent callers block and wait until the function exits, then run it in turn
  • Yielding: the first caller runs the function; subsequent callers abort (skip the call) while it is still executing

The two approaches have different merits:

  • Blocking non-reentrant functions are guaranteed to execute as many times as were attempted. However, they can build up a backlog when execution times are long, followed by a burst of executions once the backlog drains.
  • Yielding non-reentrant functions avoid congestion and bursts, and can enforce a maximum execution rate.

Blocking non-reentrant functions are most easily implemented via mutex, as described in @Pylinux's answer. Yielding non-reentrant functions can be implemented via atomic compare & swap, as follows:

package main

import (
    "sync/atomic"
    "time"
)

// CheckSomeStatus and CheckAnotherStatus are placeholders for the real status checks.
func CheckSomeStatus()    {}
func CheckAnotherStatus() {}

func main() {
    tick := time.Tick(time.Second)
    var reentranceFlag int64
    go func() {
        for range tick {
            go CheckSomeStatus()
            go func() {
                // Proceed only if we can flip the flag from 0 to 1;
                // otherwise another invocation is still running, so yield.
                if atomic.CompareAndSwapInt64(&reentranceFlag, 0, 1) {
                    defer atomic.StoreInt64(&reentranceFlag, 0)
                } else {
                    return
                }
                CheckAnotherStatus()
            }()
        }
    }()
    select {} // block forever so the ticker loop keeps running
}

In the above, CheckAnotherStatus() is protected against re-entry: the first caller atomically sets reentranceFlag to 1, and any subsequent caller fails the compare-and-swap and returns immediately.
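On Go 1.18 and newer, a similar yielding behaviour can also be expressed with sync.Mutex's TryLock method instead of a hand-rolled atomic flag. This is only an alternative sketch, not part of the original answer; CheckAnotherStatus is the same placeholder as above.

var mu sync.Mutex

// checkAnotherStatusOnce runs the check only if no other invocation is
// currently in flight; otherwise it yields immediately.
func checkAnotherStatusOnce() {
    if !mu.TryLock() {
        return // another invocation is running; skip this one
    }
    defer mu.Unlock()
    CheckAnotherStatus()
}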

Please consider my blog post, Implementing non re-entrant functions in Golang for a more elaborate discussion.

Countryandwestern answered 6/1, 2018 at 6:28 Comment(0)

Pylinux's solution using a Mutex is, like he says, probably the simplest for your case. I'll add another one here as an alternative, though; it may or may not apply to your situation.

Instead of using a Mutex, you could have a single goroutine perform all the operations on the serial interface, and use a channel to serialise the work it needs to perform. Example:

package main

import (
    "fmt"
    "sync"
)

// handleCommands will handle commands in a serialized fashion
func handleCommands(opChan <-chan string) {
    for op := range opChan {
        fmt.Printf("command: %s\n", op)
    }
}

// produceCommands will generate multiple commands concurrently
func produceCommands(opChan chan<- string) {
    var wg sync.WaitGroup
    wg.Add(2)
    go func() { opChan <- "cmd1"; wg.Done() }()
    go func() { opChan <- "cmd2"; wg.Done() }()
    wg.Wait()
    close(opChan)
}

func main() {
    var opChan = make(chan string)
    go produceCommands(opChan)
    handleCommands(opChan)
}

The advantage of this relative to a Mutex is that you have more control over the wait queue. With the Mutex, the queue exists implicitly at Lock(), and is unbounded. Using a channel, on the other hand, you can limit the maximum number of callers waiting and react appropriately if the synchronised call site is overloaded. If the channel is buffered, you can also check how many commands are currently queued with len(opChan).
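For example, a buffered channel combined with a non-blocking send lets a caller reject work outright when the queue is full. A small sketch (the queue size of 10 is an arbitrary illustration):

// tryEnqueue attempts to queue a command without blocking.
// It reports false if the buffered channel is already full.
func tryEnqueue(opChan chan<- string, cmd string) bool {
    select {
    case opChan <- cmd:
        return true
    default:
        return false // queue full; the caller can back off or return an error
    }
}

This would be used with a buffered channel, e.g. opChan := make(chan string, 10), so at most ten commands can be waiting at any time.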

Edit to add:

A limitation with the above example (as noted in the comments) is that it doesn't handle returning results from the computation back to the original sender. One way to do that, while keeping the approach of using channels, is to introduce a result channel for each command. So instead of sending strings over the command channel, one can send structs of the following format:

type operation struct {
    command string
    result  chan string
}

Commands would be enqueued onto the command channel as follows:

func enqueueCommand(opChan chan<- operation, cmd string) <-chan string {
    var result = make(chan string)
    opChan <- operation{command: cmd, result: result}
    return result
}

This allows the command handler to send a value back to the originator of the command. Full example on the playground here.
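On the receiving side, a minimal sketch of a handler that answers each operation on its own result channel (the reply string here is just a placeholder for the real serial response):

// handleOperations processes commands one at a time and sends each
// reply back on the operation's dedicated result channel.
func handleOperations(opChan <-chan operation) {
    for op := range opChan {
        reply := fmt.Sprintf("reply to %s", op.command) // placeholder work
        op.result <- reply
    }
}

A caller can then write result := enqueueCommand(opChan, "cmd1") and later read the reply with <-result.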

Liebfraumilch answered 10/9, 2017 at 8:39 Comment(8)
I think you're right that this doesn't solve my problem since the result of the function is important to me (the reply from the serial line) and if I used a channel I couldn't be sure which request the two-way channel was responding to. But this does feel like the more idiomatic way of doing it in go so I appreciate you adding it :-)Sundsvall
Just for completeness, your main() should have close(opChan) at the end. It would then be better to remove the sleep and make the second goroutine not a new goroutine (i.e. part of main's instead). Then termination would happen cleanly.Iden
Pylinux, there is a good case for suggesting that this answer is the more idiomatic solution. It follows the Go recommendation 'do not communicate by sharing memory, but share memory by communicating'. In this case, the shared entity is within the serial device. Both approaches will of course work; the mutex is likely to be faster. But the channel approach fits better into any larger-scale network because CSP channels and goroutines can be composed into bigger aggregations quite easily.Iden
Thanks @Rick-777. You're right that the example as a whole isn't complete in that it lacks a robust and idiomatic shut-down mechanism. Adding a close(opChan) at the end in its current state is racy and can theoretically lead to a panic, though. And putting the serialLoop() call in the main goroutine would prevent the program from exiting. I'll see if I can create a more complete example without hiding relevant parts in too much surrounding setup/teardown logic.Liebfraumilch
@Pylinux, yes, returning results is an advantage of the mutex approach. Nevertheless, I added an example of how results can be managed while still using channels. I also adjusted the main example as mentioned in the comment above.Liebfraumilch
Waiting for child goroutines is a bit tiresome in Go, so I wrote a small wrapper to make this easier (github.com/rickb777/process - shameless plug!), which contains the mutex code similar to the above.Iden
Is there a reason for close(opChan) not being able to be called with defer close(opChan). Doesn't defer work with waitGroup?Sundsvall
You can do defer close(opChan) as long as you know with certainty that wg.Wait() has run before it is called. Otherwise you might have producer goroutines still trying to write to the channel, which will lead to a panic if it is closed. So you haven't really added any safety unless you also handle Wait the same way. But then you also need to make sure there is an equal amount of Done calls to the total of the Add calls, or Wait will deadlock in the deferred call. So all-in-all, I'm not sure how much it will help you in terms of achieving code correctness.Liebfraumilch
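One common pattern that keeps this ordering straight is to let a dedicated goroutine perform the close only after the WaitGroup has drained, so no producer can still be sending. A sketch, not from the original thread:

// Close the channel only once every producer has called Done.
go func() {
    wg.Wait()
    close(opChan) // safe: no sends can happen after Wait returns
}()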
import (
    "sync"
    "time"
)

// Semafor is a simple binary semaphore guarded by an RWMutex.
type Semafor struct {
    sync.RWMutex
    semafor int
}

var mySemafor *Semafor

// SleepM sleeps for the given number of milliseconds
// (helper assumed here; it is not defined in the original answer).
func SleepM(ms int) {
    time.Sleep(time.Duration(ms) * time.Millisecond)
}

func (m *Semafor) get() int { // read lock
    if m != nil {
        m.RLock()
        defer m.RUnlock()
        return m.semafor
    } else {
        panic("Error: the semaphore is not initialized, InitSemafor()")
    }
}

func (m *Semafor) set(val int) bool { // write lock
    ok := false
    if m != nil {
        if val != 0 {
            m.Lock()
            if m.semafor == 0 {
                m.semafor = val
                ok = true
            }
            m.Unlock()
        } else {
            m.Lock()
            m.semafor = val
            ok = true
            m.Unlock()
        }
    }
    return ok
}

func InitSemafor() {
    if mySemafor == nil {
        mySemafor = &Semafor{}
        mySemafor.set(0)
    }
}

// OnSemafor spins until it acquires the semaphore.
func OnSemafor() bool {
    if mySemafor != nil {
        for !mySemafor.set(1) {
            for mySemafor.get() == 1 {
                SleepM(2)
            }
        }
        return true
    } else {
        panic("Error: the semaphore is not initialized, InitSemafor()")
    }
}

// OffSemafor releases the semaphore.
func OffSemafor() {
    mySemafor.set(0)
}
Ulloa answered 26/4, 2023 at 21:22 Comment(2)
Use this code: InitSemafor() ... If OnSemafor() { .... critical code... } OffSemafor()Ulloa
As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center.Choe
