How do I resume a MongoDB ChangeStream at the first document and not just changes after I start listening

My goal for this app is to create logic that monitors the database and triggers actions when a document is added to the database (like sending an email). However, this application may not be running when the database is first populated. How can I manually create a ResumeToken that points to the very first document that was added to a collection, so that on the first run I can start at the beginning and iterate through the changes until I reach the end? I recognize that I'll need to store the ResumeToken from the lastChangeStreamDocument for future restarts, but I'm interested in the "first run" scenario. I thought enumerator.Reset(); was the correct option, but it threw an exception indicating that it isn't supported.

I've followed the test provided in https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs and have successfully configured a Change Stream with the following code:

mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");

var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");

try
{
    var cursor = collection.Watch();
    var enumerator = cursor.ToEnumerable().GetEnumerator();

    enumerator.MoveNext();  //Blocks until a record is UPDATED in the database
    var lastChangeStreamDocument = enumerator.Current;
    enumerator.Dispose();
    //lastChangeStreamDocument.FullDocument.Should().Be(document);

}
catch( Exception ex)
{
    Logger.WriteException(ex);
}

However, with this code the enumerator.MoveNext() line blocks until a document is changed (in my test, UPDATED), so I can only get references to documents that change after I set up the Change Stream.

I had the idea of searching the local.oplog.rs collection to get the UUID of the first document inserted into the collection, and that was successful. However, I don't see a way to convert this reference into a ResumeToken object that I can feed to the Watch method.


Update:

The ResumeToken appears to be stored as Base64-encoded binary data that contains a timestamp, the o._id ObjectId and the ui UUID from the oplog entry. I need to dig through the code a little more, but this source code (https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp) suggests that there are different ResumeToken formats. With this information, I hope to build my own ResumeToken in the format the database expects.


Update #2:

After more research, I stumbled across the code for parsing a key_string in mongo at github.com/mongodb/mongo/src/mongo/db/storage/key_string.cpp. This file contains the definition of CType. I decoded the Base64 to a byte array, then with the CType enum definitions I was able to understand a little more about how to build my own ResumeToken.

Consider the following example: I captured a ResumeToken on a ChangeStream after I updated a document.

glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==

This decoded to the byte array:

82 5a 7d ce c8 00 00 00 01 46 64 5f 69 64 00 64 5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc 00 5a 10 04 04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e 04

Which I have decoded to be:

//Timestamp (of oplog entry??)
82    //CType::TimeStamp
5a 7d ce c8 00 00 00 01   //It appears to be expecting a 64b number
//I'm not sure why the last byte is 0x01, unless it has something to do with little/big endian
//Matching oplog doc has { ts: TimeStamp(1518194376, 1) }
//  that integer converts to 0x5A7DCEC8

//Unknown Object
46    //CType::Object
64 5f 69 64     //Either expecting a 32b value or null terminated
00    //Null terminator or divider

//Document ID
64    //CType::OID
5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc  //o._id value from oplog entry
00    //OID expecting null terminated

//UUID
5a    //CType::BinData
10    //Length (16b)
04    //BinDataType of newUUID (from bsontypes.h)
04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e  //UUID value from oplog entry
04    //Unknown byte. Perhaps end of ResumeToken, or end of UUID mark?
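The annotated layout can be checked mechanically. The following is a minimal Python sketch (standard library only; `parse_v36_token` is a hypothetical helper) that decodes the sample token and slices it at the offsets annotated above. Reading the 8 timestamp bytes as two big-endian 32-bit integers gives (1518194376, 1), which matches the oplog entry's `ts: Timestamp(1518194376, 1)`. That suggests the trailing 0x01 is the timestamp's increment rather than an endianness artifact. The sketch assumes this exact 3.6-era layout: a documentKey with a single ObjectId `_id`, followed by a 16-byte UUID.

```python
import base64
import struct
import uuid

# Sample resume token `_data` value captured in the question
TOKEN_B64 = "glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA=="

def parse_v36_token(token_b64: str) -> dict:
    """Split a token with the layout annotated above into its parts (assumed layout)."""
    raw = base64.b64decode(token_b64)
    if raw[0] != 0x82:
        raise ValueError("expected CType::TimeStamp (0x82) as the first byte")
    # 8 timestamp bytes, big-endian: 4-byte seconds followed by 4-byte increment
    seconds, increment = struct.unpack(">II", raw[1:9])
    if raw[9:15] != b"\x46\x64_id\x00" or raw[15] != 0x64:
        raise ValueError("expected a documentKey of the form {_id: ObjectId}")
    oid_hex = raw[16:28].hex()  # the 12 ObjectId bytes (o._id)
    if raw[28] != 0x00 or raw[29:32] != b"\x5a\x10\x04":
        raise ValueError("unexpected bytes between the ObjectId and the UUID")
    return {
        "hex": raw.hex(" "),
        "ts": (seconds, increment),
        "_id": oid_hex,
        "ui": uuid.UUID(bytes=raw[32:48]),  # collection UUID from the oplog entry
        "trailer": raw[48:],                # the final, unexplained 0x04
    }

print(parse_v36_token(TOKEN_B64))
```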

The problem I have now is this: if I have many oplog entries for a collection and I use the ts, ui and o._id values from the first entry in the oplog to build my own ResumeToken (hard-coding the unknown 0x4664 5f69 6400 block and also the ending 0x04 byte), the server accepts it as a valid ResumeToken when setting up collection.Watch. However, the document returned by the enumerator.MoveNext() call is always the 3rd oplog entry and not the 2nd one!

I'm nervous about relying on this in production without knowing the purpose of that 6-byte block, and without knowing why I'm pointing at the 3rd and not the 2nd entry.


Update #3:

Those blocks of bytes in question:

46 64 5f 69 64 00

0x46 = CType::Object
0x64 = d
0x5F = _
0x69 = i
0x64 = d
0x00 = NULL

The following block of bytes describes the ObjectId of the affected document, i.e. its "_id" key. So what is the significance of the "d" char?
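One reading that fits the bytes uses the CType values already cited above: 0x46 (70) is CType::Object and 0x64 (100) is CType::OID. Under that reading, the "d" is not a character at all but the OID type tag of the `_id` element. It is followed by the field name `_id` and its NULL terminator, and the ObjectId value then carries its own 0x64 tag. The Python sketch below (`encode_document_key` is a hypothetical helper) rebuilds the 20-byte documentKey section of the sample token under that assumption. This is an interpretation of the observed bytes, not a documented format.

```python
# KeyString type tags as listed in the CType enum of key_string.cpp (assumed values)
CTYPE_OBJECT = 70   # 0x46
CTYPE_OID = 100     # 0x64, which happens to be ASCII 'd'

def encode_document_key(oid_hex: str) -> bytes:
    """Encode {_id: ObjectId(oid_hex)} the way it appears inside the observed token.

    Assumed per-element layout: type tag, NULL-terminated field name, then the
    value (an ObjectId value carries its own type tag); 0x00 closes the object.
    """
    oid = bytes.fromhex(oid_hex)
    if len(oid) != 12:
        raise ValueError("an ObjectId is 12 bytes")
    return (bytes([CTYPE_OBJECT])
            + bytes([CTYPE_OID]) + b"_id\x00"  # element type + field name "_id"
            + bytes([CTYPE_OID]) + oid         # value: OID tag + 12 ObjectId bytes
            + b"\x00")                         # end of the documentKey object

print(encode_document_key("5a7dcc7e343f269b151c01fc").hex(" "))
print(chr(CTYPE_OID))
```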

Ceyx answered 7/2, 2018 at 13:45 Comment(0)

I've been updating the question with additional information as I worked through this, and I have now managed to piece it all together so that it works.

Below is the code that I've created to:

  1. Find the first entry of a namespace in the local.oplog.rs collection
  2. Generate a ResumeToken from that oplog document (so we resume on the second entry)
  3. Provide an example for testing those functions

Hopefully this code will be beneficial for others who are attempting to do the same.

/// <summary>
/// Locates the first document for the given namespace in the local.oplog collection
/// </summary>
/// <param name="docNamespace">Namespace to search for</param>
/// <returns>First Document found in the local.oplog collection for the specified namespace</returns>
internal static BsonDocument GetFirstDocumentFromOpLog(string docNamespace)
{
    mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
    IMongoDatabase localDB = mongoClient.GetDatabase("local");
    var collection = localDB.GetCollection<BsonDocument>("oplog.rs");

    //Find the documents from the specified namespace (DatabaseName.CollectionName) that have an operation type of "insert" (the first entry for a collection's documents must be an insert)
    var filter = Builders<BsonDocument>.Filter.Eq("ns", docNamespace)
               & Builders<BsonDocument>.Filter.Eq("op", "i");

    BsonDocument retDoc = null;
    try //to get the first document from the oplog entries
    {
        //Sort by $natural (oplog insertion order) so the oldest entry comes first; returns null if nothing matches
        retDoc = collection.Find(filter).Sort(Builders<BsonDocument>.Sort.Ascending("$natural")).FirstOrDefault();
    }
    catch (Exception ex) { /*Logger.WriteException(ex);*/ }
    return retDoc;
}

/// <summary>
/// Takes a document from the OpLog and generates a ResumeToken
/// </summary>
/// <param name="firstDoc">BsonDocument from the local.oplog collection to base the ResumeToken on</param>
/// <returns>A ResumeToken that can be provided to a collection watch (ChangeStream) that points to the firstDoc provided</returns>
private static BsonDocument GetResumeTokenFromOpLogDoc(BsonDocument firstDoc)
{
    List<byte> hexVal = new List<byte>(34);

    //Insert Timestamp of document: 8 bytes, big-endian (4-byte seconds, then 4-byte increment)
    hexVal.Add(0x82);   //TimeStamp Tag
    BsonTimestamp docTimestamp = firstDoc["ts"].AsBsonTimestamp;
    byte[] docTimeStampByteArr = BitConverter.GetBytes(docTimestamp.Timestamp); //Seconds part; reversed on little-endian machines to get big-endian order
    if (BitConverter.IsLittleEndian) { Array.Reverse(docTimeStampByteArr); }
    hexVal.AddRange(docTimeStampByteArr);

    //Increment part (the trailing 0x00000001 in the sample token is the increment of Timestamp(1518194376, 1))
    byte[] docIncrementByteArr = BitConverter.GetBytes(docTimestamp.Increment);
    if (BitConverter.IsLittleEndian) { Array.Reverse(docIncrementByteArr); }
    hexVal.AddRange(docIncrementByteArr);

    //documentKey object: 0x46 = CType::Object, then 0x64 ("d") appears to be the CType::OID type
    //of the "_id" element, followed by the field name "_id" and its NULL terminator
    hexVal.AddRange(new byte[] { 0x46, 0x64, 0x5F, 0x69, 0x64, 0x00 });

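    //Insert OID (from o._id): the value's own OID tag followed by the 12 ObjectId bytes
    hexVal.Add(0x64);   //OID Tag
    byte[] docByteArr = firstDoc["o"]["_id"].AsObjectId.ToByteArray();
    hexVal.AddRange(docByteArr);
    hexVal.Add(0x00);   //Appears to close the documentKey object (an ObjectId is fixed-length)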

    //Insert UUID (from ui) as BinData
    hexVal.AddRange(new byte[] { 0x5a, 0x10, 0x04 });   //0x5A = BinData, 0x10 is Length (16 bytes), 0x04 is BinDataType (newUUID)
    hexVal.AddRange(firstDoc["ui"].AsByteArray);

    hexVal.Add(0x04);   //Trailing byte present in every observed ResumeToken (likely an end-of-key marker)

    //Package the binary data into a BsonDocument with the key "_data" and the value as BinData (displayed as Base64)
    BsonDocument retDoc = new BsonDocument("_data", new BsonBinaryData(hexVal.ToArray()));
    return retDoc;
}


/// <summary>
/// Example Code for setting up and resuming to the second doc
/// </summary>
internal static void MonitorChangeStream()
{
    mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
    IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");
    var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");

    var options = new ChangeStreamOptions();
    options.FullDocument = ChangeStreamFullDocumentOption.UpdateLookup;

    try
    {
        //Only pass through replace, insert and update events
        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] } }");

        //Build ResumeToken from the first document in the oplog collection
        BsonDocument resumeTokenRefDoc = GetFirstDocumentFromOpLog(collection.CollectionNamespace.ToString());
        if (resumeTokenRefDoc != null)
        {
            BsonDocument docResumeToken = GetResumeTokenFromOpLogDoc(resumeTokenRefDoc);
            options.ResumeAfter = docResumeToken;
        }

        //Setup the ChangeStream/Watch Cursor
        var cursor = collection.Watch(pipeline, options);
        var enumerator = cursor.ToEnumerable().GetEnumerator();

        enumerator.MoveNext();  //Returns the second entry if history exists after the ResumeToken (which points to the first entry); otherwise blocks until a record is UPDATEd, REPLACEd or INSERTed (thanks to the pipeline arg)

        ChangeStreamDocument<BsonDocument> lastChangeStreamDocument = enumerator.Current;
        //lastChangeStreamDocument is now pointing to the second entry in the oplog, or the just received entry
        //A loop can be set up to call enumerator.MoveNext() to step through each entry in the oplog history and also to receive new events

        enumerator.Dispose();   //Be sure to dispose of the enumerator when finished.
    }
    catch( Exception ex)
    {
        //Logger.WriteException(ex);
    }
}
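For comparison, here is the byte layout of GetResumeTokenFromOpLogDoc ported to Python. This is a stdlib-only sketch; `build_v36_resume_token` and its arguments are hypothetical. It fills the last four timestamp bytes from the timestamp's increment. In the captured token, those bytes equal the increment of `Timestamp(1518194376, 1)`. With the question's values (that timestamp, the `o._id` ObjectId and the `ui` UUID), it reproduces the captured token byte for byte. Like the C# version, it depends on the undocumented MongoDB 3.6 token layout.

```python
import base64
import struct
import uuid

def build_v36_resume_token(ts_seconds: int, ts_increment: int,
                           oid_hex: str, collection_uuid: uuid.UUID) -> dict:
    """Assemble a 3.6-style resume token from oplog fields (ts, o._id, ui)."""
    data = bytearray()
    data += b"\x82" + struct.pack(">II", ts_seconds, ts_increment)  # TimeStamp tag + seconds + increment
    data += b"\x46\x64_id\x00"                           # Object, header of the {_id: OID} element
    data += b"\x64" + bytes.fromhex(oid_hex) + b"\x00"  # OID value, end of the documentKey object
    data += b"\x5a\x10\x04" + collection_uuid.bytes     # BinData, length 16, UUID subtype
    data += b"\x04"                                      # trailing byte seen in observed tokens
    return {"_data": bytes(data)}

token = build_v36_resume_token(
    1518194376, 1, "5a7dcc7e343f269b151c01fc",
    uuid.UUID("044b0d58-7b18-4bf7-aee7-28b5ec4f967e"))
print(base64.b64encode(token["_data"]).decode())
```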

If anyone has suggestions for improving the code, please share them. I'm still learning.

Ceyx answered 12/2, 2018 at 17:16 Comment(5)
Thanks for the resumeToken decomposition; it sheds light on my doubts, but I still have a question. If you find the resumeToken for the first valid operation and use it to position the change stream, that operation is skipped. A resume token is intended to give you the first new record AFTER the one whose resumeToken you collected. How can you position the change stream cursor BEFORE it, in order to "consume" that record too? Any idea? – Rager
@Rager You are correct: the first document would be skipped with this approach. The above code doesn't reflect this, but I designed around the problem with a producer/consumer thread and a queue of the changed documents. The resumeTokenRefDoc is pushed into the queue manually, and is also used to build the ResumeToken for the collection.Watch options. I then loop around the enumerator.MoveNext() call, and each time I push the lastChangeStreamDocument object into the same queue for processing by a consumer thread. – Ceyx
This also assumes that the first update (or the update you point to) hasn't rolled off the oplog history. – Maressa
@Maressa Yes, that is a scenario I didn't cover in this code. The latest version of Mongo added support for providing a Timestamp as a ChangeStreamOption instead of a ResumeToken. I've changed the above code to use this approach, since it doesn't require me to hope that the ResumeToken format won't change in the future. I'll have to update the answer with the new code that I'm using. The above code also has issues with a collection that was dropped and then re-created with the same name. – Ceyx
Until I can add the new code: as best as I can recall, my current approach is to search for the last creation event for this collection in the oplog (search from the end and take the first match), grab the timestamp of that event and use it to start the ChangeStream. To handle the scenario Jeremy pointed out, if there is no creation event, find the oldest event (of any type) for the desired collection and start there. For the documents before that point, you'll have to do it the manual way and traverse all the other documents in the database (if you use the built-in ObjectId, use its timestamp property). – Ceyx
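Here is a minimal Python sketch of the producer/consumer workaround Ceyx describes. The function and parameter names are hypothetical. In real code, `change_events` would be the change stream cursor (which does not end on its own), and `first_doc` would be the oplog document the ResumeToken was built from. That document is pushed into the queue by hand because resuming after it skips it.

```python
import queue
import threading

_STOP = object()  # sentinel that tells the consumer thread to exit

def run_pipeline(first_doc, change_events, handle):
    """Queue the first document manually, then every change event, for one consumer."""
    work = queue.Queue()

    def consumer():
        while True:
            item = work.get()
            if item is _STOP:
                break
            handle(item)

    worker = threading.Thread(target=consumer)
    worker.start()

    work.put(first_doc)            # the document the ResumeToken points at (otherwise skipped)
    for event in change_events:    # then everything the change stream returns
        work.put(event)
    work.put(_STOP)
    worker.join()

processed = []
run_pipeline({"op": "i", "n": 1}, [{"n": 2}, {"n": 3}], processed.append)
print(processed)
```

A single consumer reading from a FIFO queue preserves the oplog order, which matters if the actions (such as sending emails) must happen in the order the documents arrived.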

Many of the above answers are valiant (amazing really)... but ultimately I worry that they are fragile.

Please see the startAtOperationTime parameter of the watch() method. It allows you to start watching a collection from a given point in time. What I don't know is: how do I get and set the server time from MongoDB? To me it doesn't make sense to use this parameter with a client-time value.

Maytime answered 12/8, 2019 at 19:12 Comment(2)
I definitely agree about their fragility. Since Mongo 4.0, there is an option to resume after a timestamp - github.com/mongodb/specifications/blob/master/source/…. This is the method I use now. If you're using the default ObjectId, which includes an embedded timestamp, you just need to find the oldest document in the collection, extract the timestamp from its ObjectId, then feed it into your change stream with the startAtOperationTime option. – Ceyx
If you have access to the oplog.rs collection, you can simply take the first (oldest) item from it and use that item's timestamp for the startAtOperationTime parameter. – Lheureux
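The ObjectId route from the comments comes down to reading the first 4 bytes of the ObjectId, which hold a big-endian Unix timestamp in seconds. Below is a stdlib-only Python sketch; `objectid_seconds` is a hypothetical helper, shown with the `o._id` from the question.

```python
import datetime

def objectid_seconds(oid_hex: str) -> int:
    """Return the creation time embedded in an ObjectId, in seconds since the epoch."""
    raw = bytes.fromhex(oid_hex)
    if len(raw) != 12:
        raise ValueError("an ObjectId is 12 bytes")
    return int.from_bytes(raw[:4], "big")  # first 4 bytes: big-endian Unix timestamp

seconds = objectid_seconds("5a7dcc7e343f269b151c01fc")
print(seconds, datetime.datetime.fromtimestamp(seconds, datetime.timezone.utc))
```

With PyMongo, the result could be wrapped as `bson.timestamp.Timestamp(seconds, 0)` and passed as `collection.watch(start_at_operation_time=...)` (MongoDB 4.0+). ObjectIds are normally generated on the client, so this only approximates server time, and the change stream can only replay events that are still in the oplog.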

I couldn't have done this without this post.

It is 2020, but I still have to handle change streams in MongoDB 3.6.

In case someone wants to try this in Python, here is my rough Python 3 code.

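def make_resume_token(oplog_doc):
    # CType::TimeStamp, then 8 timestamp bytes: seconds and increment, both big-endian
    rt = b'\x82'
    rt += oplog_doc['ts'].time.to_bytes(4, byteorder='big') + oplog_doc['ts'].inc.to_bytes(4, byteorder='big')
    # documentKey object {_id: ...}, followed by the OID tag of the value
    rt += b'\x46\x64\x5f\x69\x64\x00\x64'
    rt += bytes.fromhex(str(oplog_doc['o']['_id']))  # 12-byte ObjectId
    # end of documentKey, then BinData (length 16, subtype 4 = UUID)
    rt += b'\x00\x5a\x10\x04'
    rt += oplog_doc['ui'].bytes  # collection UUID from the ui field, as a uuid.UUID
    rt += b'\x04'  # trailing byte seen in every observed token

    return {'_data': rt}

# oplog_doc: an entry for the collection read from local.oplog.rs
cursor = db['COLLECTION_NAME'].watch(resume_after=make_resume_token(oplog_doc))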
Salford answered 16/1, 2020 at 7:30 Comment(0)

I somehow managed to compose a resumeAfter token from the latest document of the oplog.rs collection.

The following code is written in Node.js:

const _ = require('lodash');
const { MongoClient } = require('mongodb');

// localDB is a Db handle for the "local" database
localDB.collection('oplog.rs').findOne(
    {'ns': 'yourDatabase.yourCollection'}, // ns is "<database>.<collection>"
    {'sort': {'$natural': -1}},            // newest oplog entry first
    (err, doc) => {
        if (err || _.isEmpty(doc)) {
            return someErrorCheck();
        }

        const resumeAfterData = [
            '82', // unknown
            doc.ts.toString(16), // timestamp
            '29', // unknown
            '29', // unknown
            '5A', // CType::BinData
            '10', // length (16)
            '04', // BinDataType of newUUID
            doc.ui.toString('hex'), // uuid
            '46', // CType::Object
            '64', // CType::OID (vary from the type of document primary key)
            '5F', // _ (vary from the field name of document primary key)
            '69', // i
            '64', // d
            '00', // null
            '64', // CType::OID (vary from the type of document primary key)
            // ObjectId: update operations have an `o2` field and others have an `o` field
            _.get(doc, 'o2._id', _.get(doc, 'o._id')).toString('hex'),
            '00', // null
            '04', // unknown
        ].join('').toUpperCase();

        console.log(resumeAfterData);
    },
);

But I still don't know what those 82 29 29 04 mean.
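The same layout can be expressed in Python as a stdlib-only sketch for comparison (`build_v40_token_hex` is a hypothetical name, and it has not been checked against a live 4.0 server). Here the token is an uppercase hex string rather than BinData, and the UUID comes before the documentKey. The question already identifies 0x82 as CType::TimeStamp. In the same CType enum, 0x29 (41) appears to be kNumericZero, so the two 29 bytes may be zero-valued fields. That reading is an assumption, and the sketch simply copies both bytes as observed.

```python
import struct
import uuid

def build_v40_token_hex(ts_seconds: int, ts_increment: int,
                        collection_uuid: uuid.UUID, oplog_doc: dict) -> str:
    """Rebuild the hex layout used by the Node.js code above (ObjectId _id assumed, given as hex)."""
    # Update entries carry the document key in `o2`; inserts and deletes carry it in `o`
    doc_key = oplog_doc.get("o2") or oplog_doc["o"]
    oid = bytes.fromhex(doc_key["_id"])
    data = (b"\x82" + struct.pack(">II", ts_seconds, ts_increment)  # TimeStamp tag + 8 bytes
            + b"\x29\x29"                                       # copied as observed
            + b"\x5a\x10\x04" + collection_uuid.bytes             # BinData, length 16, UUID subtype
            + b"\x46\x64_id\x00"                                # Object, {_id: OID} element header
            + b"\x64" + oid + b"\x00"                           # OID value, end of object
            + b"\x04")                                          # trailing byte, as observed
    return data.hex().upper()

update_entry = {"op": "u", "o": {"$set": {"status": "sent"}},
                "o2": {"_id": "5a7dcc7e343f269b151c01fc"}}
print(build_v40_token_hex(1518194376, 1,
                          uuid.UUID("044b0d58-7b18-4bf7-aee7-28b5ec4f967e"), update_entry))
```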

Here is my MongoDB configuration that relates to the format of the resumeAfter token:

db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1});

{
    "featureCompatibilityVersion" : {
        "version" : "4.0"
    },
    "ok" : 1.0,
    "operationTime" : Timestamp(1546854769, 1)
}
Gaussmeter answered 7/1, 2019 at 9:54 Comment(0)

Thank you very much for your help.

The ResumeToken is lost when the application is closed, so it needs to be persisted. You can use the following structure:

enumerator.MoveNext();
ChangeStreamDocument<Order> lastChangeStreamDocument = enumerator.Current;
await UpsertResumeToken(lastChangeStreamDocument.ResumeToken);

enumerator.Dispose();

First, create or update (upsert) the stored token:

public async Task UpsertResumeToken(BsonDocument resumeToken)
{
    IMongoCollection<ResumeTokenCollection> collection =
        _mongoClient.GetDatabase("DATABASE_NAME_HERE")
            .GetCollection<ResumeTokenCollection>("ResumeToken");

    // Only one token document is kept; look it up if it already exists
    ResumeTokenCollection token = await collection.Find(_ => true).FirstOrDefaultAsync();

    if (token == null)
    {
        // First run: insert the token document
        ResumeTokenCollection tokenDocument = new ResumeTokenCollection()
        {
            DateTime = DateTime.Now,
            ResumeToken = resumeToken
        };

        await collection.InsertOneAsync(tokenDocument);
    }
    else
    {
        // Later runs: overwrite the stored token and its timestamp
        UpdateResult updateResult = await collection.UpdateOneAsync(
            Builders<ResumeTokenCollection>.Filter.Eq(t => t.Id, token.Id),
            Builders<ResumeTokenCollection>.Update
                .Set(t => t.ResumeToken, resumeToken)
                .Set(t => t.DateTime, DateTime.Now));

        if (updateResult.ModifiedCount == 0)
        {
            throw new Exception("Failed to update the stored resume token.");
        }
    }
}

Then read the stored token back when the loop starts again:

public async Task<BsonDocument> CheckResumeToken()
{
    IMongoCollection<ResumeTokenCollection> collection =
        _mongoClient.GetDatabase("DATABASE_NAME_HERE")
            .GetCollection<ResumeTokenCollection>("ResumeToken");

    // Most recently saved token, or null on the very first run
    ResumeTokenCollection token = await collection.Find(_ => true)
        .SortByDescending(t => t.DateTime)
        .FirstOrDefaultAsync();

    return token?.ResumeToken;
}
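The key detail is when the token is saved. It should be written only after a change has been handled successfully, so that a crash mid-handling replays that change instead of losing it. The following is a minimal Python sketch of that ordering; the names are hypothetical. With PyMongo, `changes` would be the object returned by `collection.watch(resume_after=...)`, each event's `_id` is its resume token, and `save_token` could perform an upsert like the one in UpsertResumeToken.

```python
def consume_changes(changes, handle, save_token):
    """Handle each change event, then persist its resume token (the event's `_id`).

    If handle() raises, the token is not advanced, so the same change is
    delivered again after a restart that resumes from the last saved token.
    """
    for change in changes:
        handle(change)
        save_token(change["_id"])  # only after successful handling

saved = []
consume_changes([{"_id": {"_data": "82AB"}, "operationType": "insert"}],
                lambda change: None, saved.append)
print(saved)
```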


Procurance answered 29/7, 2022 at 20:24 Comment(0)

I have simplified my solution by replacing StartAfter with StartAtOperationTime.

I save the timestamp of the change alongside the ResumeToken in my own collection, and use the timestamp in place of the actual resumeToken.

The Timestamp I save and use is change.ClusterTime:

BsonTimestamp timestamp = null;

foreach (var change in cursor.ToEnumerable(cancellationToken: cancellationToken))
{
    timestamp = change.ClusterTime;
...
}

Then I use the timestamp in the ChangeStreamOptions: changeStreamOptions.StartAtOperationTime = timestamp;

This seems to work even when the resume token you saved in your own collection is no longer in the oplog, because the change stream starts at the first entry after the timestamp if there is none at the exact timestamp.

So to recap: when you start up your app, first get your stored timestamp, then set it as StartAtOperationTime. If you have no timestamp yet, you can use null for StartAtOperationTime to start fresh.

Then in your change loop, you save the new timestamp into your collection after successful handling of each change.
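The "at or after" behaviour described above has a consequence worth planning for. Because the start point is inclusive, restarting at the saved ClusterTime delivers the last handled change again. In addition, all events from one multi-document transaction share a single cluster time. Handlers should therefore tolerate duplicates. The plain-Python model below illustrates this; it is a toy model with hypothetical names, not driver or server code.

```python
from typing import List, Tuple

# (cluster time as (seconds, increment), description of the change)
Event = Tuple[Tuple[int, int], str]

def replay_from(oplog: List[Event], start_at: Tuple[int, int]) -> List[Event]:
    """Toy model of startAtOperationTime: every event at or after start_at."""
    return [event for event in oplog if event[0] >= start_at]

oplog = [((100, 1), "insert A"), ((100, 2), "update A"), ((105, 1), "insert B")]
last_processed = (100, 2)  # ClusterTime saved after handling "update A"
print(replay_from(oplog, last_processed))  # "update A" is delivered again
```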

You will find code for the other parts in other answers here, so I will not repeat all of it.

Pleven answered 24/5 at 13:26 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.