Synchronize Data From Multiple Data Sources
A

4

8

Our team is trying to build a predictive maintenance system whose task is to look at a set of events and predict whether these events depict a set of known anomalies or not.

We are at the design phase and the current system design is as follows:

  • The events may occur on multiple sources of an IoT system (such as cloud platform, edge devices or any intermediate platforms)
  • The events are pushed by the data sources into a message queueing system (currently we have chosen Apache Kafka).
  • Each data source has its own queue (Kafka Topic).
  • From the queues, the data is consumed by multiple inference engines (which are actually neural networks).
  • Depending upon the feature set, an inference engine will subscribe to multiple Kafka topics and stream data from those topics to continuously output the inference.
  • The overall architecture follows the single-responsibility principle, meaning that each component is separate from the others and runs inside its own Docker container.

Problem:

In order to classify a set of events as an anomaly, the events have to occur in the same time window. For example, say three data sources push their respective events into Kafka topics but, for some reason, the data is not synchronized. When one of the inference engines pulls the latest entries from each of the Kafka topics, the corresponding events in the pulled data do not belong to the same time window (say 1 hour). That results in invalid predictions due to out-of-sync data.

Question

We need to figure out how to make sure that the data from all three sources is pushed in order, so that when an inference engine requests entries (say the last 100 entries) from multiple Kafka topics, the corresponding entries in each topic belong to the same time window.

Agrarian answered 27/5, 2019 at 12:57 Comment(1)
You've asked a pretty interesting question. Maybe this article will lead you to some solution. - Crissy
L
1

I would suggest KSQL, which is a streaming SQL engine that enables real-time data processing against Apache Kafka. It also provides nice functionality for Windowed Aggregation etc.

There are 3 ways to define Windows in KSQL:

hopping windows, tumbling windows, and session windows. Hopping and tumbling windows are time windows, because they're defined by fixed durations that you specify. Session windows are dynamically sized based on incoming data and defined by periods of activity separated by gaps of inactivity.
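
To make the three window types concrete, here is a rough sketch in plain Python (not KSQL) of how a timestamp could be assigned to each kind of window. The function names are made up for illustration, timestamps are assumed to be integer seconds, and the boundary handling is only my reading of the usual semantics, so treat it as an approximation rather than what KSQL does internally.

```python
from typing import Iterable, List, Tuple


def tumbling_window(ts: int, size: int) -> Tuple[int, int]:
    """Return the single [start, end) tumbling window containing ts."""
    start = ts - (ts % size)
    return (start, start + size)


def hopping_windows(ts: int, size: int, advance: int) -> List[Tuple[int, int]]:
    """Return every [start, end) hopping window containing ts.

    Windows start at multiples of `advance` and overlap when advance < size.
    """
    windows = []
    start = ts - (ts % advance)  # latest window start at or before ts
    while start > ts - size and start >= 0:
        windows.append((start, start + size))
        start -= advance
    return sorted(windows)


def session_windows(timestamps: Iterable[int], gap: int) -> List[List[int]]:
    """Group timestamps into sessions separated by more than `gap` of inactivity."""
    sessions: List[List[int]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # still within the current session
        else:
            sessions.append([ts])  # inactivity gap exceeded, start a new session
    return sessions
```

With a 1-hour size, an event at second 3700 falls into one tumbling window but into two hopping windows if the advance is 30 minutes.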

In your context, you can use KSQL to query and aggregate the topics of interest using Windowed Joins. For example,

SELECT t1.id, ...
FROM topic_1 t1
INNER JOIN topic_2 t2
WITHIN 1 HOURS
ON t1.id = t2.id;
Lippi answered 5/6, 2019 at 10:6 Comment(0)
P
1

Some suggestions -

  1. Handle delay at the producer end - Bound the send delay of all three producers by using batch.size and linger.ms. E.g., if linger.ms is set to 1000, a record is held in the producer's buffer for at most about 1 second before it is sent to Kafka (sooner if the batch fills up).

  2. Handle delay at the consumer end - Any streaming engine on the consumer side (be it Kafka Streams, Spark Streaming, or Flink) provides windowing functionality to join/aggregate stream data based on keys, with allowances for late-arriving data.

Check Flink windows for a reference on how to choose the right window type (link).
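
As a rough illustration of the consumer-side idea, here is a plain-Python sketch that buckets events by their own event timestamp into fixed windows and only keeps windows in which every source is represented. It is not the API of any of the engines above. The `align_by_window` name, the `(source, event_time, payload)` tuple layout and the source names in the usage example are all assumptions made for the example.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Assumed event shape: (source name, event time in seconds, payload)
Event = Tuple[str, int, dict]


def align_by_window(
    events: Iterable[Event], sources: List[str], window_size: int
) -> Dict[int, Dict[str, List[dict]]]:
    """Group events into fixed windows keyed by window start.

    Only windows that contain at least one event from every source are returned.
    """
    buckets: Dict[int, Dict[str, List[dict]]] = defaultdict(lambda: defaultdict(list))
    for source, event_time, payload in events:
        window_start = event_time - (event_time % window_size)
        buckets[window_start][source].append(payload)

    return {
        start: {s: by_source[s] for s in sources}
        for start, by_source in sorted(buckets.items())
        if all(s in by_source for s in sources)
    }


# Usage: the window starting at 3600 is dropped because only "cloud" reported in it.
events = [
    ("cloud", 10, {"v": 1}),
    ("edge", 3599, {"v": 2}),
    ("gateway", 20, {"v": 3}),
    ("cloud", 3700, {"v": 4}),
]
aligned = align_by_window(events, ["cloud", "edge", "gateway"], 3600)
```

The key point is that alignment uses the timestamp carried by the event, not the order in which records happen to be read from each topic.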

Pigg answered 30/5, 2019 at 7:4 Comment(0)
M
1

To handle this scenario, data sources must provide some mechanism that lets the consumer know when all relevant data has arrived. The simplest solution is for each data source to publish its data in batches tagged with a batch ID (a GUID of some form). Consumers can then wait until the next batch ID shows up, which marks the end of the previous batch. This approach assumes sources will not skip a batch; otherwise they will become permanently misaligned. There is no algorithm to detect this, but you might have some fields in the data that show discontinuity and allow you to realign the data.
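
A minimal sketch of the batch ID idea, assuming each record arrives together with the batch ID its source assigned. The `BatchCollector` class is hypothetical (one instance per source) and it does nothing about skipped batches, which is the weakness mentioned above.

```python
from typing import List, Optional, Tuple


class BatchCollector:
    """Buffers records from one source.

    A batch is treated as complete when a record with a different batch ID arrives.
    """

    def __init__(self) -> None:
        self.current_id: Optional[str] = None
        self.buffer: List[dict] = []

    def add(self, batch_id: str, record: dict) -> Optional[Tuple[str, List[dict]]]:
        """Add a record; return (batch_id, records) of the batch it closes, if any."""
        completed = None
        if self.current_id is not None and batch_id != self.current_id:
            # A new batch ID marks the end of the previous batch.
            completed = (self.current_id, self.buffer)
            self.buffer = []
        self.current_id = batch_id
        self.buffer.append(record)
        return completed
```

The inference engine would hold one collector per topic and run a prediction only once all of them have released their batch for the same period.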

A weaker version of this approach is either to wait x seconds and assume all sources have delivered within that time, or to look at some form of timestamps (logical or wall clock) to detect that a source has moved on to the next time window, which implicitly shows that the previous window is complete.
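
The timestamp variant could look roughly like the sketch below: track the latest timestamp seen per source and treat a window as complete once the slowest source has moved past its end. The `WatermarkTracker` name and its methods are invented for this example, and it assumes each source's timestamps are roughly increasing.

```python
from typing import Dict, Iterable, Optional


class WatermarkTracker:
    """Tracks the latest event time per source to decide when a window is complete."""

    def __init__(self, sources: Iterable[str], window_size: int) -> None:
        self.latest: Dict[str, Optional[int]] = {s: None for s in sources}
        self.window_size = window_size

    def observe(self, source: str, event_time: int) -> None:
        """Record an event time, keeping only the maximum seen per source."""
        prev = self.latest[source]
        if prev is None or event_time > prev:
            self.latest[source] = event_time

    def watermark(self) -> Optional[int]:
        """Smallest 'latest timestamp' across sources, or None if a source is silent."""
        if any(t is None for t in self.latest.values()):
            return None
        return min(self.latest.values())

    def is_complete(self, window_start: int) -> bool:
        """True once every source has reported a time at or past the window's end."""
        wm = self.watermark()
        return wm is not None and wm >= window_start + self.window_size
```

A source that goes silent blocks completion forever in this sketch, so in practice it would probably be combined with the "wait x seconds" timeout.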

Morales answered 31/5, 2019 at 19:33 Comment(0)
O
1

The following recommendations should maximize success of event synchronization for the anomaly detection problem using timeseries data.

  1. Use a network time synchronizer on all producer/consumer nodes
  2. Send a heartbeat message from each producer every x units of time with a fixed start time. For example, messages are sent every two minutes, at the start of the minute.
  3. Build predictors for producer message delay, using the heartbeat messages to compute it.

With these primitives, we should be able to align the timeseries events, accounting for time drifts due to network delays.

On the inference engine side, expand your windows on a per-producer basis to sync up events across producers.
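
A simple sketch of how heartbeats could feed the per-producer window expansion. It assumes each heartbeat is a `(producer_id, sent_ts, arrival_ts)` tuple in seconds and uses the median observed delay as a crude stand-in for a real delay predictor. Both function names are hypothetical.

```python
from collections import defaultdict
from statistics import median
from typing import Dict, Iterable, List, Tuple

# Assumed heartbeat shape: (producer id, send timestamp, arrival timestamp)
Heartbeat = Tuple[str, float, float]


def estimate_delays(heartbeats: Iterable[Heartbeat]) -> Dict[str, float]:
    """Estimate each producer's delivery delay as the median of arrival - sent."""
    samples: Dict[str, List[float]] = defaultdict(list)
    for producer_id, sent_ts, arrival_ts in heartbeats:
        samples[producer_id].append(arrival_ts - sent_ts)
    return {producer: median(delays) for producer, delays in samples.items()}


def expanded_window(window_start: float, window_end: float, delay: float) -> Tuple[float, float]:
    """Arrival-time range to wait for a producer's events from [window_start, window_end)."""
    return (window_start, window_end + delay)


# Usage: p2 is slower, so its window is held open longer.
heartbeats = [("p1", 0, 2), ("p1", 120, 124), ("p1", 240, 243), ("p2", 0, 10)]
delays = estimate_delays(heartbeats)
p2_window = expanded_window(0, 3600, delays["p2"])
```

The median is used only because it tolerates occasional outliers; it ignores lost heartbeats entirely.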

Organization answered 5/6, 2019 at 23:1 Comment(3)
Thanks for the suggestions. Your solution is quite reasonable, but since I have limited knowledge of the whole system, I was looking for some practical solutions (tools available to implement the said task) in addition to the conceptual solution. - Agrarian
For network time sync, use NTP. This can be done upon node startup or device reboot. Heartbeat messages can be published to a Kafka topic. You simply need ProducerId, TimeStamp, ArrivalTimeStamp. The presence of a message indicates the heartbeat. See: gerardnico.com/dit/kafka/timestamp for a discussion on timestamp extraction. - Organization
The message delay predictors can be built using the same machine learning stack you are using for the inference engine. Since there can be lost messages, you need to consider a message survival model such as the Cox proportional hazards model to ensure accuracy. - Organization
