Operating on parsed data with attoparsec
Background

I've written a logfile parser using attoparsec. All my smaller parsers succeed, as does the composed final parser. I've confirmed this with tests. But I'm stumbling over performing operations with the parsed stream.

What I've tried

I started by trying to pass the successfully parsed input to a function. But all it seems to get is Done (), which I'm presuming means the logfile has been consumed by this point.

prepareStats :: Result Log -> IO ()
prepareStats r =
  case r of
    Fail _ _ _       -> putStrLn "Parsing failed"
    Done _ parsedLog -> putStrLn "Success" -- This now has a [LogEntry] list. Do something with it.

main :: IO ()
main = do
  [f] <- getArgs
  logFile <- B.readFile (f :: FilePath)
  let results = parseOnly parseLog logFile
  putStrLn "TBC"

What I'm trying to do

I want to accumulate some stats from the logfile as I consume the input. For example, I'm parsing response codes and I'd like to count how many 2** responses there were and how many 4/5** ones. I'm parsing the number of bytes each response returned as Ints, and I'd like to efficiently sum these (sounds like a foldl'?). I've defined a data type like this:

data Stats = Stats {
    successfulRequestsPerMinute :: Int
  , failingRequestsPerMinute    :: Int
  , meanResponseTime            :: Int
  , megabytesPerMinute          :: Int
  } deriving Show

And I'd like to update that continuously as I parse the input. But performing operations as I consume is where I'm stuck. So far print is the only function I've successfully passed output to, and it confirmed the parsing succeeds by returning Done before printing the output.
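For what it's worth, here is a minimal, self-contained sketch of such an update function. The LogEntry below is a cut-down stand-in carrying only the two fields the stats need (the real record in the question has more), and "success" is taken to mean any status below 400, which is an assumption:

```haskell
import Data.List (foldl')

-- Cut-down stand-in for the question's LogEntry: only the two
-- fields the statistics need (the real record has more fields).
data LogEntry = LogEntry
  { finalRequestStatus :: Int
  , responseSizeB      :: Int
  } deriving Show

-- Strict fields (!) keep the accumulator evaluated as the fold runs,
-- so thunks don't pile up on large files.
data Stats = Stats
  { successfulRequests :: !Int
  , failingRequests    :: !Int
  , totalBytes         :: !Int
  } deriving Show

emptyStats :: Stats
emptyStats = Stats 0 0 0

-- Fold one parsed entry into the running totals.  "Success" here is
-- any status below 400 -- adjust the predicate to taste.
updateStats :: Stats -> LogEntry -> Stats
updateStats (Stats ok bad b) e
  | finalRequestStatus e < 400 = Stats (ok + 1) bad (b + responseSizeB e)
  | otherwise                  = Stats ok (bad + 1) (b + responseSizeB e)
```

Consuming a whole parsed log is then just `foldl' updateStats emptyStats entries`.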

My main parser(s) look like this:

parseLogEntry :: Parser LogEntry
parseLogEntry = do
  ip <- logItem
  _ <- char ' '
  logName <- logItem
  _ <- char ' '
  user <- logItem
  _ <- char ' '
  time <- datetimeLogItem
  _ <- char ' '
  firstLogLine <- quotedLogItem
  _ <- char ' '
  finalRequestStatus <- intLogItem
  _ <- char ' '
  responseSizeB <- intLogItem
  _ <- char ' '
  timeToResponse <- intLogItem
  return $ LogEntry ip logName user time firstLogLine finalRequestStatus responseSizeB timeToResponse

type Log = [LogEntry]

parseLog :: Parser Log
parseLog = many $ parseLogEntry <* endOfLine

Desired outcome

I want to pass each parsed line to a function that will update the above data type. Ideally I want this to be very memory efficient because it'll be operating on large files.

Armillas answered 8/9, 2015 at 21:41 Comment(1)
Please edit your question and make your code self-contained. In particular, add the required import statements. Also, have you considered using the applicative style for parseLogEntry? It wouldn't affect performance, but it would improve readability.Fungous
If each log entry is exactly one line, here's a simpler solution:

do loglines <- fmap BS.lines (BS.readFile "input-file.log")
   let finalStats = foldl' go initialStats loglines
   print finalStats
   where
     go stats logline =
        case parseOnly yourParser logline of
          Left e  -> error $ "oops: " ++ e
          Right r -> let stats' = ... combine r with stats ...
                     in stats'

Basically you are just reading the file line-by-line and calling parseOnly on each line and accumulating the results.
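As a self-contained sketch of the same pattern that runs with nothing but base: here each "log line" is just a decimal byte count, `parseLine` is a trivial stand-in for `parseOnly yourParser`, and unparseable lines are skipped rather than passed to `error`:

```haskell
{-# LANGUAGE BangPatterns #-}
import Data.List (foldl')
import Text.Read (readMaybe)

-- Stand-in for parseOnly yourParser: each "log line" here is just
-- a decimal byte count.
parseLine :: String -> Either String Int
parseLine s = maybe (Left ("bad line: " ++ s)) Right (readMaybe s)

-- Strict left fold over all lines, carrying (total bytes, parsed lines)
-- as the accumulator.  Bad lines are skipped instead of calling error.
summarize :: [String] -> (Int, Int)
summarize = foldl' go (0, 0)
  where
    go (!total, !n) line =
      case parseLine line of
        Left _  -> (total, n)
        Right b -> (total + b, n + 1)
```

The bang patterns force the accumulator on every step, which is what keeps the fold running in constant memory over a large file.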

Alius answered 8/9, 2015 at 23:10 Comment(3)
I think I understand this but I'm not sure. I'm interpreting this as: 1 - get array (I presume lazily) of lines from the input file; 2 - setup a foldl' using a function 'go', with an initial value of 'initialStats', which I presume I'd initialise as Stats record with all 0s; 3 - define the function 'go', which parses each line and either returns an error or calls a function that updates the initialStats record; 4 - I don't know what in stats' is doing here. Am I understanding correctly up to the point at 4 where I don't? If not, are you able to clarify? Thanks for your help.Armillas
You've got it right. stats' is the updated version of stats. For instance, if stats is just an Int counting the number of lines parsed, then you can define stats' as stats+1. Folds are the way you loop in Haskell while carrying a piece of state from iteration to iteration. Also see this wiki page for more information about strictness and folds.Alius
fwiw I modified this for use with the standard streaming libraries below.Frostbitten
You have to make your unit of parsing a single log entry rather than a list of log entries.

It's not pretty, but here is an example of how to interleave parsing and processing:

(Depends on bytestring, attoparsec and mtl)

{-# LANGUAGE NoMonomorphismRestriction, FlexibleContexts #-}

import qualified Data.ByteString.Char8 as BS
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Attoparsec.ByteString.Char8 hiding (takeWhile)
import Data.Char
import Control.Monad.State.Strict
import Data.Maybe (fromMaybe)

aWord :: Parser BS.ByteString
aWord = skipSpace >> A.takeWhile isAlphaNum

getNext :: MonadState [a] m => m (Maybe a)
getNext = do
  xs <- get
  case xs of
    [] -> return Nothing
    (y:ys) -> put ys >> return (Just y)

loop iresult =
  case iresult of
    Fail _ _ msg  -> error $ "parse failed: " ++ msg
    Done x' aword -> do lift $ process aword; loop (parse aWord x')
    Partial _     -> do
      mx <- getNext
      case mx of
        Just y  -> loop (feed iresult y)
        Nothing -> case feed iresult BS.empty of
                     Fail _ _ msg  -> error $ "parse failed: " ++ msg
                     Done x' aword -> do lift $ process aword; return ()
                     Partial _     -> error $ "partial returned"  -- probably can't happen

process :: Show a => a -> IO ()
process w = putStrLn $ "got a word: " ++ show w

theWords = map BS.pack [ "this is a te", "st of the emergency ", "broadcasting sys", "tem"]


main = runStateT (loop (Partial (parse aWord))) theWords

Notes:

  • We parse one aWord at a time and call process after each word is recognized.
  • Use feed to feed the parser more input when it returns a Partial.
  • Feed the parser an empty string when there is no more input left.
  • When Done is returned, process the recognized word and continue with parse aWord.
  • getNext is just an example of a monadic function which gets the next unit of input. Replace it with your own version - i.e. something that reads the next line from a file.
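The Done/Partial/feed dance in the notes above is easier to see in miniature. Below is a toy, hand-rolled analogue of attoparsec's incremental-result type with the same driving loop; everything here is illustrative and needs no libraries, it is not attoparsec's real API:

```haskell
-- Toy analogue of attoparsec's IResult: either a parsed value plus
-- leftover input, or a continuation waiting for more input.
data IResult r
  = Done String r              -- leftover input, parsed value
  | Partial (String -> IResult r)

-- Toy incremental parser: accumulate characters up to the next space.
parseWord :: String -> IResult String
parseWord = go ""
  where
    go acc ""     = Partial $ \more ->
      if null more then Done "" (reverse acc)   -- "" signals end of input
                   else go acc more
    go acc (c:cs)
      | c == ' '  = Done cs (reverse acc)
      | otherwise = go (c:acc) cs

feed :: IResult r -> String -> IResult r
feed (Partial k) s = k s
feed r           _ = r

-- The driving loop: on Done, emit the word and restart on the leftover;
-- on Partial, feed the next chunk, or "" once the chunks run out.
runChunks :: [String] -> [String]
runChunks = loop (parseWord "")
  where
    loop res chunks = case res of
      Done rest w -> w : loop (parseWord rest) chunks
      Partial _   -> case chunks of
        (c:cs) -> loop (feed res c) cs
        []     -> case feed res "" of
                    Done _ w | not (null w) -> [w]
                    _                       -> []
```

For example, `runChunks ["this is a te", "st"]` reassembles the words across chunk boundaries, exactly as the attoparsec loop above does with theWords.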

Update

Here is a solution using parseWith as @dfeuer suggested:

noMoreInput = fmap null get

loop2 x = do
  iresult <- parseWith (fmap (fromMaybe BS.empty) getNext) aWord x
  case iresult of
    Fail _ _ msg  -> error $ "parse failed: " ++ msg
    Done x' aword -> do lift $ process aword;
                        if BS.null x'
                           then do b <- noMoreInput
                                   if b then return ()
                                        else loop2 x'
                           else loop2 x'
    Partial _     -> error $ "huh???" -- this really can't happen

main2 = runStateT (loop2 BS.empty) theWords
Alius answered 8/9, 2015 at 22:53 Comment(2)
It is definitely possible, in general, to get a Partial, feed it some more input, and get another Partial. The easiest way to run an attoparsec parser (if you're not using something like pipes or conduit, is to use parseWith, which will never return Partial.Hermilahermina
Added solution based on parseWith.Alius

This is properly done with a streaming library

main = do
  f:_ <- getArgs
  withFile f ReadMode $ \h -> do
    result <- foldStream $ streamProcess $ streamHandle h
    print result
  where
    streamHandle  = undefined
    streamProcess = undefined
    foldStream    = undefined

where the blanks can be filled by any streaming library, e.g.

 import qualified Pipes.Prelude as P
 import Pipes
 import qualified Pipes.ByteString as PB
 import Pipes.Group (folds)
 import qualified Control.Foldl as L
 import Control.Lens (view) -- or import Lens.Simple (view), or whatever

 streamHandle = PB.fromHandle :: Handle -> Producer ByteString IO ()

in that case we might then divide the labor further thus:

 streamProcess :: Producer ByteString m r -> Producer LogEntry m r
 streamProcess p =  streamLines p >-> lineParser

 streamLines :: Producer ByteString m r -> Producer ByteString m r
 streamLines p = L.purely folds L.list (view PB.lines p) >-> P.map BS.concat

 lineParser :: Pipe ByteString LogEntry m r
 lineParser = P.map (parseOnly line_parser) >-> P.concat -- concat removes lefts

(This is slightly laborious because pipes is sensibly persnickety about accumulating lines, and about memory generally: we are just trying to get a producer of individual strict bytestring lines, then convert that into a producer of parsed lines, and then throw out bad parses, if there are any. With io-streams or conduit, things will be basically the same, and that particular step will be easier.)

We are now in a position to fold over our Producer LogEntry IO (). This can be done explicitly using Pipes.Prelude.fold, which makes a strict left fold. Here we will just copy the structure from user5402's answer:

 foldStream str = P.fold go initial_stats id str
  where
   go stats_till_now new_entry = undefined

If you get used to the use of the foldl library and the application of a fold to a Producer with L.purely fold some_fold, then you can build Control.Foldl.Folds for your LogEntries out of components and slot in different requests as you please.
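The reason Control.Foldl folds compose out of components is that its Fold type is Applicative, so several statistics fuse into a single pass over the input. Here is a hand-rolled miniature of that design (the real Control.Foldl type is essentially this, modulo strictness details), using library-free stand-in folds `count` and `total`:

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Data.List (foldl')

-- Miniature of Control.Foldl's Fold: a step function, an initial
-- accumulator, and a finaliser, with the accumulator type hidden.
data Fold a b = forall x. Fold (x -> a -> x) x (x -> b)

instance Functor (Fold a) where
  fmap f (Fold step begin done) = Fold step begin (f . done)

instance Applicative (Fold a) where
  pure b = Fold const () (const b)
  Fold stepL beginL doneL <*> Fold stepR beginR doneR =
    Fold (\(xL, xR) a -> (stepL xL a, stepR xR a))  -- run both in lockstep
         (beginL, beginR)
         (\(xL, xR) -> doneL xL (doneR xR))

runFold :: Fold a b -> [a] -> b
runFold (Fold step begin done) = done . foldl' step begin

count :: Fold a Int
count = Fold (\n _ -> n + 1) 0 id

total :: Num a => Fold a a
total = Fold (+) 0 id

-- Two statistics, one traversal of the input.
countAndTotal :: Fold Int (Int, Int)
countAndTotal = (,) <$> count <*> total
```

Building a Stats record from per-field folds over LogEntries works the same way: each field gets its own small Fold, and the Applicative combination walks the producer once.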

If you use pipes-attoparsec and include the newline bit in your parser, then you can just write

 handleToLogEntries :: Handle -> Producer LogEntry IO ()
 handleToLogEntries h = void $ parsed my_line_parser (fromHandle h) >-> P.concat

and get the Producer LogEntry IO () more directly. (This ultra-simple way of writing it will, however, stop at a bad parse; dividing on lines first will be faster than using attoparsec to recognize newlines.) This is just as simple with io-streams; you would write something like

import qualified System.IO.Streams as Streams

io :: Handle -> IO ()
io h = do  
    bytes <- Streams.handleToInputStream h
    log_entries <- Streams.parserToInputStream my_line_parser bytes
    fold_result <- Streams.fold go initial_stats log_entries
    print fold_result

or to keep with the structure above:

 where 
  streamHandle = Streams.handleToInputStream
  streamProcess io_bytes = 
      io_bytes >>= Streams.parserToInputStream my_line_parser
  foldStream io_logentries =
      io_logentries >>= Streams.fold go initial_stats

Either way, my_line_parser should return a Maybe LogEntry and should recognize the newline.

Frostbitten answered 11/9, 2015 at 11:58 Comment(0)
