TCP requires that the application provide its own message boundary markers. A simple protocol to mark message boundaries is to send the length of a chunk of data, the chunk of data, and whether there are remaining chunks that are part of the same message. The optimal size for the header that holds the message boundary information depends on the distribution of message sizes.
Developing our own little message protocol, we'll use two bytes for our headers. The most significant bit from the bytes (treated as a Word16
) will hold whether or not there are remaining chunks in the message. The remaining 15 bits will hold the length of the message in bytes. This will allow chunk sizes up to 32k, larger than typical TCP packets. A two byte header will be less-than-optimal if messages are typically very small, especially if they are smaller than 127 bytes.
We're going to use network-simple for the networking portion of our code. We'll serialize or deserialize messages with the binary package which encode
s and decode
s to and from lazy ByteString
s.
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as B
import Network.Simple.TCP
import Data.Bits
import Data.Binary
import Data.Functor
import Control.Monad.IO.Class
The first utility we will need is the ability to write Word16
headers into strict ByteString
s and read them back out again. We'll write them in big-endian order. Alternatively, these could be written in terms of the Binary
instance for Word16
.
writeBE :: Word16 -> B.ByteString
writeBE x = B.pack . map fromIntegral $ [(x .&. 0xFF00) `shiftR` 8, x .&. 0xFF]
readBE :: B.ByteString -> Maybe Word16
readBE s =
case map fromIntegral . B.unpack $ s of
[w1, w0] -> Just $ w1 `shiftL` 8 .|. w0
_ -> Nothing
The main challenge will be to send and recieve the lazy ByteString
s forced on us by the binary package. Since we will only be able to send up to 32k bytes at a time, we need to be able to rechunk
a lazy bytestring into chunks with a total known length no more than our maximum. A single chunk might already be more than the maximum; any chunk that doesn't fit into our new chunks is split across multiple chunks.
rechunk :: Int -> [B.ByteString] -> [(Int, [B.ByteString])]
rechunk n = go [] 0 . filter (not . B.null)
where
go acc l [] = [(l, reverse acc)]
go acc l (x:xs) =
let
lx = B.length x
l' = lx + l
in
if l' <= n
then go (x:acc) l' xs
else
let (x0, x1) = B.splitAt (n-l) x
in (n, reverse (x0:acc)) : go [] 0 (x1:xs)
recvExactly
will loop until all of the bytes we requested have been received.
recvExactly :: MonadIO m => Socket -> Int -> m (Maybe [B.ByteString])
recvExactly s toRead = go [] toRead
where
go acc toRead = do
body <- recv s toRead
maybe (return Nothing) (go' acc toRead) body
go' acc toRead body =
if B.length body < toRead
then go (body:acc) (toRead - B.length body)
else return . Just . reverse $ acc
Sending a lazy ByteString
consists of breaking it into chunks of a size we know we can send and sending each chunk along with the header holding the size and whether there are any more chunks.
sendLazyBS :: (MonadIO m) => Socket -> L.ByteString -> m ()
sendLazyBS s = go . rechunk maxChunk . L.toChunks
where
maxChunk = 0x7FFF
go [] = return ()
go ((li, ss):xs) = do
let l = fromIntegral li
let h = writeBE $ if null xs then l else l .|. 0x8000
sendMany s (h:ss)
go xs
Receiving a lazy ByteString
consists of reading the two byte header, reading a chunk of the size indicated by the header, and continuing to read as long as the header indicated there are more chunks.
recvLazyBS :: (MonadIO m, Functor m) => Socket -> m (Maybe L.ByteString)
recvLazyBS s = fmap L.fromChunks <$> go []
where
go acc = do
header <- recvExactly s 2
maybe (return Nothing) (go' acc) (header >>= readBE . B.concat)
go' acc h = do
body <- recvExactly s . fromIntegral $ h .&. 0x7FFF
let next = if h .&. 0x8000 /= 0
then go
else return . Just . concat . reverse
maybe (return Nothing) (next . (:acc) ) body
Sending or receiving a message that has a Binary
instance is just sending an encode
d lazy ByteString
or receiving the lazy ByteString
and decode
ing it.
sendBinary :: (MonadIO m, Binary a) => Socket -> a -> m ()
sendBinary s = sendLazyBS s . encode
recvBinary :: (MonadIO m, Binary a, Functor m) => Socket -> m (Maybe a)
recvBinary s = d . fmap decodeOrFail <$> recvLazyBS s
where
d (Just (Right (_, _, x))) = Just x
d _ = Nothing
hPut
andhGet
your numbers directly to the socket than to build up a largeByteString
and send that. The difference in speed can be as much as 5-fold faster forhPut
/hGet
. This is how, for example, how all theblaze-*
packages get their speed improvements. – PlagiarizehPut
andhGet
use functions that already do their own buffering? – CapitallyByteString
intermediate entirely (such as when writing to a handle or socket), but on closer reading I see that's not what you meant. – Plagiarize