One processing conduit, 2 IO sources of the same type

In my GHC Haskell application, which uses stm, network-conduit and conduit, I have a thread for each socket, forked automatically by runTCPServer. Threads communicate with each other through a broadcast TChan.

This showcases how I would like to set up the conduit "chain":

[Image: diagram of the conduit chain]

So, what we have here is two sources (each bound to its helper conduits) which produce Packet objects; encoder accepts these and turns them into ByteString, which is then sent out of the socket. I've had a great deal of difficulty fusing the two inputs efficiently (performance is a concern).

I would appreciate if somebody could point me in the right direction.


Since it would be rude of me to post this question without making an attempt, I'll put what I've previously tried here.

I've written (partly cherry-picked) a function which produces a blocking Source from a TMChan (a closeable channel):

-- | Takes a generic type of STM chan and, given read and close functionality,
--   returns a conduit 'Source' which consumes the elements of the channel.
chanSource 
    :: (MonadIO m, MonadSTM m)
    => a                    -- ^ The channel
    -> (a -> STM (Maybe b)) -- ^ The read function
    -> (a -> STM ())        -- ^ The close/finalizer function
    -> Source m b
chanSource ch readCh closeCh = ConduitM pull
    where close     = liftSTM $ closeCh ch
          pull      = PipeM $ liftSTM $ readCh ch >>= translate
          translate = return . maybe (Done ()) (HaveOutput pull close)

Likewise, a function to transform a Chan into a sink:

-- | Takes a stream and, given write and close functionality, returns a sink
--   which will consume elements and broadcast them into the channel.
chanSink
    :: (MonadIO m, MonadSTM m)
    => a                 -- ^ The channel
    -> (a -> b -> STM()) -- ^ The write function
    -> (a -> STM())      -- ^ The close/finalizer function
    -> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
    where close  = const . liftSTM $ closeCh ch
          sink   = NeedInput push close
          write  = liftSTM . writeCh ch
          push x = PipeM $ write x >> return sink

Then mergeSources is straightforward: fork 2 threads (which I really don't want to do, but what the heck), have each put its new items into one shared channel, and produce a source from that channel:

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
--   a source which consumes the elements of the channel.
mergeSources
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
    => [Source (ResourceT m) a]             -- ^ The list of sources
    -> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    where push c s = s $$ chanSink c writeTMChan closeTMChan
          fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
          retn c   = return $ chanSource c readTMChan closeTMChan

While I managed to make these functions typecheck, I could not get any use of them to typecheck:

-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
    mergsrc $$ protocol $= encoder =$ appSink appdata
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
          mergsrc = mergeSources [appSource appdata $= decoder, chansrc]

-- | Structure which holds mutable information for clients
data SessionState = SessionState
    { _ssBroadcast     :: TMChan Packet -- ^ Outbound packet broadcast channel
    }

makeLenses ''SessionState

-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)

-- | Type synonym for Session applied to the IO monad
type SessionIO = Session IO

I see this method as being flawed anyhow: there are many intermediate lists and conversions, which cannot be good for performance. Seeking guidance.


PS. From what I can understand, this is not a duplicate of "Fusing conduits with multiple inputs", as in my situation both sources produce the same type and I don't care which source a Packet object comes from, as long as I'm not waiting on one while the other has objects ready to be consumed.

PPS. I apologize for the usage (and therefore requirement of knowledge) of Lens in example code.
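
To make the requirement in the PS concrete, here is a minimal sketch of "take an element from whichever input is ready", using only base and stm. It assumes both inputs are already STM channels, which is not true of the socket source above (that one would still need a thread feeding a channel). The name `readEither` is hypothetical and not part of any library.

```haskell
import Control.Concurrent.STM

-- | Block until either channel has an element, then return it.
--   The left channel is tried first; if it is empty, 'orElse' falls
--   through to the right one. The transaction only waits when both
--   channels are empty.
readEither :: TChan a -> TChan a -> STM a
readEither left right = readTChan left `orElse` readTChan right
```

Run it with `atomically (readEither a b)`. Note that this sketch is left-biased: if the left channel always has data, the right one is never read.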

Overwork answered 26/5, 2013 at 7:25 Comment(4)
Is there a reason why you are not using Data.Conduit.TMChan from the stm-conduit package? It has all the functions you are defining, including mergeSources. - Zig
Actually there is: I would like the merged source to close as soon as either source closes. The stm-conduit package uses refcounts (and waits until the last source closes before closing the resultant source), which is not the desired behavior. Closing immediately after either source ends lets me close every socket promptly when I close my global TMChan. - Overwork
An idle thought: What happens if you grab the mergeSources from TMChan, throw out the ref counting stuff, and replace the decRefCount refcount bit with code to close all of the sources? - Asocial
I (kind of) tried that above, but I had type checking issues compiling the versions of the respective stm-conduit functions on github, so I had to modify them heavily (see OP). I'll try it again tonight using the hackage ones. I've been thinking about it a lot lately, and an intermediate Chan really may be necessary. - Overwork

I don't know if it's any help, but I tried to implement Iain's suggestion and made a variant, mergeSources', that stops as soon as any of the sources does:

mergeSources' :: (MonadIO m, MonadBaseControl IO m)
              => [Source (ResourceT m) a] -- ^ The sources to merge.
              -> Int -- ^ The bound of the intermediate channel.
              -> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
    c <- liftSTM $ newTBMChan bound
    mapM_ (\s -> resourceForkIO $
                    s $$ chanSink c writeTBMChan closeTBMChan) sx
    return $ sourceTBMChan c

(This simple addition is available here).
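
To show the "stop as soon as any source stops" behaviour without conduit, here is a minimal sketch using only base and stm. Everything in it is a hypothetical stand-in rather than library code: `Closeable` imitates a bounded closeable channel such as TBMChan, plain lists stand in for the sources, and `mergeLists` and `drain` are names made up for this example.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_)

-- | Hypothetical stand-in for a TBMChan: a bounded queue plus a closed flag.
data Closeable a = Closeable
    { cQueue  :: TBQueue a
    , cClosed :: TVar Bool
    }

newCloseable :: Int -> IO (Closeable a)
newCloseable bound =
    Closeable <$> newTBQueueIO (fromIntegral bound) <*> newTVarIO False

-- | Write unless closed. Blocks while the queue is full; returns False
--   if the channel is (or becomes) closed before the write succeeds.
writeC :: Closeable a -> a -> STM Bool
writeC c x = do
    closed <- readTVar (cClosed c)
    if closed then return False else writeTBQueue (cQueue c) x >> return True

closeC :: Closeable a -> STM ()
closeC c = writeTVar (cClosed c) True

-- | Read the next element; Nothing once the channel is closed and drained.
readC :: Closeable a -> STM (Maybe a)
readC c = do
    mx <- tryReadTBQueue (cQueue c)
    case mx of
        Just x  -> return (Just x)
        Nothing -> do
            closed <- readTVar (cClosed c)
            if closed then return Nothing else retry

-- | Fork one feeder per producer. Each feeder closes the shared channel
--   when its own input runs out, so the merged stream ends as soon as
--   any single producer ends.
mergeLists :: Int -> [[a]] -> IO (Closeable a)
mergeLists bound producers = do
    c <- newCloseable bound
    forM_ producers $ \xs -> forkIO $ do
        feed c xs
        atomically (closeC c)
    return c
  where
    feed _ []       = return ()
    feed c (x : xs) = do
        ok <- atomically (writeC c x)
        if ok then feed c xs else return ()

-- | Collect everything from the merged channel until it is closed.
drain :: Closeable a -> IO [a]
drain c = do
    mx <- atomically (readC c)
    case mx of
        Nothing -> return []
        Just x  -> (x :) <$> drain c
```

For example, `mergeLists 4 [[1, 2, 3], repeat 0] >>= drain` terminates even though the second producer is infinite, because the first producer closes the channel when it finishes. The interleaving of elements from the two producers is not deterministic.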

Some comments on your version of mergeSources (take them with a grain of salt; I may have misunderstood something):

  • Using ...TMChan instead of ...TBMChan seems dangerous. If the writers are faster than the reader, your heap will blow up. Looking at your diagram, it seems this can easily happen if your TCP peer doesn't read data fast enough. So I'd definitely use ...TBMChan with a bound that is perhaps large, but finite.
  • You don't need the MonadSTM m constraint. All STM stuff is wrapped into IO with

    liftSTM = liftIO . atomically
    

    Maybe this will help you slightly when using mergeSources' in serverApp.

  • Just a cosmetic issue, I found

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    

    very hard to read due to its use of liftA2 on the (->) r monad. I'd say

    do
        c <- liftSTM newTMChan
        fsrc sx c
        retn c
    

    would be longer, but much easier to read.
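
The three points above can be sketched with base and stm only. This is an illustration under assumptions, not the code from the question: `tryWrite`, `pointFree` and `explicit` are hypothetical names, and a plain TBQueue stands in for the bounded channel.

```haskell
import Control.Applicative (liftA2)
import Control.Concurrent.STM
import Control.Monad.IO.Class (MonadIO, liftIO)

-- | Run an STM transaction in any monad that can perform IO,
--   so no separate MonadSTM constraint is needed.
liftSTM :: MonadIO m => STM a -> m a
liftSTM = liftIO . atomically

-- | Try to enqueue without blocking. False means the bounded queue is
--   full, which is the point where a blocking writer would wait instead
--   of letting the heap grow.
tryWrite :: TBQueue a -> a -> STM Bool
tryWrite q x = do
    full <- isFullTBQueue q
    if full then return False else writeTBQueue q x >> return True

-- | Point-free form: liftA2 in the function ((->) c) applicative passes
--   the same argument to both functions and combines the results with (>>).
pointFree :: (c -> IO ()) -> (c -> IO b) -> c -> IO b
pointFree f g = liftA2 (>>) f g

-- | The same thing written out: run @f c@ for its effect, then @g c@.
explicit :: (c -> IO ()) -> (c -> IO b) -> c -> IO b
explicit f g c = do
    f c
    g c
```

`pointFree` and `explicit` behave identically; the second form simply names the shared argument.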

Could you perhaps create a self-contained project where it would be possible to play with serverApp?

Parimutuel answered 5/7, 2013 at 19:8 Comment(1)
Thanks for the advice. I'll keep it in mind (I'll have to revisit the problem soon). - Overwork
