Here's some code that implements a small receiving server using conduit, network-conduit, and stm-conduit. It receives data on a socket and then streams it through an STM channel to the main thread.

    import Control.Concurrent (forkIO)
    import Control.Concurrent.STM (atomically)
    import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
    import Control.Monad (void)
    import Control.Monad.IO.Class (MonadIO (liftIO))
    import Control.Monad.Trans.Class
    import Data.ByteString (ByteString)
    import qualified Data.ByteString as B
    import Data.Conduit
    import qualified Data.Conduit.Binary as DCB
    import Data.Conduit.Extra.Resumable
    import Data.Conduit.Network (sourceSocket)
    import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)
    import Network.Socket (Family (AF_UNIX), SockAddr (SockAddrUnix), Socket,
                           SocketType (Stream), accept, bind, close, listen, socket)
    import System.Directory (removeFile)
    import System.IO

    type BSChan = TBMChan ByteString

    -- Fork a listener thread that feeds everything received on the
    -- socket into a bounded channel, and return that channel.
    listenSocket :: Socket -> Int -> IO BSChan
    listenSocket soc bufSize = do
        chan <- atomically $ newTBMChan bufSize
        forkListener chan
        return chan
      where
        forkListener chan = void . forkIO $ listen soc 2 >> loop where
          -- Accept one connection at a time and stream it into the channel.
          loop = do
            (conn, _) <- accept soc
            sourceSocket conn $$ sinkTBMChan chan
            close conn
            loop

    main :: IO ()
    main = do
        soc <- socket AF_UNIX Stream 0
        bind soc (SockAddrUnix "mysock")
        socChan <- listenSocket soc 8
        -- Copy everything that arrives on the channel to stdout.
        sourceTBMChan socChan $$ DCB.sinkHandle stdout
        removeFile "mysock"

(In the real application, the stream of data from the socket gets merged with some others, which is why I don't handle it directly in the listener).
The problem is that I expected this to stay open until the main thread is killed, but instead it exits after the first message is received on the socket. I cannot work out why it does this, unless the sink (on the second-to-last line of main) is exiting once it sees the end of the first stream of data. Can I persuade it not to do this? Conduit has some facilities for making a source resumable, but I can't find anything equivalent for a sink.
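
To make the suspicion concrete, here is a minimal sketch that uses only the channel API from stm-chans, with no conduit involved. It assumes (I have not confirmed this) that something in the pipeline closes the channel when the first connection's stream ends. The messages and the buffer size of 8 are just placeholders. What it shows is that once a TBMChan is closed and drained, a reader gets Nothing, which a source built on the channel would presumably treat as end of stream.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan
  (closeTBMChan, newTBMChan, readTBMChan, writeTBMChan)

main :: IO ()
main = do
  chan <- atomically $ newTBMChan 8

  -- First "connection": one message arrives, then the channel is closed.
  -- ASSUMPTION: this close stands in for whatever happens in the real
  -- pipeline when the first socket stream ends.
  atomically $ writeTBMChan chan "first"
  atomically $ closeTBMChan chan

  -- Second "connection": writes to a closed TBMChan are silently dropped.
  atomically $ writeTBMChan chan "second"

  r1 <- atomically $ readTBMChan chan
  r2 <- atomically $ readTBMChan chan
  print (r1 :: Maybe String)  -- Just "first"
  print r2                    -- Nothing: closed and empty, reads as end of stream
```

If that is what is going on, the main thread's pipeline would finish as soon as the first connection is done, which matches what I see.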