Perform event-sourcing projections over Table storage
I'm creating a tiny event-sourcing-style function app, where every invocation of a function will write an event to table storage. An example of such an event would be:

+------------+---------------+-----------------+
|   Event    |   Timestamp   |   Destination   |
+------------+---------------+-----------------+
| Connect    | 7/1/2019 4:52 | sftp.alex.com   |
| Disconnect | 7/1/2019 4:53 | sftp.liza.com   |
| Connect    | 7/1/2019 4:54 | sftp.yomama.com |
| Connect    | 7/1/2019 4:54 | sftp.alex.com   |
| Connect    | 7/1/2019 4:59 | sftp.liza.com   |
| Disconnect | 7/1/2019 4:59 | sftp.alex.com   |
| Disconnect | 7/1/2019 4:59 | sftp.yomama.com |
| Connect    | 7/1/2019 5:03 | sftp.alex.com   |
+------------+---------------+-----------------+

How do I create a projection over this table?

The main question that I would need to answer is:

How many connections does each destination currently have?

Potsdam answered 9/7, 2019 at 17:24 Comment(2)
Projections are normally real-time, and Table storage has no change feed, Event Grid trigger, or anything else like it. Why do you need a projection? Why can't you just keep a snapshot of the last state on the side? You only have three fields to update.Toccaratoccata
How can you answer that question - "current connections" with a pattern that is almost always eventually consistent?Jew
I suppose there would be a lot of records in the table, and iterating over all of them is not an option.
So here are a couple of ideas:

  1. Can't you just keep track of the number of connections?

    That would be the easiest solution. I have no idea about your app and how it communicates with Azure, but at least there are triggers (although, judging by the supported-bindings table, you will need some extra services, for example Queue storage). In a trigger you can store the current number of connections to each destination in a separate table, incrementing it on a Connect event and decrementing it on a Disconnect.

    But if you have a single writer (a single server that communicates with Azure), you can keep track of connections right inside your code.

    You can also save the current number of connections to the table in an extra field. As a bonus, you'll be able to instantly get the number of connections at any given time in the past (at a storage cost).
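    A minimal sketch of that first idea, keeping one running counter per destination (the class and method names here are made up for illustration, and the Table storage persistence is left out):

    ```csharp
    using System;
    using System.Collections.Generic;

    // Hypothetical in-memory snapshot: one counter per destination,
    // updated on every event instead of replaying the whole stream.
    public class ConnectionSnapshot
    {
        private readonly Dictionary<string, int> _counts = new Dictionary<string, int>();

        public void Apply(string eventType, string destination)
        {
            _counts.TryGetValue(destination, out int current);
            if (eventType == "Connect")
                _counts[destination] = current + 1;
            else if (eventType == "Disconnect")
                _counts[destination] = current - 1;
        }

        public int CountFor(string destination) =>
            _counts.TryGetValue(destination, out int count) ? count : 0;
    }
    ```

    In a real function app this snapshot would live in a separate table (one row per destination) and be updated from the trigger.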

  2. As you're talking about event sourcing... then maybe you should use it once more? The idea is still the same: you keep track of Connect and Disconnect events, but in some external receiver. As you're writing an event-sourcing-style function app, I believe it should be easy to create one. And you won't have to depend on extra Azure services.

    Then the only difference from the first idea is that if the receiver dies or disconnects or something, you just remember the last event it received and, when the receiver is back online, iterate only over the younger ones.

    This last received event that you should remember (plus the counters) is essentially the snapshot others were talking about in the comments.
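    A rough sketch of that catch-up logic, assuming events can be ordered by timestamp (a real Table storage projector would order by PartitionKey/RowKey instead, since timestamps can collide; all names here are my own):

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;

    public record StoredEvent(DateTime Timestamp, string Type, string Destination);

    // Remembers the last event it processed (the checkpoint), so after a
    // restart it only replays events younger than the checkpoint.
    public class CheckpointedReceiver
    {
        public DateTime Checkpoint { get; private set; } = DateTime.MinValue;
        private readonly Dictionary<string, int> _counts = new Dictionary<string, int>();

        public void CatchUp(IEnumerable<StoredEvent> events)
        {
            foreach (var e in events.Where(e => e.Timestamp > Checkpoint)
                                    .OrderBy(e => e.Timestamp))
            {
                _counts.TryGetValue(e.Destination, out int current);
                _counts[e.Destination] = e.Type == "Connect" ? current + 1 : current - 1;
                Checkpoint = e.Timestamp;
            }
        }

        public int CountFor(string destination) =>
            _counts.TryGetValue(destination, out int count) ? count : 0;
    }
    ```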

Clipped answered 1/6, 2020 at 18:59 Comment(0)
Projections should be decoupled from the Event Stream, because they are business-driven while the Event Stream is purely a technical concern.

I assume you are going to use SQL for persisting the projections to simplify the answer, but any Key/Value data store will do.

You can create a DestinationEvents table with the following structure:

+------------------+-----------------+-------------------+
|   Destination    |   Connections   |   Disconnections  |
+------------------+-----------------+-------------------+
| sftp.alex.com    |        3        |        1          |
| sftp.liza.com    |        1        |        1          |
+------------------+-----------------+-------------------+

With proper indexing this should give both fast reads and writes. For extra speed consider something like Redis to cache your projections.

The tricky bit is in solution design, you want it to scale. A naive approach may be to set up a SQL trigger for each write into the Event Stream, but this will slow you down if you have loads of writes.

If you want scalability, you need to start thinking about budget (time and money) and business requirements. Do the projections need to be available in real time?

  • if Not, you can have a scheduled process that computes/updates projections at a certain interval: daily, hourly, weekly, etc.
  • if Yes, you need to start looking into Queues/Message Brokers (RabbitMQ, Kafka, etc.). Now you are entering Producer/Consumer territory: your App is the Producer, it publishes events; the Event Stream and Projection storage are Consumers, which listen, transform, and persist the incoming events. The Queue/Message Broker itself can even replace your Event Stream table; this is easy with Kafka.

If you just want to learn, start by defining an in-memory projection store using a Dictionary<string, (int Connections, int Disconnections)>, where Destination acts as the key and (int Connections, int Disconnections) is a tuple/class.

If you want to support other Projections, the in-memory store can be a Dictionary<string, Dictionary<string, (int Connections, int Disconnections)>>, where the outer dictionary's key is the Projection name.
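A minimal sketch of that learning setup, using the tuple-valued dictionary suggested above (the class and method names are my own invention):

```csharp
using System.Collections.Generic;

// In-memory projection: Destination -> (Connections, Disconnections),
// mirroring the DestinationEvents table above.
public class DestinationProjection
{
    private readonly Dictionary<string, (int Connections, int Disconnections)> _rows =
        new Dictionary<string, (int Connections, int Disconnections)>();

    public void Apply(string eventType, string destination)
    {
        _rows.TryGetValue(destination, out var row);
        _rows[destination] = eventType == "Connect"
            ? (row.Connections + 1, row.Disconnections)
            : (row.Connections, row.Disconnections + 1);
    }

    // Answers "how many connections does each destination currently have?"
    // as connects minus disconnects.
    public int CurrentConnections(string destination) =>
        _rows.TryGetValue(destination, out var row)
            ? row.Connections - row.Disconnections
            : 0;
}
```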

Bettyannbettye answered 2/6, 2020 at 22:11 Comment(0)
The basic idea is to replay events over an aggregate to get the current state. Below is code that illustrates it. Warning: it's illustrative rather than production code.

using System;
using System.Collections.Generic;

public class ConnectionCounters
{
    private Dictionary<string, ConnectionCounter> _counters = new Dictionary<string, ConnectionCounter>();

    public IEnumerable<ConnectionCounter> GetCounters()
    {
        return _counters.Values;
    }

    public void Handle(ConnectionEvent @event)
    {
        var counter = GetOrCreateCounter(@event.Destination);
        if (@event is ConnectEvent)
            counter.ConnectionCount += 1;
        if (@event is DisconnectEvent)
            counter.ConnectionCount -= 1;
    }

    private ConnectionCounter GetOrCreateCounter(string destination)
    {
        if (_counters.ContainsKey(destination))
            return _counters[destination];

        var counter = new ConnectionCounter() { Destination = destination };
        _counters[destination] = counter;
        return counter;
    }
}

public class ConnectionCounter
{
    public string Destination { get; set; }
    public int ConnectionCount { get; set; }
}

public class ConnectEvent : ConnectionEvent { }

public class DisconnectEvent : ConnectionEvent { }

public class ConnectionEvent 
{
    public string Destination { get; set; }
}

// .....

private ConnectionCounters _connectionCounters = new ConnectionCounters();
public void Main()
{
    var events = ReadEvents(); // read events somehow
    foreach (var @event in events)
    {
        _connectionCounters.Handle(@event);
    }

    foreach (var counter in _connectionCounters.GetCounters())
        Console.WriteLine($"{counter.Destination} has {counter.ConnectionCount} connections.");
}
Bilge answered 10/7, 2019 at 9:29 Comment(1)
You can optimize this approach by using snapshots so that you aren't recalculating these statistics from scratch each time you want to answer the question, but the concept remains: to answer a temporal question, one that must take into consideration changes that happen over time, you have to process each event. This answer is correct for the question you are asking.Carnatic
This is a simple counter that can be shared safely between threads to count connections per event destination. You can inject it as a service into all the places that receive Connect and Disconnect events.

Example of usage:

    static void Main(string[] args)
    {
        ConnectionsManager connectionsCounter = new ConnectionsManager();

        connectionsCounter.Connect("sftp.alex.com");
        connectionsCounter.Connect("sftp.liza.com");
        connectionsCounter.Connect("sftp.alex.com");
        connectionsCounter.Disconnect("sftp.alex.com");
        connectionsCounter.Connect("sftp.alex.com");

        Console.WriteLine($"Count of {"sftp.alex.com"} is {connectionsCounter.GetConnectionCount("sftp.alex.com")}");

        Console.WriteLine(Environment.NewLine + "Count : " + Environment.NewLine);
        foreach (var kvp in connectionsCounter.GetAllConnectionsCount())
        {
            Console.WriteLine($"Count of {kvp.Key} is {kvp.Value}");
        }
    }

Output:

Count of sftp.alex.com is 2

Count :

Count of sftp.alex.com is 2
Count of sftp.liza.com is 1

ConnectionsManager code:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class ConnectionsManager
{
    private readonly ConcurrentDictionary<string, long> _destinationCounter;

    public ConnectionsManager()
    {
        _destinationCounter = new ConcurrentDictionary<string, long>();
    }

    public long Connect(string destination)
    {
        // AddOrUpdate makes the read-modify-write atomic per key,
        // so concurrent callers can't lose an increment.
        return _destinationCounter.AddOrUpdate(destination, 1, (_, current) => current + 1);
    }

    public long Disconnect(string destination)
    {
        if (!_destinationCounter.ContainsKey(destination))
            throw new ArgumentException("Destination not found", nameof(destination));

        // A negative result would mean more Disconnects than Connects were recorded.
        return _destinationCounter.AddOrUpdate(destination, 0, (_, current) => current - 1);
    }

    public long GetConnectionCount(string destination)
    {
        if (_destinationCounter.TryGetValue(destination, out long count))
            return count;
        throw new ArgumentException("Destination not found", nameof(destination));
    }

    public Dictionary<string, long> GetAllConnectionsCount()
    {
        return new Dictionary<string, long>(_destinationCounter);
    }
}
Bill answered 17/8, 2020 at 19:5 Comment(0)
