
Change Streams

FILE  41_change_streams
TOPIC  watch() · Event Types · Resume Tokens · Pipeline Filtering · fullDocument · Use Cases
LEVEL  Intermediate/Advanced
01
Overview & Requirements
Real-time change notification from the oplog
overview

Change Streams (v3.6+) provide a real-time stream of database changes by tailing the oplog internally. Unlike direct oplog access, Change Streams are a stable, documented API that abstracts the internal format, handles failovers automatically, and supports filtering with aggregation pipeline stages.

Requirement | Details
MongoDB version | 3.6+ for collection-level; 4.0+ for database and client level
Deployment | Replica set or sharded cluster (requires the oplog). Standalone is NOT supported
Storage engine | WiredTiger only
Read concern | Majority read concern must be available (default on replica sets)
Permissions | changeStream privilege + read access to the collection

Use Cases

Use Case | Pattern
Real-time notifications | Watch orders collection → push to user when order status changes
Cache invalidation | Watch product catalog → evict/update Redis cache on any change
Audit logging | Watch sensitive collections → append to audit trail in separate DB
Data replication | Watch source collection → replicate inserts/updates to another system (Elasticsearch, Kafka)
Denormalization sync | Watch users collection → update embedded user data in posts collection
Event-driven microservices | Watch collection → trigger downstream service actions
NOTE
Change Streams are not a message queue. If your consumer is down and misses events, those events are gone unless they are still within the oplog window. For guaranteed delivery, use a resume token to re-open the stream from the last processed event — or pair with a message broker (Kafka, RabbitMQ) for durable delivery.
02
watch() API
Collection, database, and client-level streams
watch
// ── Collection-level watch (most common) ──────────────────────────────────
const changeStream = db.collection("orders").watch()

// Iterate with async iteration (Node.js >= 10)
for await (const change of changeStream) {
  console.log("Change received:", change.operationType)
  console.log("Document:", change.fullDocument)
}

// Iterate with next() — pull-based
while (true) {
  const next = await changeStream.next()
  await processChange(next)
}

// ── Database-level watch (v4.0+) — all collections in the DB ──────────────
const dbStream = db.watch()
for await (const change of dbStream) {
  console.log(change.ns.coll)     // which collection changed
  console.log(change.operationType)
}

// ── Client-level watch (v4.0+) — all databases and collections ──────────────
const clientStream = client.watch()

// ── watch() options ──────────────────────────────────────────────────────
const stream = db.collection("orders").watch(
  [],           // pipeline stages — empty = all events
  {
    fullDocument:             "updateLookup",    // fetch full doc on update events
    fullDocumentBeforeChange: "whenAvailable",   // pre-change snapshot (v6.0+)
    resumeAfter:              resumeToken,       // resume from a prior position
    startAfter:               startToken,        // like resumeAfter but works after invalidate
    startAtOperationTime:     opTime,            // start from a specific server timestamp
    maxAwaitTimeMS:           5000,              // how long to wait for next event
    batchSize:                100,              // documents per server round-trip
  }
)

// Close the stream when done:
await changeStream.close()
TIP
Always wrap changeStream.close() in a finally block or a process signal handler (process.on('SIGINT', ...)) to release the cursor on shutdown. Unclosed change streams hold a server-side cursor and consume connection pool resources.
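The tip above can be wired up as a small reusable wrapper. A minimal sketch (the `runStream`, `watchFn`, and `handler` names are illustrative, not driver API); it assumes the Node.js driver's ChangeStream, which supports async iteration and `close()`:

```javascript
// Run a change stream with guaranteed cursor cleanup on error or signal.
// watchFn: () => ChangeStream   handler: async (change) => void
async function runStream(watchFn, handler) {
  const stream = watchFn()
  const shutdown = () => { stream.close().finally(() => process.exit(0)) }
  process.on("SIGINT", shutdown)    // Ctrl-C
  process.on("SIGTERM", shutdown)   // container stop
  try {
    for await (const change of stream) {
      await handler(change)
    }
  } finally {
    process.removeListener("SIGINT", shutdown)
    process.removeListener("SIGTERM", shutdown)
    await stream.close()            // always release the server-side cursor
  }
}
```

Because cleanup lives in `finally`, the cursor is released whether the loop ends normally, the handler throws, or the process receives a signal.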
03
Change Event Document
Structure of every event returned by the stream
event
// Anatomy of a change event document
{
  _id: {                           // resume token — also the event's unique ID
    _data: "826523..."             // hex-encoded resume token string
  },
  operationType: "update",         // "insert"|"update"|"replace"|"delete"|
                                   // "drop"|"rename"|"dropDatabase"|"invalidate"
  clusterTime: Timestamp(...),     // server-side timestamp of the operation
  wallTime:    ISODate(...),       // wall clock time (v6.0+)
  ns: {                            // namespace
    db:   "shopDb",
    coll: "orders"
  },
  documentKey: { _id: ObjectId("...") },  // _id of the affected document

  // ── INSERT events ─────────────────────────────────────────────────────────
  fullDocument: {                  // the complete inserted document
    _id: ObjectId("..."),
    customerId: "C1",
    total: 99.99,
    status: "pending"
  },

  // ── UPDATE events (field-level operators: $set, $inc, etc.) ───────────────
  updateDescription: {
    updatedFields:   { "status": "confirmed", "updatedAt": ISODate("...") },
    removedFields:   [],           // fields removed via $unset
    truncatedArrays: []            // [{ field, newSize }] entries when an array was shortened
  },
  // fullDocument is null by default for updates — use fullDocument:"updateLookup"

  // ── REPLACE events (replaceOne — full document replacement) ───────────────
  fullDocument: { /* complete new document */ },
  // updateDescription is absent for replace

  // ── DELETE events ─────────────────────────────────────────────────────────
  // fullDocument is null (document is gone)
  // documentKey._id is all you have

  // ── DROP / RENAME events ──────────────────────────────────────────────────
  // to: { db: "...", coll: "..." }  — for rename events only
  // After a drop, the stream receives "invalidate" then closes itself
}
04
Event Types
All operationType values and when they fire
types
operationType | Trigger | fullDocument? | updateDescription?
insert | insertOne / insertMany | Yes — the new doc | No
update | updateOne / updateMany with field operators | Null by default; use updateLookup | Yes — changed fields
replace | replaceOne — full document replacement | Yes — the new doc | No
delete | deleteOne / deleteMany | Null (doc is gone) | No
drop | Collection dropped | No | No
rename | Collection renamed | No | No
dropDatabase | Database dropped (db/client-level streams) | No | No
invalidate | Collection/DB dropped/renamed — stream becomes invalid | No | No
// Handle specific event types
for await (const change of changeStream) {
  switch (change.operationType) {
    case "insert":
      await notifyNewOrder(change.fullDocument)
      break

    case "update":
      if (change.updateDescription.updatedFields.status === "shipped") {
        await sendShippingNotification(change.documentKey._id)
      }
      break

    case "replace":
      // Full document replacement — treat as complete refresh
      await syncToCache(change.fullDocument)
      break

    case "delete":
      await removeFromCache(change.documentKey._id)
      break

    case "invalidate":
      // Stream is now closed — collection was dropped or renamed
      console.warn("Change stream invalidated — collection modified")
      break

    default:
      console.log("DDL event:", change.operationType)
  }
}
WARN
An invalidate event closes the stream automatically. After receiving it, calling next() will throw. Wrap your stream loop in a try/catch and re-open when needed. You cannot use resumeAfter after an invalidate — use startAfter (v4.2+) instead, which specifically supports resuming after an invalidate token.
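The recovery loop described above can be sketched as follows. `openStream` is a hypothetical factory wrapping `collection.watch([], opts)`; the key move is capturing the invalidate event's `_id` and passing it back as `startAfter`:

```javascript
// Survive collection drop/rename: when an "invalidate" arrives, capture its
// token and reopen with startAfter (resumeAfter would be rejected here).
async function watchWithInvalidateRecovery(openStream, handler) {
  let startToken = null
  while (true) {
    const stream = openStream(startToken ? { startAfter: startToken } : {})
    let invalidated = false
    try {
      for await (const change of stream) {
        if (change.operationType === "invalidate") {
          startToken = change._id   // only startAfter accepts this token
          invalidated = true
          break                     // stream is already closing itself
        }
        await handler(change)
      }
    } finally {
      await stream.close()
    }
    if (!invalidated) return        // stream ended for some other reason
  }
}
```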
05
Pipeline Filtering
Server-side filtering — only receive events you care about
filtering

The first argument to watch() is an aggregation pipeline applied to change events server-side before being returned to the client. Only $match, $project, $addFields, $replaceRoot, $replaceWith, $redact, and $set/$unset are allowed.

// Only receive insert and update events (skip deletes and DDL)
const stream = db.collection("orders").watch([
  { $match: { operationType: { $in: ["insert", "update"] } } }
])

// Only respond to status field changes specifically
const stream = db.collection("orders").watch([
  {
    $match: {
      operationType: "update",
      "updateDescription.updatedFields.status": { $exists: true }
    }
  }
])

// Filter by field value — requires fullDocument:"updateLookup" for update events
const stream = db.collection("orders").watch(
  [
    { $match: {
        operationType: { $in: ["insert", "update"] },
        "fullDocument.total": { $gte: 1000 }   // high-value orders only
    }}
  ],
  { fullDocument: "updateLookup" }
)

// Project to reduce payload — omit sensitive fields
const stream = db.collection("users").watch([
  { $match: { operationType: { $in: ["insert", "update"] } } },
  { $project: {
    _id:                    1,   // keep: the resume token; removing it breaks resumability
    operationType:          1,
    "documentKey._id":      1,
    "fullDocument.email":   1,
    "fullDocument.status":  1
    // passwordHash is excluded implicitly: an inclusion projection drops
    // every unlisted field (mixing 1 and 0, except on _id, is an error)
  }}
])

// Database-level stream: filter to specific collections
const dbStream = db.watch([
  { $match: { "ns.coll": { $in: ["orders", "payments"] } } }
])
NOTE
Filtering with a $match stage happens on the server — events that don't match are never sent to the client. This reduces network traffic and client processing cost, especially on high-write collections. Prefer server-side filtering over client-side filtering whenever possible.
06
Resume Tokens
Fault-tolerant streams — recover from failures without losing events
resilience

Every change event carries a resume token in its _id field. Persist this token after processing each event. If the stream dies (network error, restart), reopen it with resumeAfter to continue exactly from where you left off.

// Persist and resume pattern
let resumeToken = await loadResumeToken()   // load from persistent storage

async function openStream() {
  const opts = resumeToken
    ? { resumeAfter: resumeToken, fullDocument: "updateLookup" }
    : { fullDocument: "updateLookup" }
  return db.collection("orders").watch([], opts)
}

async function run() {
  while (true) {
    let stream
    try {
      stream = await openStream()
      for await (const change of stream) {
        await processChange(change)      // your handler
        resumeToken = change._id         // save token AFTER successful processing
        await saveResumeToken(resumeToken)  // persist to DB/Redis/file
      }
    } catch (err) {
      if (err.name === "MongoNetworkError"
          || err.hasErrorLabel?.("ResumableChangeStreamError")) {
        console.warn("Stream error — retrying:", err.message)
        await new Promise(r => setTimeout(r, 1000))
        continue   // re-open using saved resumeToken
      }
      throw err    // unexpected error — escalate
    } finally {
      await stream?.close()
    }
  }
}

// startAtOperationTime — start from a cluster timestamp (no prior token needed)
// Requires a BSON Timestamp (seconds + increment), not a JS Date:
// const { Timestamp } = require("mongodb")
const opTime = new Timestamp({ t: Math.floor(Date.parse("2024-03-01T00:00:00Z") / 1000), i: 0 })
const stream = db.collection("orders").watch([], {
  startAtOperationTime: opTime
})
// Timestamp must be within the current oplog window

// startAfter — like resumeAfter but works after an "invalidate" event (v4.2+)
const stream = db.collection("orders").watch([], {
  startAfter: lastTokenBeforeInvalidate
})
TIP
Save the resume token after successfully processing each event, not before. If the process crashes between receiving the event and saving the token, you will re-process that event on restart (at-least-once delivery) — this is safer than at-most-once. Design your handlers to be idempotent to handle duplicate delivery gracefully.
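One way to make a handler idempotent is to key every write on `documentKey._id`, so replaying an event rewrites the same state. A minimal sketch using an in-memory Map as a stand-in for a real cache or store:

```javascript
// Idempotent cache handler: applying the same change event twice leaves the
// cache in the same state, so at-least-once redelivery after a resume is safe.
function applyChange(cache, change) {
  const id = String(change.documentKey._id)
  switch (change.operationType) {
    case "insert":
    case "replace":
    case "update":
      // Overwrite by key: replaying the event rewrites the identical value.
      cache.set(id, change.fullDocument)
      break
    case "delete":
      // delete() on a missing key is a no-op, so replays are harmless.
      cache.delete(id)
      break
  }
}
```

The same principle applies to external stores: prefer upserts keyed by `_id` over blind inserts, and deletes that tolerate missing keys.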
07
fullDocument & Practical Patterns
Post-image, pre-image, and production use cases
patterns

fullDocument Options

Option | Behavior | Notes
"default" | Insert/replace: full doc included. Update: fullDocument is null | No extra read
"updateLookup" | For update events: performs a lookup to include the current document state | May not be exact post-image (race condition possible)
"whenAvailable" | Include full doc when available, null otherwise (v6.0+) | Needs pre/post images enabled
"required" | Include full doc or throw (v6.0+) | Needs pre/post images enabled

Pre-image with fullDocumentBeforeChange (v6.0+)

// Enable pre/post images on the collection
db.runCommand({
  collMod: "orders",
  changeStreamPreAndPostImages: { enabled: true }
})

// Open stream with pre-image support
const stream = db.collection("orders").watch([], {
  fullDocument:             "required",
  fullDocumentBeforeChange: "required"
})

for await (const change of stream) {
  if (change.operationType === "update") {
    console.log("Before:", change.fullDocumentBeforeChange)  // old state
    console.log("After:",  change.fullDocument)              // new state
    // Use for: granular auditing, undo operations, diff computation
  }
}
// Pre-images stored in config.system.preimages (auto-expires per retention setting)
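Retention for those stored pre-images can be capped explicitly on v6.0+ via the `changeStreamOptions` cluster parameter. A sketch (run against the admin database; 3600 seconds is an arbitrary example value):

```javascript
// Expire pre-images 1 hour after they are written (v6.0+, admin database)
db.adminCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 3600 }
    }
  }
})
```

Without this parameter, pre-images are retained until their corresponding oplog entries roll off.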

Production Patterns

// ── Cache invalidation ────────────────────────────────────────────────────
const stream = db.collection("products").watch([
  { $match: { operationType: { $in: ["insert","update","replace","delete"] } } }
], { fullDocument: "updateLookup" })

for await (const change of stream) {
  const id = change.documentKey._id.toString()
  if (change.operationType === "delete") {
    await redis.del(`product:${id}`)
  } else {
    await redis.set(`product:${id}`, JSON.stringify(change.fullDocument), "EX", 3600)
  }
  resumeToken = change._id
  await saveToken(resumeToken)
}

// ── Denormalization sync ──────────────────────────────────────────────────
// When a user's name changes, update all their embedded posts
const userStream = db.collection("users").watch([
  {
    $match: {
      operationType: "update",
      "updateDescription.updatedFields.name": { $exists: true }
    }
  }
], { fullDocument: "updateLookup" })

for await (const change of userStream) {
  await db.collection("posts").updateMany(
    { "author._id": change.documentKey._id },
    { $set: { "author.name": change.fullDocument.name } }
  )
}

// ── Event-driven microservice bridge ──────────────────────────────────────
// Publish MongoDB changes to Kafka topic for downstream services
for await (const change of changeStream) {
  await kafkaProducer.send({
    topic: `mongo.${change.ns.coll}.${change.operationType}`,
    messages: [{ key: change.documentKey._id.toString(), value: JSON.stringify(change) }]
  })
  await saveResumeToken(change._id)
}
WARN
The "updateLookup" option performs a separate read after the event arrives. If the document was modified again between the event and the lookup, you get the latest state, not the exact post-image of that specific update. For exact post-images use MongoDB 6.0+ with fullDocument: "required" and changeStreamPreAndPostImages enabled on the collection.