I am writing a Splittable DoFn to read a MongoDB change stream. It lets me observe events describing changes to a collection, and I can start reading at any cluster timestamp, provided the oplog has enough history. A cluster timestamp is the number of seconds since the epoch combined with the ordinal of the operation within that second.
I have looked at other SDF examples, but all of them assume a "seekable" data source (a Kafka topic-partition, a Parquet/Avro file, etc.). The interface exposed by MongoDB is a plain Iterable, so I cannot really seek to a precise offset (aside from getting a new Iterable starting after a timestamp), and the events it produces carry only cluster timestamps - again, no precise offset is associated with an output element.
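For context, the only positioning primitive the driver exposes is opening a new cursor at a given cluster timestamp. A minimal sketch (the helper openCursorAt is my own naming, not a driver API):

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;

// The closest thing to a "seek": open a fresh change stream cursor starting
// at a given cluster timestamp. There is no per-element offset to resume from.
static MongoCursor<ChangeStreamDocument<BsonDocument>> openCursorAt(
    MongoCollection<BsonDocument> collection, int epochSeconds) {
  return collection.watch()
      .startAtOperationTime(new BsonTimestamp(epochSeconds, 0))
      .iterator();
}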
To configure the SDF I use the following class as my input element type:
public static class StreamConfig implements Serializable {
  public final String databaseName;
  public final String collectionName;
  public final Instant startFrom;
  ...
}
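For completeness, this is roughly how a config element feeds the SDF (a sketch: ReadChangeStreamFn is my name for the DoFn shown below, and the StreamConfig constructor is assumed to be among the elided members):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.joda.time.Instant;

Pipeline pipeline = Pipeline.create();
pipeline
    .apply(Create.of(new StreamConfig("mydb", "mycoll", Instant.now()))
        .withCoder(SerializableCoder.of(StreamConfig.class)))
    .apply(ParDo.of(new ReadChangeStreamFn()));  // the SDF described below
pipeline.run();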
As the restriction I am using an OffsetRange, since I can convert these timestamps into Long values and back.
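The conversion is just the 64-bit packing that BsonTimestamp itself uses: epoch seconds in the high 32 bits, the per-second counter in the low 32 bits.

import org.bson.BsonTimestamp;

BsonTimestamp ts = new BsonTimestamp(1700000000, 7);  // (epoch seconds, ordinal in second)
long offset = ts.getValue();                          // seconds << 32 | ordinal
BsonTimestamp roundTripped = new BsonTimestamp(offset);
assert roundTripped.getTime() == 1700000000 && roundTripped.getInc() == 7;  // lossless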
As the tracker I chose a GrowableOffsetRangeTracker, since it can handle a potentially infinite range. I had trouble coming up with a range-end estimator; in the end I assumed now() is the maximum potential timestamp, since the fastest we can read the stream is in real time.
@GetInitialRestriction
public OffsetRange getInitialRestriction(@Element StreamConfig element) {
  // Start from the configured timestamp, falling back to now; the range end
  // is effectively unbounded.
  final int fromEpochSecond =
      (int) (Optional.ofNullable(element.startFrom).orElse(Instant.now()).getMillis() / 1000);
  final BsonTimestamp bsonTimestamp = new BsonTimestamp(fromEpochSecond, 0);
  return new OffsetRange(bsonTimestamp.getValue(), Long.MAX_VALUE);
}
@NewTracker
public GrowableOffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
  return new GrowableOffsetRangeTracker(restriction.getFrom(), new MongoChangeStreamEstimator());
}
public static class MongoChangeStreamEstimator
    implements GrowableOffsetRangeTracker.RangeEndEstimator {
  @Override
  public long estimate() {
    // Estimate the range end as the current cluster timestamp, since we read
    // the stream at real-time speed at most. Integer.MAX_VALUE as the counter
    // makes this the last possible offset within the current second.
    return new BsonTimestamp((int) (Instant.now().getMillis() / 1000L), Integer.MAX_VALUE)
        .getValue();
  }
}
Is there a better choice of restriction type for this situation: an infinite stream of elements that carry timestamps but no designated offsets?
Also, this implementation seems to consume a lot of CPU when run on the DirectRunner: tryClaim returns false frequently, which seems to open a lot of new iterators, since each stop() ends the current @ProcessElement call and the resumed call opens a fresh change-stream cursor. Is there a way to tell Beam not to split the restriction, or to parallelize this operation less aggressively?
@ProcessElement
public ProcessContinuation process(
    @Element StreamConfig element,
    RestrictionTracker<OffsetRange, Long> tracker,
    OutputReceiver<ChangeStreamDocument<BsonDocument>> outputReceiver) {
  // Resume the change stream at the start of the current restriction.
  final BsonTimestamp restrictionStart =
      new BsonTimestamp(tracker.currentRestriction().getFrom());
  final MongoCollection<BsonDocument> collection = getCollection(element);
  final ChangeStreamIterable<BsonDocument> iterable =
      collection.watch().startAtOperationTime(restrictionStart);
  final long restrictionEnd = tracker.currentRestriction().getTo();
  try {
    final MongoCursor<ChangeStreamDocument<BsonDocument>> iterator = iterable.iterator();
    while (iterator.hasNext()) {
      ChangeStreamDocument<BsonDocument> changeStreamDocument = iterator.next();
      final BsonTimestamp clusterTime = changeStreamDocument.getClusterTime();
      final long clusterTimeValue = clusterTime.getValue();
      if (clusterTimeValue >= restrictionEnd) {
        LOGGER.warn(
            "breaking out: " + clusterTimeValue + " outside restriction " + restrictionEnd);
        break;
      }
      // A failed claim means the restriction was split or checkpointed; stop
      // and let Beam re-invoke process() with the residual restriction.
      if (!tracker.tryClaim(clusterTimeValue)) {
        LOGGER.warn("failed to claim " + clusterTimeValue);
        iterator.close();
        return ProcessContinuation.stop();
      }
      // Cluster time has only second resolution, so that is the best event
      // timestamp available.
      final int epochSecondsClusterTs = clusterTime.getTime();
      outputReceiver.outputWithTimestamp(
          changeStreamDocument, Instant.ofEpochSecond(epochSecondsClusterTs));
    }
  } catch (MongoNodeIsRecoveringException | MongoChangeStreamException | MongoSocketException e) {
    LOGGER.warn("Failed to open change stream, retrying", e);
    return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10L));
  }
  return ProcessContinuation.resume();
}
Comment: the clusterTimeValue >= restrictionEnd condition is redundant with the tryClaim check. – Brainstorming