Using TChan with Timeout
Asked Answered
Q

3

10

I have a TChan as input for a thread which should behave like this:

If sombody writes to the TChan within a specific time, the content should be retrieved. If there is nothing written within the specified time, it should unblock and continue with Nothing.

My attempt on this was to use the timeout function from System.Timeout like this:

timeout 1000000 $ atomically $ readTChan pktChannel

This seemed to work but now I discovered, that I am sometimes loosing packets (they are written to the channel, but not read on the other side. In the log I get this:

2014.063.11.53.43.588365 Pushing Recorded Packet: 2 1439
2014.063.11.53.43.592319 Run into timeout
2014.063.11.53.44.593396 Run into timeout
2014.063.11.53.44.593553 Pushing Recorded Packet: 3 1439
2014.063.11.53.44.597177 Sending Recorded Packet: 3 1439

Where "Pushing Recorded Packet" is the writing from the one thread and "Sending Recorded Packet" is the reading from the TChan in the sender thread. The line with Sending Recorded Packet 2 1439 is missing, which would indicate a successful read from the TChan.

It seems that if the timeout is received at the wrong point in time, the channel looses the packet. I suspect that the threadKill function used inside timeout and STM don't play well together.

Is this correct? Does somebody have another solution that does not loose the packet?

Quartic answered 4/3, 2014 at 12:24 Comment(0)
P
9

Use registerDelay, an STM function, to signal a TVar when the timeout is reached. You can then use the orElse function or the Alternative operator <|> to select between the next TChan value or the timeout.

import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import System.Random

-- write random values after a random delay
packetWriter :: Int -> TChan Int -> IO ()
packetWriter maxDelay chan = do
  let xs = randomRs (10000 :: Int, maxDelay + 50000) (mkStdGen 24036583)
  forM_ xs $ \ x -> do
    threadDelay x
    atomically $ writeTChan chan x

-- block (retry) until the delay TVar is set to True
fini :: TVar Bool -> STM ()
fini = check <=< readTVar

-- Read the next value from a TChan or timeout
readTChanTimeout :: Int -> TChan a -> IO (Maybe a)
readTChanTimeout timeoutAfter pktChannel = do
  delay <- registerDelay timeoutAfter
  atomically $
        Just <$> readTChan pktChannel
    <|> Nothing <$ fini delay

-- | Print packets until a timeout is reached
readLoop :: Show a => Int -> TChan a -> IO ()
readLoop timeoutAfter pktChannel = do
  res <- readTChanTimeout timeoutAfter pktChannel
  case res of
    Nothing -> putStrLn "timeout"
    Just val -> do
      putStrLn $ "packet: " ++ show val
      readLoop timeoutAfter pktChannel

main :: IO ()
main = do
  let timeoutAfter = 1000000

  -- spin up a packet writer simulation
  pktChannel <- newTChanIO
  tid <- forkIO $ packetWriter timeoutAfter pktChannel

  readLoop timeoutAfter pktChannel

  killThread tid
Priestly answered 4/3, 2014 at 14:54 Comment(4)
You could simplify fini by using checkMoonstone
@RomanCheplyaka good catch, I've never even noticed this was in the stm package.Priestly
Oh, I see. I completely missed the registerDelay function somehow... Thanks very much!Quartic
pure Nothing <* fini delay can be written Nothing <$ fini delay so it looks more like the Just caseMiltie
A
3

The thumb rule of concurrency is: if adding a sleep in some point inside an IO action matters, your program is not safe.

To understand why the code timeout 1000000 $ atomically $ readTChan pktChannel does not work, consider the following alternative implementation of atomically:

atomically' :: STM a -> IO a
atomically' action = do
  result <- atomically action
  threadDelay someTimeAmount
  return result

The above is equal to atomically, but for an extra innocent delay. Now it is easy to see that if timeout kills the thread during the threadDelay, the atomic action has completed (consuming a message from the channel), yet timeout will return Nothing.

A simple fix to timeout n $ atomically ... could be the following

smartTimeout :: Int -> STM a -> IO (Maybe a)
smartTimeout n action = do
   v <- atomically $ newEmptyTMvar
   _ <- timeout n $ atomically $ do
          result <- action
          putTMvar v result
   atomically $ tryTakeTMvar v

The above uses an extra transactional variable v to do the trick. The result value of the action is stored into v inside the same atomic block in which the action is run. The return value of timeout is not trusted, since it does not tell us if action was run or not. After that, we check the TMVar v, which will be full if and only if action was run.

Awl answered 4/3, 2014 at 20:17 Comment(0)
S
2

Instead of TChan a, use TChan (Maybe a) . Your normal producer (of x) now writes Just x. Fork an extra "ticking" process that writes Nothing to the channel (every x seconds). Then have a reader for the channel, and abort if you get two successive Nothing. This way, you avoid exceptions, which may cause data to get lost in your case (but I am not sure).

Stillness answered 4/3, 2014 at 14:43 Comment(2)
You don't need to change the producer, just the consumer. Something like (fmap Just $ readTChan pkgChannel) `orElse` readTChan failChanEffable
The use case is that normally packets are sent when they are available (they get to the sender from a simulation thread). When no packets are available for a certain time, an automatically generated idle packet should be sent (something like a ping in order to keep the connection up). This is a protocol detail of a lower level, so the simulation thread knows nothing about it. If I would have a ticking thread, then I would get idle packets when they are strictly seen not necessary and would occupy bandwidth without need.Quartic

© 2022 - 2024 — McMap. All rights reserved.