What is a correct RestrictionT to use for Splittable DoFn reading an unbounded Iterable?

I am writing a Splittable DoFn to read a MongoDB change stream. The stream lets me observe events describing changes to a collection, and I can start reading at an arbitrary cluster timestamp, provided the oplog has enough history. Cluster timestamps are seconds since the epoch combined with the serial number of the operation within a given second.
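
For concreteness, here is how that packing round-trips with the Java driver's org.bson.BsonTimestamp (epoch seconds in the high 32 bits, the per-second counter in the low 32 bits); the values are made up:

  BsonTimestamp ts = new BsonTimestamp(1632732000, 7); // seconds since epoch, increment
  long offset = ts.getValue();                         // packs to (seconds << 32) | increment
  BsonTimestamp back = new BsonTimestamp(offset);      // reconstructs the same timestamp
  // back.getTime() == 1632732000, back.getInc() == 7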

I have looked at other SDF examples, but all I have seen so far assume a "seekable" data source (a Kafka topic-partition, a Parquet/Avro file, etc.).

The interface exposed by MongoDB is a simple Iterable, so I cannot really seek to a precise offset (aside from getting a new Iterable starting after a timestamp), and the events it produces carry only cluster timestamps; again, there is no precise offset associated with an output element.

To configure the SDF I use the following class as my input element type:

  public static class StreamConfig implements Serializable {
    public final String databaseName;
    public final String collectionName;
    public final Instant startFrom;

  ...
  }

As the restriction I am using an OffsetRange, since I can convert those timestamps into Long values and back. For the offset tracker I chose a GrowableOffsetRangeTracker, since this one can handle a potentially infinite range.

I have had problems coming up with a range-end estimator; in the end I assumed now() is the maximum potential timestamp, since the fastest we can read the stream is in real time.

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element StreamConfig element) {
    final int fromEpochSecond =
        (int) (Optional.ofNullable(element.startFrom).orElse(Instant.now()).getMillis() / 1000);
    final BsonTimestamp bsonTimestamp = new BsonTimestamp(fromEpochSecond, 0);
    return new OffsetRange(bsonTimestamp.getValue(), Long.MAX_VALUE);
  }

  @NewTracker
  public GrowableOffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
    return new GrowableOffsetRangeTracker(restriction.getFrom(), new MongoChangeStreamEstimator());
  }

  public static class MongoChangeStreamEstimator implements RangeEndEstimator {
    @Override
    public long estimate() {
      // estimating the range to current timestamp since we're reading them in real-time
      return new BsonTimestamp((int) (Instant.now().getMillis() / 1000L), Integer.MAX_VALUE)
          .getValue();
    }
  }

Is there a better choice of restriction type in such a situation: an infinite stream of elements with timestamps but no designated offsets?

Also, this implementation seems to consume a lot of CPU when run on the DirectRunner: tryClaim returns false very often, and each failed claim stops the current read and opens a new change stream iterator.

Is there a way to tell Beam not to split the restriction, or to parallelize this operation less aggressively?


  @ProcessElement
  public ProcessContinuation process(
      @Element StreamConfig element,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<ChangeStreamDocument<BsonDocument>> outputReceiver) {
    final BsonTimestamp restrictionStart =
        new BsonTimestamp(tracker.currentRestriction().getFrom());

    final MongoCollection<BsonDocument> collection = getCollection(element);

    final ChangeStreamIterable<BsonDocument> iterable =
        collection.watch().startAtOperationTime(restrictionStart);
    final long restrictionEnd = tracker.currentRestriction().getTo();

    try {

      final MongoCursor<ChangeStreamDocument<BsonDocument>> iterator = iterable.iterator();
      while (iterator.hasNext()) {
        ChangeStreamDocument<BsonDocument> changeStreamDocument = iterator.next();
        final BsonTimestamp clusterTime = changeStreamDocument.getClusterTime();
        final long clusterTimeValue = clusterTime.getValue();

        if (clusterTimeValue >= restrictionEnd) {
          LOGGER.warn(
              "breaking out: " + clusterTimeValue + " outside restriction " + restrictionEnd);
          break;
        }

        if (!tracker.tryClaim(clusterTimeValue)) {
          LOGGER.warn("failed to claim " + clusterTimeValue);
          iterator.close();
          return ProcessContinuation.stop();
        }

        final int epochSecondsClusterTs = clusterTime.getTime();

        outputReceiver.outputWithTimestamp(
            changeStreamDocument, Instant.ofEpochSecond(epochSecondsClusterTs));
      }
    } catch (MongoNodeIsRecoveringException | MongoChangeStreamException | MongoSocketException e) {

      LOGGER.warn("Failed to open change stream, retrying", e);
      return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10L));
    }

    return ProcessContinuation.resume();
  }
Burkhart answered 27/9, 2021 at 9:40

Comment (Brainstorming): Note that the clusterTimeValue >= restrictionEnd condition is redundant with the tryClaim.

Using the timestamp as the offset is a perfectly fine choice of restriction, as long as you can guarantee that you are able to read all elements up to a given timestamp. (The loop above assumes that the iterator yields elements in timestamp order; specifically, that once you see a timestamp outside the range you can exit the loop and not worry about earlier elements appearing in later parts of the iterator.)

As for why tryClaim is failing so often, this is likely because the direct runner does fairly aggressive splitting: https://github.com/apache/beam/blob/release-2.33.0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178
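
If the immediate goal is just to make local runs cheaper, one knob that may help (an assumption on my part: it limits the direct runner's concurrency but does not disable its checkpoint-driven splitting of SDF restrictions) is DirectOptions.targetParallelism:

  // Sketch: cap the direct runner's parallelism when building the pipeline.
  // DirectOptions is part of beam-runners-direct-java.
  DirectOptions options = PipelineOptionsFactory.fromArgs(args).as(DirectOptions.class);
  options.setTargetParallelism(1);
  Pipeline pipeline = Pipeline.create(options);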

Brainstorming answered 29/9, 2021 at 23:41

Comment (Burkhart): Sounds like the elements will arrive in timestamp order, according to the MongoDB docs: "Change streams provide a total ordering of changes across shards by utilizing a global logical clock. MongoDB guarantees the order of changes are preserved and change stream notifications can be safely interpreted in the order received. For example, a change stream cursor opened against a 3-shard sharded cluster returns change notifications respecting the total order of those changes across all three shards." docs.mongodb.com/manual/administration/…

Comment (Burkhart): Would providing my own DoFn.SplitRestriction make sense in this use case? If I understand correctly, I could then build my own logic to checkpoint, say, every 1 minute (based on the cluster timestamp).

Comment (Brainstorming): DoFn.SplitRestriction only impacts the initial restrictions.

Comment (Brainstorming): You could, however, force larger chunks by rounding your timestamps to a lower granularity. E.g. you could claim only the timestamps rounded down to the nearest 10 or 60 or whatever seconds.
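
A minimal sketch of that rounding idea, reusing the loop shape from the question's process method (GRANULARITY_SECONDS and lastClaimedBucket are hypothetical names I introduced):

  private static final int GRANULARITY_SECONDS = 60; // claim one offset per 60-second bucket

  // Inside the read loop: round each cluster time down to its bucket start and
  // call tryClaim only when entering a new bucket, since claims must be strictly
  // increasing. Assumes the restriction start is bucket-aligned (e.g. the
  // initial restriction was also rounded down).
  long lastClaimedBucket = -1L;
  while (iterator.hasNext()) {
    final ChangeStreamDocument<BsonDocument> doc = iterator.next();
    final int bucketSeconds =
        doc.getClusterTime().getTime() / GRANULARITY_SECONDS * GRANULARITY_SECONDS;
    final long bucketOffset = new BsonTimestamp(bucketSeconds, 0).getValue();
    if (bucketOffset != lastClaimedBucket) {
      if (!tracker.tryClaim(bucketOffset)) {
        return ProcessContinuation.stop(); // a split happened; resume later
      }
      lastClaimedBucket = bucketOffset;
    }
    outputReceiver.outputWithTimestamp(
        doc, Instant.ofEpochSecond(doc.getClusterTime().getTime()));
  }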
