STM with partial atomicity for certain TVars

I am working with STM and have, among other things, used the TBQueue data structure with great success. A useful feature I've been relying on is reading from it based on a precondition in a TVar, basically like so:

shouldRead <- readTVar shouldReadVar
if shouldRead
  then do
    a <- readTBQueue queue
    doSomethingWith a
  else doSomethingElse

If we assume that queue is empty and shouldReadVar contains True before this block executes, readTBQueue will call retry, and the block will be re-executed when shouldReadVar contains False or queue contains an element, whichever happens first.
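A minimal runnable sketch of the retry behaviour described above, assuming the `stm` package (which provides `TBQueue`). The names `readWhenEnabled` and `demo` are illustrative, not from the question: a reader blocks on an empty queue while the flag is True, and flipping the flag wakes it up into the else branch.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

-- Hypothetical helper: read from the queue only while the flag is True.
readWhenEnabled :: TVar Bool -> TBQueue a -> STM (Maybe a)
readWhenEnabled shouldReadVar queue = do
  shouldRead <- readTVar shouldReadVar
  if shouldRead
    then Just <$> readTBQueue queue -- retries while the queue is empty
    else return Nothing

demo :: IO (Maybe Int)
demo = do
  shouldReadVar <- newTVarIO True
  queue <- newTBQueueIO 10
  result <- newEmptyTMVarIO
  -- The reader blocks: the flag is True and the queue is empty.
  _ <- forkIO $ do
    r <- atomically (readWhenEnabled shouldReadVar queue)
    atomically (putTMVar result r)
  threadDelay 100000 -- give the reader time to block (not needed for correctness)
  -- Writing a TVar that the blocked transaction has read wakes it up;
  -- it re-runs, sees False and takes the else branch.
  atomically (writeTVar shouldReadVar False)
  atomically (takeTMVar result)

main :: IO ()
main = demo >>= print -- prints Nothing
```

Because the queue never receives an element here, the only way for the reader to finish is the flag change, which is exactly the "whichever happens first" wake-up described above.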


I now need a synchronous channel data structure, similar to the structure described in this article (please read it if you want to understand this question), except that it needs to be readable with a precondition like in the previous example, and possibly compose with other STM transactions as well.

Let's call this data structure SyncChan with writeSyncChan and readSyncChan operations defined on it.

Here is a possible use case. The following pseudocode will not work as written, because it mixes STM and IO concepts:

shouldRead <- readTVar shouldReadVar
if shouldRead
  then do
    a <- readSyncChan syncChan
    doSomethingWith a
  else doSomethingElse

Assuming that no other thread is currently blocking on a writeSyncChan call, and shouldReadVar contains True, I want the block to "retry" until either shouldReadVar contains False or a different thread blocks on a writeSyncChan. In other words: when one thread retries on writeSyncChan and another thread reaches a readSyncChan, or vice versa, I want the value to be transferred along the channel. In all other cases, both sides should be in a retry state and thus react to a change in shouldReadVar, so that the read or write can be cancelled.

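The naïve approach described in the article linked above, using two (T)MVars, is of course not possible here. Because the data structure is synchronous, the writer has to commit its message before it can wait for the acknowledgement, so the operation has to be split across two atomically blocks: you cannot change one TMVar and then wait for another TMVar to be changed within a single atomic transaction, because the retry would roll back the first change as well.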
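To make the problem concrete, here is a sketch of the two-TMVar approach (a `msg` slot plus an `ack` slot), assuming the `stm` package; `writeOneTx`, `writeTwoTx` and `demo` are hypothetical names. It contrasts a write done in one transaction, which can never complete because the `retry` on `ack` also undoes the `putTMVar` on `msg`, with the same write split into two transactions, which works but can no longer be composed inside a single `atomically`.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import System.Timeout (timeout)

data SyncChan a = SyncChan (TMVar a) (TMVar ())

newSyncChan :: IO (SyncChan a)
newSyncChan = SyncChan <$> newEmptyTMVarIO <*> newEmptyTMVarIO

-- Reader side: take the message and acknowledge it in one transaction.
readSyncChan :: SyncChan a -> STM a
readSyncChan (SyncChan msg ack) = do
  a <- takeTMVar msg
  putTMVar ack ()
  return a

-- Broken: both steps in ONE transaction. takeTMVar ack retries, which
-- rolls back the putTMVar too, so no reader ever sees the message.
writeOneTx :: SyncChan a -> a -> STM ()
writeOneTx (SyncChan msg ack) a = putTMVar msg a >> takeTMVar ack

-- Works: commit the message first, then wait for the ack in a
-- second transaction. The result is an IO action, not an STM one.
writeTwoTx :: SyncChan a -> a -> IO ()
writeTwoTx (SyncChan msg ack) a = do
  atomically $ putTMVar msg a
  atomically $ takeTMVar ack

demo :: IO (Maybe (), Int)
demo = do
  -- One transaction: the writer never commits, even with a reader waiting.
  c1 <- newSyncChan
  _ <- forkIO $ () <$ atomically (readSyncChan c1)
  stuck <- timeout 200000 (atomically (writeOneTx c1 (1 :: Int)))
  -- Two transactions: the reader sees the committed message and acks it.
  c2 <- newSyncChan
  result <- newEmptyTMVarIO
  _ <- forkIO $ atomically (readSyncChan c2) >>= atomically . putTMVar result
  writeTwoTx c2 42
  got <- atomically (takeTMVar result)
  return (stuck, got)

main :: IO ()
main = demo >>= print -- prints (Nothing,42)
```

The `Nothing` from `timeout` shows the single-transaction write never finishing; the leftover reader on `c1` stays blocked, which is harmless here because `forkIO` discards `BlockedIndefinitelyOnSTM`.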

Instead, I am looking for a kind of partial atomicity, where I can "commit" a certain part of a transaction and only roll it back when certain variables change, but not others. If I have "msg" and "ack" variables as in the first example in the article above, I want to be able to write to the "msg" variable, then wait for either a value to arrive on "ack" or for my other transactional variables to change. If the other transactional variables change, the whole atomic block should be retried; if an "ack" value arrives, the transaction should continue from where it left off. The reading side should do something similar, except that it would of course read from "msg" and write to "ack".

Is this possible to do using GHC STM, or do I need to do manual MVar/rollback handling?

Sparid asked 13/6, 2013 at 21:22

This is what you want:

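import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad

-- A synchronous channel: the writer puts a value into `msg`, then waits
-- until the reader signals receipt through `ack`.
data SyncChan a = SyncChan (TMVar a) (TMVar ())

newSyncChan :: IO (SyncChan a)
newSyncChan = do
    msg <- newEmptyTMVarIO
    ack <- newEmptyTMVarIO
    return (SyncChan msg ack)

-- Conditional read, usable inside a larger STM transaction. If the flag
-- is False, return Nothing at once; otherwise take the message (retrying
-- while none is available) and acknowledge it in the same transaction.
readIf :: SyncChan a -> TVar Bool -> STM (Maybe a)
readIf (SyncChan msg ack) shouldReadVar = do
    b <- readTVar shouldReadVar
    if b
        then do
            a <- takeTMVar msg
            putTMVar ack ()
            return (Just a)
        else return Nothing

-- The write needs two transactions: the first commits the message so a
-- reader can see it, the second blocks until the acknowledgement arrives.
write :: SyncChan a -> a -> IO ()
write (SyncChan msg ack) a = do
    atomically $ putTMVar msg a
    atomically $ takeTMVar ack

main :: IO ()
main = do
    sc <- newSyncChan
    tv <- newTVarIO True
    -- Flip the read flag every 2 seconds (True -> False -> True ...).
    _ <- forkIO $ forever $ forM_ [False, True] $ \b -> do
        threadDelay 2000000
        atomically $ writeTVar tv b
    -- Writer: sends 0, 1, 2, ... with a 0.3 s pause after each send.
    _ <- forkIO $ forM_ [0..] $ \i -> do
        putStrLn "Writing..."
        write sc i
        putStrLn "Write Complete"
        threadDelay 300000
    -- Reader: prints Just i while the flag is True; while it is False,
    -- readIf returns Nothing immediately, so this loop spins.
    forever $ do
        putStrLn "Reading..."
        a <- atomically $ readIf sc tv
        print a
        putStrLn "Read Complete"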

This gives the behavior you had in mind. While the TVar is True, the input and output ends are synchronized with each other. When the TVar switches to False, the read end aborts and returns Nothing.
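With the main loop shown above, the reader spins while the flag is False, because readIf returns Nothing immediately. One possible refinement is to block on the flag with `check` after a Nothing. This is a sketch that reproduces the answer's `SyncChan`, `readIf` and `write` so it is self-contained; `waitUntilEnabled` and `demo` are hypothetical helpers.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

data SyncChan a = SyncChan (TMVar a) (TMVar ())

newSyncChan :: IO (SyncChan a)
newSyncChan = SyncChan <$> newEmptyTMVarIO <*> newEmptyTMVarIO

-- As in the answer: conditional, composable read.
readIf :: SyncChan a -> TVar Bool -> STM (Maybe a)
readIf (SyncChan msg ack) shouldReadVar = do
  b <- readTVar shouldReadVar
  if b
    then do
      a <- takeTMVar msg
      putTMVar ack ()
      return (Just a)
    else return Nothing

-- As in the answer: two-transaction write.
write :: SyncChan a -> a -> IO ()
write (SyncChan msg ack) a = do
  atomically $ putTMVar msg a
  atomically $ takeTMVar ack

-- Hypothetical helper: retry until the flag is True, instead of polling.
waitUntilEnabled :: TVar Bool -> STM ()
waitUntilEnabled tv = readTVar tv >>= check

demo :: IO (Maybe Int, Maybe Int)
demo = do
  sc <- newSyncChan
  tv <- newTVarIO False
  r1 <- atomically (readIf sc tv) -- reading disabled: Nothing at once
  _ <- forkIO $ threadDelay 100000 >> atomically (writeTVar tv True)
  _ <- forkIO $ write sc 7
  atomically (waitUntilEnabled tv) -- sleeps until the flag flips
  r2 <- atomically (readIf sc tv) -- now waits for the writer's value
  return (r1, r2)

main :: IO ()
main = demo >>= print -- prints (Nothing,Just 7)
```

In a reader loop like the answer's, calling `atomically (waitUntilEnabled tv)` whenever readIf returns Nothing would replace the busy loop with a blocking wait.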

Wartburg answered 13/6, 2013 at 22:5
You assume that there is a data structure called SyncChan with certain semantics. However, there is no such data structure; the problem comes up when trying to implement it. You've basically taken the code from my second code block in the question and extracted the branch into a Maybe value. The actual problem lies in implementing readSyncChan and writeSyncChan! - Sparid
@Sparid I fixed it and wrote up the entire implementation, including example usage code. - Wartburg
Thanks for taking the time to write up all this code. However, it does not allow conditional writes (with a shouldWriteVar, so to speak). Simply adding the check to the first atomically block in the write function does not work: once the value has been written and the thread is waiting for the ack, there is no way to respond to a change in shouldWriteVar. Is the cleanest way to check shouldWriteVar again at that point, or is there some other option that avoids a deadlock situation I haven't considered? - Sparid
Yeah, I don't know of any way to do that cleanly, because it implies interleaving distinct STM transactions, which would violate the "Isolation" property of STM. - Wartburg
Well, I realize that the outer monad would have to be IO, of course, so it should be possible to interleave transactions in a controlled manner. - Sparid
@Sparid Yeah, I think you might have to implement your own rollback scheme. However, I'm curious why you want the writer to block until the downstream value is received. Why not just deposit the value and not wait for acknowledgement from the reader? - Wartburg
As an experiment, I'm trying to implement a concurrency system similar or semantically identical to the one used by the Go language. An unbuffered channel in Go is used not only for data transfer, but also for synchronization between two threads. As for why you'd want this, there are plenty of articles on Go that explain it. Normally I would not use this style of concurrency and would instead use a separate barrier or channel; this is just an experiment to see how powerful Haskell's concurrency primitives are. - Sparid
Got it. If you figure out an elegant way to do this completely in STM, let me know and I will also add it to pipes-concurrency as one of the buffering options. For now, though, I think your best bet is the manual rollback system you proposed if you want both the read and the write to be conditional. - Wartburg
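The manual rollback scheme discussed in this thread could look roughly like the following sketch. It assumes the `stm` package, a single writer per channel, and a reader that takes `msg` and puts `ack` in one transaction (as in the answer); `writeIf` and `demo` are hypothetical names. The write is still two transactions in IO: the first publishes the value only if `shouldWriteVar` is True, the second uses `orElse` to wait for either the ack or the flag turning False, and in the latter case takes the unread value back out of `msg`.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

data SyncChan a = SyncChan (TMVar a) (TMVar ())

newSyncChan :: IO (SyncChan a)
newSyncChan = SyncChan <$> newEmptyTMVarIO <*> newEmptyTMVarIO

-- Reader: take the value and acknowledge it in one transaction.
readSyncChan :: SyncChan a -> STM a
readSyncChan (SyncChan msg ack) = do
  a <- takeTMVar msg
  putTMVar ack ()
  return a

-- Hypothetical conditional write with manual rollback (single writer
-- assumed). True: the value was received. False: the write was
-- cancelled because the flag was or became False.
writeIf :: SyncChan a -> TVar Bool -> a -> IO Bool
writeIf (SyncChan msg ack) shouldWriteVar a = do
  -- Transaction 1: publish the value, but only while writing is enabled.
  published <- atomically $ do
    enabled <- readTVar shouldWriteVar
    if enabled then True <$ putTMVar msg a else return False
  if not published
    then return False
    else atomically $
      -- Transaction 2: prefer the ack; otherwise wait for the flag to
      -- turn False and then withdraw the still-unread value.
      (True <$ takeTMVar ack)
        `orElse` do
          enabled <- readTVar shouldWriteVar
          check (not enabled)
          _ <- takeTMVar msg
          return False

demo :: IO (Bool, Bool, Bool)
demo = do
  chan@(SyncChan msg _) <- newSyncChan
  flag <- newTVarIO True
  -- No reader: a helper thread switches the flag off after 100 ms.
  _ <- forkIO $ threadDelay 100000 >> atomically (writeTVar flag False)
  first <- writeIf chan flag 'x' -- cancelled, value withdrawn
  withdrawn <- atomically (isEmptyTMVar msg)
  -- With a reader present, the value is delivered.
  atomically (writeTVar flag True)
  _ <- forkIO $ () <$ atomically (readSyncChan chan)
  second <- writeIf chan flag 'y'
  return (first, withdrawn, second)

main :: IO ()
main = demo >>= print -- prints (False,True,True)
```

The rollback branch is safe only because the reader takes `msg` and fills `ack` atomically: if `ack` is still empty, the value has not been read, so taking it back cannot lose a delivery. With several writers sharing one `ack` slot, one writer could consume another's acknowledgement, so that case would need a per-message acknowledgement.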
