I need advice on which data structure to use as an atomic change log.
I'm trying to implement the following algorithm. There is a flow of incoming changes updating an in-memory map. In Haskell-like pseudocode it is:
update :: DataSet -> SomeListOf Change -> Change -> STM (DataSet, SomeListOf Change)
update dataSet existingChanges newChange = do
...
return (dataSet, existingChanges ++ [newChange])
where DataSet is a map (currently it is the Map from the stm-containers package, https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Map.html). The whole "update" is called from an arbitrary number of threads. Some of the Changes can be rejected due to domain semantics; I use throwSTM for that, to discard the effects of the transaction. On a successful commit, "newChange" is added to the list.
There is a separate thread which calls the following function:
flush :: TVar (DataSet, SomeListOf Change) -> IO ()
This function is supposed to take the current snapshot of DataSet together with the list of changes (they have to be a consistent pair) and flush them to the filesystem, i.e.
flush data_ = do
(dataSet, changes) <- atomically $ readTVar data_
-- write them both to FS
-- ...
atomically $ writeTVar data_ (dataSet, [])
I need advice about the data structure to use for "SomeListOf Change". I don't want to use [Change] because it is "too ordered", and I'm afraid there will be too many conflicts, which will force whole transactions to retry. Please correct me if I'm wrong here.
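For reference, the plain-list variant I would like to avoid could look roughly like the sketch below. It is only an illustration: Change is a stand-in type, and logChange and takeLog are hypothetical names that do not appear in my code. Every update writes the same TVar, which is the contention I am worried about.

```haskell
import Control.Concurrent.STM

-- Hypothetical stand-in for the real Change type.
type Change = String

-- Prepend the new change in O(1); the list is kept newest-first.
-- Every caller reads and writes the same TVar.
logChange :: TVar [Change] -> Change -> STM ()
logChange logVar change = modifyTVar' logVar (change :)

-- Take the whole log and leave an empty one behind, in one transaction.
-- The result is reversed so that it comes out oldest-first.
takeLog :: TVar [Change] -> STM [Change]
takeLog logVar = reverse <$> swapTVar logVar []
```

Since takeLog is an STM action, it can be combined with a read of the data set inside a single atomically block.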
I cannot use the Set (https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Set.html) because I still need to preserve some order, e.g. the order of transaction commits. I could use TChan for it, and it looks like a good match (exactly the order of transaction commits), but I don't know how to implement the "flush" function so that it gives a consistent view of the whole change log together with the DataSet.
The current implementation is here: https://github.com/lolepezy/rpki-pub-server/blob/add-storage/src/RRDP/Repo.hs, in the functions applyActionsToState and rrdpSyncThread, respectively. It uses TChan and seems to do it the wrong way.
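To make the TChan idea concrete, here is a minimal sketch of what I have in mind, under some loud assumptions: DataSet is replaced by a TVar holding a plain Data.Map (not the stm-containers Map, where taking a snapshot would look different), Change is a stand-in pair, and drainTChan and snapshot are hypothetical helper names. The point is only that the channel is drained in the same transaction that reads the data set, so the pair is consistent.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Hypothetical stand-ins for the real types.
type Change  = (String, Int)
type DataSet = TVar (Map.Map String Int)

-- Apply one change and log it in the same transaction.
update :: DataSet -> TChan Change -> Change -> STM ()
update dataSet changeChan change@(k, v) = do
  modifyTVar' dataSet (Map.insert k v)
  writeTChan changeChan change

-- Read every change currently in the channel, oldest first.
-- tryReadTChan never blocks, so this stops when the channel is empty.
drainTChan :: TChan a -> STM [a]
drainTChan chan = do
  next <- tryReadTChan chan
  case next of
    Nothing -> return []
    Just x  -> (x :) <$> drainTChan chan

-- Read the data set and drain the log in ONE transaction,
-- so no update can commit between the two reads.
snapshot :: DataSet -> TChan Change -> STM (Map.Map String Int, [Change])
snapshot dataSet changeChan =
  (,) <$> readTVar dataSet <*> drainTChan changeChan
```

The flush thread would then run atomically (snapshot dataSet changeChan) and write the result to the filesystem outside the transaction. I have not measured how this behaves under contention.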
Thank you in advance.
Update: a reasonable answer seems to be something like this:
type SomeListOf c = TChan [c]
update :: DataSet -> TChan [Change] -> Change -> STM DataSet
update dataSet existingChanges newChange = do
...
writeTChan changeChan $ reverse (newChange : existingChanges)
return dataSet
flush data_ = do
(dataSet, changes) <- atomically $ (,) <$> readTVar data_ <*> readTChan changeChan
-- write them both to FS
-- ...
But I'm still not sure whether it's a neat solution to pass the whole list as an element of the channel.
TChan is a dead-simple ([a], [a]) functional dequeue; it sounds like it might make sense for you to implement your own variation on it. – Ahlgren
... update with other STM operations, or does it always run in its own transaction? – Flyover