My goal for this app is to create logic that monitors the database and triggers actions (like sending an email) when a document is added. However, since this application may not be running when the database is first populated, how can I manually create a ResumeToken that points to the very first document added to a collection, so that on the first run I can start at the beginning and iterate through the changes until I reach the end? I recognize that I'll need to store the ResumeToken from the lastChangeStreamDocument for future restarts, but I'm interested in the "first run" scenario. I thought enumerator.Reset(); was the correct option, but it threw an exception indicating that it wasn't supported.
I've followed the test provided in https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs and have successfully configured a Change Stream with the following code:
mongoClient = mongoClient ?? new MongoClient(ConnectionString); //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");
var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");
try
{
    var cursor = collection.Watch();
    var enumerator = cursor.ToEnumerable().GetEnumerator();
    enumerator.MoveNext(); //Blocks until a record is UPDATED in the database
    var lastChangeStreamDocument = enumerator.Current;
    enumerator.Dispose();
    //lastChangeStreamDocument.FullDocument.Should().Be(document);
}
catch (Exception ex)
{
    Logger.WriteException(ex);
}
However, with this code the enumerator.MoveNext() line blocks until a document is UPDATED, so I can only get a reference to documents updated after I set up the Change Stream.
I had the idea to search the local.oplog database and get the UUID of the first document inserted into the collection, and was successful. However, I don't see a way to convert this reference into a ResumeToken object that I can feed to the Watch method.
Update:
The ResumeToken appears to be stored as Base64 that contains a timestamp, o._id ObjectID and also the ui UUID from the oplog entry. I need to traverse the code a little more, but it appears from this source code (https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp) that there are different formats of ResumeTokens. With this information, hopefully I can build my own ResumeToken to match the format the database is expecting.
Update #2:
After more research, I stumbled across the code for parsing a key_string in mongo at github.com/mongodb/mongo/src/mongo/db/storage/key_string.cpp. This file contains the definition of CType. I decoded the Base64 to a byte array, then with the CType enum definitions I was able to understand a little more about how to build my own ResumeToken.
Consider the following example: I captured a ResumeToken on a ChangeStream after I updated a document.
glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==
This decoded to the byte array:
82 5a 7d ce c8 00 00 00 01 46 64 5f 69 64 00 64 5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc 00 5a 10 04 04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e 04
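For reference, the decoding step can be reproduced with nothing but the .NET base class library. This is only a minimal sketch: the variable names are mine, and it does not touch the MongoDB driver at all.

```csharp
using System;

// Resume token captured from the change stream (copied from above).
string base64Token = "glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==";

// Decode the Base64 text into the raw bytes.
byte[] tokenBytes = Convert.FromBase64String(base64Token);

// BitConverter.ToString yields "82-5A-7D-..."; reshape it into "82 5a 7d ...".
string hex = BitConverter.ToString(tokenBytes).Replace("-", " ").ToLowerInvariant();

Console.WriteLine($"{tokenBytes.Length} bytes: {hex}");
```

For the token above this prints 49 bytes, matching the byte array listed.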
Which I have decoded to be:
//Timestamp (of oplog entry??)
82 //CType::TimeStamp
5a 7d ce c8 00 00 00 01 //It appears to be expecting a 64-bit number
//I'm not sure why the last byte is 0x01, unless it has something to do with little/big endian
//Matching oplog doc has { ts: TimeStamp(1518194376, 1) }
// that integer converts to 0x5A7DCEC8
//Unknown Object
46 //CType::Object
64 5f 69 64 //Either expecting a 32b value or null terminated
00 //Null terminator or divider
//Document ID
64 //CType::OID
5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc //o._id value from oplog entry
00 //OID expecting null terminated
//UUID
5a //CType::BinData
10 //Length (16 bytes)
04 //BinDataType of newUUID (from bsontypes.h)
04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e //UUID value from oplog entry
04 //Unknown byte. Perhaps end of ResumeToken, or end of UUID mark?
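The layout above can be checked by slicing the decoded bytes at fixed offsets. The sketch below assumes the 8 bytes after 0x82 are two big-endian 32-bit halves (seconds, then increment); that is my reading of the matching oplog entry Timestamp(1518194376, 1), not something I have confirmed in the server source, but it would account for the trailing 0x01. The helper names are made up for this example.

```csharp
using System;

// Token bytes from the example above.
byte[] token = Convert.FromBase64String(
    "glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==");

// Reads an unsigned 32-bit big-endian integer starting at the given offset.
static uint ReadUInt32BigEndian(byte[] data, int offset) =>
    ((uint)data[offset] << 24) | ((uint)data[offset + 1] << 16) |
    ((uint)data[offset + 2] << 8) | data[offset + 3];

// Formats a slice of the array as lowercase hex without separators.
static string ToHex(byte[] data, int offset, int count) =>
    BitConverter.ToString(data, offset, count).Replace("-", "").ToLowerInvariant();

// Offsets follow the layout worked out above; they are observations, not a spec.
uint seconds   = ReadUInt32BigEndian(token, 1);  // 4 bytes right after the 0x82 type byte
uint increment = ReadUInt32BigEndian(token, 5);  // next 4 bytes (assumed to be the increment)
string oid     = ToHex(token, 16, 12);           // after 46 64 5f 69 64 00 and the 0x64 type byte
string uuid    = ToHex(token, 32, 16);           // after 00 5a 10 04

Console.WriteLine($"ts   = Timestamp({seconds}, {increment})");
Console.WriteLine($"_id  = {oid}");
Console.WriteLine($"uuid = {uuid}");
```

With the captured token this yields Timestamp(1518194376, 1), which lines up with the ts value of the matching oplog document.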
The problem I have now is that if I have many oplog entries for a collection, and I use the ts, ui and o._id values from the first entry in the oplog to build my own ResumeToken (hard-coding the unknown 0x4664 5f69 6400 block and also the ending 0x04 byte), then the server accepts this as a valid ResumeToken when setting up collection.Watch. However, the document returned by the enumerator.MoveNext() call is always the 3rd oplog entry and not the 2nd one!
I'm nervous about relying on this in production without knowing the purpose of that 6-byte block (0x4664 5f69 6400) and also without knowing why I'm pointing at the 3rd and not the 2nd entry.
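To make the construction concrete, here is a rough sketch of how the bytes can be assembled from the ts, o._id and ui values. BuildResumeTokenBytes and FromHex are hypothetical helpers written for this post, and the layout (including the two hard-coded pieces) is only what I observed in the captured token, so treat it as reverse engineering rather than a documented format.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: assembles token bytes in the layout observed above.
static byte[] BuildResumeTokenBytes(uint seconds, uint increment, byte[] objectId, byte[] uuid)
{
    if (objectId.Length != 12) throw new ArgumentException("ObjectId must be 12 bytes", nameof(objectId));
    if (uuid.Length != 16) throw new ArgumentException("UUID must be 16 bytes", nameof(uuid));

    var bytes = new List<byte>();
    bytes.Add(0x82);                                     // CType::TimeStamp
    foreach (uint part in new[] { seconds, increment })  // each half written big-endian
    {
        bytes.Add((byte)(part >> 24));
        bytes.Add((byte)(part >> 16));
        bytes.Add((byte)(part >> 8));
        bytes.Add((byte)part);
    }
    bytes.AddRange(new byte[] { 0x46, 0x64, 0x5f, 0x69, 0x64, 0x00 }); // hard-coded block, purpose unknown
    bytes.Add(0x64);                                     // CType::OID
    bytes.AddRange(objectId);                            // o._id from the oplog entry
    bytes.Add(0x00);
    bytes.AddRange(new byte[] { 0x5a, 0x10, 0x04 });     // CType::BinData, length 16, subtype 4
    bytes.AddRange(uuid);                                // ui from the oplog entry
    bytes.Add(0x04);                                     // hard-coded trailing byte, purpose unknown
    return bytes.ToArray();
}

// Parses a hex string such as "5a7d..." into bytes.
static byte[] FromHex(string hex)
{
    var result = new byte[hex.Length / 2];
    for (int i = 0; i < result.Length; i++)
        result[i] = Convert.ToByte(hex.Substring(i * 2, 2), 16);
    return result;
}

// Values taken from the captured example token.
byte[] rebuilt = BuildResumeTokenBytes(
    1518194376, 1,
    FromHex("5a7dcc7e343f269b151c01fc"),
    FromHex("044b0d587b184bf7aee728b5ec4f967e"));

Console.WriteLine(Convert.ToBase64String(rebuilt));
```

Fed the values from the captured example, this reproduces the original Base64 string, so at least the round trip is consistent. To hand the bytes to the driver, I assume they need to be wrapped in a BsonDocument with a `_data` binary field and set as ChangeStreamOptions.ResumeAfter; that part is not shown here.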
Update #3:
Those blocks of bytes in question:
46 64 5f 69 64 00
0x46 = CType::Object
0x64 = d
0x5F = _
0x69 = i
0x64 = d
0x00 = NULL
The block of bytes that follows describes the ObjectId of the affected document, i.e. its "_id" key. So what is the significance of the "d" char?