The ability to search and discover things is a must have feature in nearly every platform we use. When developers think about tackling search, the full-text search engine, Elasticsearch, is often the first solution that comes to mind.
The platform that I’m currently working on uses MongoDB for storage and retrieval. Search is optimised using the $text
operator. It works well for simple use cases but fails to account for typos, phrases and the overall accuracy and performance did not meet our expectations.
To improve the search experience, I decided to integrate Elasticsearch into the platform for retrieval. One of the challenges with using two different data stores is keeping them in sync. Updates to MongoDB needed to be synced to Elasticsearch in near real-time.
Mongo Change Streams
I stumbled upon Mongo Change Streams which appeared to fit the use case perfectly. To utilize change streams, we must use a MongoDB replica set, reason being that the change stream works by using the oplog.
My approach was to setup a background worker that listens to events emitted by the change stream and publish them to Elasticsearch.
Opening a change stream against a collection is as simple as:
db.collection('person').watch()
Person.watch() // directly using the model
Change Events
A change event typically has the following structure:
{
_id : { <BSON Object> },
"operationType" : "<operation>",
"fullDocument" : { <document> },
"ns" : {
"db" : "<database>",
"coll" : "<collection>"
},
"to" : {
"db" : "<database>",
"coll" : "<collection>"
},
"documentKey" : { "_id" : <value> },
"updateDescription" : {
"updatedFields" : { <document> },
"removedFields" : [ "<field>", ... ]
}
"clusterTime" : <Timestamp>,
"txnNumber" : <NumberLong>,
"lsid" : {
"id" : <UUID>,
"uid" : <BinData>
}
}
_id
contains the event identifier and is used for resuming change streams
fullDocument
contains a point-in-time snapshot of the document.
documentKey
contains the _id
of the document in the event.
operationType
can be one of the following:
- insert: document is inserted
- update: document is updated
- replace: document is replaced
- delete: document is deleted
- drop: collection is dropped
- rename: collection is renamed
- dropDatabase: database is dropped
- invalidate: fired after a drop, rename or dropDatabase event
update
events by default have the updateDescription
field which indicates the fields that were updated or removed. If we want to get a point-in-time snapshot of the entire document, we have to specify the updateLookup: 'fullDocument'
option when opening the change stream.
const options = {
updateDocument: 'fullLookup'
}
Person.watch(options)
Filtering change events
When opening a change stream, we can limit the number of events returned using an aggregation pipeline. This is extremely useful if we only care about certain types of events or data.
Filtering for insert events
const pipeline = [
{
$match: {
operationType: 'insert'
}
}
]
Person.watch(pipeline)
Filtering for events that matche person documents with name = Jane
const pipeline = [
{
$match: {
'fullDocument.name': 'Jane'
}
}
]
Person.watch(pipeline)
Publishing events to Elasticsearch
Now comes the fun part. The .watch()
function returns a change stream cursor that we can iterate over to retrieve events. We can then perform transformations on the event and publish them to Elasticsearch.
const pipeline = [{ $match: { operationType: 'insert' } }]
const changeStream = Person.watch(pipeline, options)
while (!changeStream.isExhausted()) {
if (changeStream.hasNext()) {
const newEvent = changeStream.next()
// perform ES logic
}
}
Resuming a change stream
With any technology, chances are they will fail at some point. Building fault-tolerant systems are part and parcel of a developer’s life. If a change stream terminates due to any reason, we can resume the stream from a certain point by using either the startAfter
and resumeAfter
option.
Every change event has an _id
which serves as the resume token. We can save this value in a quick access storage system like redis.
const options = {
startAfter: getResumeTokenFromRedis()
}
const changeStream = Person.watch(options)
while (!changeStream.isExhausted()) {
if (changeStream.hasNext()) {
const newEvent = changeStream.next()
// perform ES logic
saveResumeTokenToCache(newEvent._id)
}
}
Change streams allow us to implement an application built on the Change Data Capture (CDC) design pattern where data changes are tracked and events are performed based on the changes.