# Merge pull request juju#18323 from manadart/3.5-changestream-size

The `txnwatcher` is a foundational watcher in Juju that reads the Mongo change stream and emits all changes. It is driven by the Mongo `aggregate` command.
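
In outline, it issues something like the following (a hedged sketch using the `mgo` driver, not the exact juju code; the database name and the `$changeStream`/`$match` options are illustrative, while the `cursor` and `readConcern` values match the pipeline in the diff at the bottom of this page):

```go
// Sketch: open a change stream via the raw aggregate command and read
// the first batch of raw change documents.
cmd := bson.D{
	{"aggregate", 1}, // database-level aggregation, no single collection
	{"pipeline", []bson.D{
		{{"$changeStream", bson.M{"allChangesForCluster": true}}},
		{{"$match", bson.M{"ns.coll": bson.M{"$ne": "system.indexes"}}}},
	}},
	{"cursor", bson.D{{"batchSize", 10}}},
	{"readConcern", bson.D{{"level", "majority"}}},
}

var resp struct {
	Cursor struct {
		FirstBatch []bson.Raw `bson:"firstBatch"`
		ID         int64      `bson:"id"`
	} `bson:"cursor"`
}
// session is an *mgo.Session; each bson.Raw in FirstBatch is what
// the watcher's process() later unmarshals.
err := session.DB("admin").Run(cmd, &resp)
if err != nil {
	// handle/return
}
```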

The linked bug describes a situation where the size of the documents in a single batch processed by this command causes the aggregated return document to exceed Mongo's maximum document size (16MB). This is probably due to something like action results, which can be very large.

To date we have been pulling entire documents through the pipeline, only to deserialise them into a struct holding just the transaction revision number.
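
For context, a minimal sketch of the kind of struct the raw change is unmarshalled into (field names inferred from the projection added below; not the exact juju type):

```go
// Of the whole change event, the watcher ultimately needs little more
// than the resume token and the changed document's txn-revno.
type changeStreamDocument struct {
	Id           bson.Raw `bson:"_id"` // resume token
	FullDocument struct {
		TxnRevno int64 `bson:"txn-revno"`
	} `bson:"fullDocument"`
}
```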

Here we add a `$project` stage to the `aggregate` command, which limits the fields pulled from the documents.
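
`$project` here is an inclusion projection: only the listed fields (plus the event's `_id`, which Mongo keeps by default) survive the stage. Illustratively, for an update event (all values invented):

```go
// Before the $project stage, fullDocument carries the entire changed
// document, e.g. an action with a large results payload:
before := bson.M{
	"_id":           bson.M{"_data": "..."}, // resume token
	"operationType": "update",
	"ns":            bson.M{"db": "juju", "coll": "actions"},
	"documentKey":   bson.M{"_id": "action-42"},
	"fullDocument": bson.M{
		"_id":       "action-42",
		"txn-revno": 7,
		"results":   "<potentially megabytes of output>",
	},
}

// After the stage, fullDocument is trimmed to the projected fields, so
// the large payload never leaves Mongo:
after := bson.M{
	"_id":           bson.M{"_data": "..."},
	"operationType": "update",
	"ns":            bson.M{"db": "juju", "coll": "actions"},
	"documentKey":   bson.M{"_id": "action-42"},
	"fullDocument":  bson.M{"_id": "action-42", "txn-revno": 7},
}
```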

I verified the efficacy of the change by applying this patch and comparing measurements before and after the change:
```diff
diff --git a/state/watcher/txnwatcher.go b/state/watcher/txnwatcher.go
index 275caecaeb..b138704844 100644
--- a/state/watcher/txnwatcher.go
+++ b/state/watcher/txnwatcher.go
@@ -4,15 +4,13 @@
 package watcher
 
 import (
-	"time"
-
 	"github.com/juju/errors"
+	"github.com/juju/juju/wrench"
 	"github.com/juju/mgo/v3"
 	"github.com/juju/mgo/v3/bson"
 	"github.com/juju/worker/v3"
 	"gopkg.in/tomb.v2"
-
-	"github.com/juju/juju/wrench"
+	"time"
 )
 
 // Hub represents a pubsub hub. The TxnWatcher only ever publishes
@@ -423,10 +421,13 @@ func (w *TxnWatcher) init() (bool, error) {
 }
 
 func (w *TxnWatcher) process(changes []bson.Raw) (bool, error) {
+	var size uintptr
 	added := false
 
 	for i := len(changes) - 1; i >= 0; i-- {
 		changeRaw := changes[i]
+		size += uintptr(len(changeRaw.Data))
+
 		change := changeStreamDocument{}
 		err := bson.Unmarshal(changeRaw.Data, &change)
 		if err != nil {
@@ -495,6 +496,7 @@ func (w *TxnWatcher) process(changes []bson.Raw) (bool, error) {
 		w.resumeToken = change.Id
 	}
 
+	w.logger.Criticalf("*** size of changes: %d", size)
 	return added, nil
 }
```
I then added a model and deployed a unit of ubuntu, accumulated the logs, and used sed/awk to calculate the average size of processed changes for non-zero results.
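
Something along these lines (a sketch, not the exact invocation actually used; the log file path and match pattern depend on the environment):

```sh
awk -F': ' '/size of changes/ && $NF > 0 {sum += $NF; n++} END {if (n) print sum / n}' machine-0.log
```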

**Before:** 1660.7 bytes
**After:** 875.107 bytes

This means roughly half the data pulled from Mongo and processed in memory (1660.7 / 875.107 ≈ 1.9), and that is for the most trivial example. If we factor in large settings documents and action results, the difference could be many times greater.

Shout out to @jameinel for the rubber ducky on this one.

## QA steps

Just bootstrap, add a model, and deploy a unit. Normal functionality indicates the watchers are firing as expected. See the example commands below.
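
For example (cloud, controller and model names are illustrative):

```sh
juju bootstrap lxd test
juju add-model demo
juju deploy ubuntu
juju status
```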

## Documentation changes

None.

## Links

**Launchpad bug:** https://bugs.launchpad.net/juju/+bug/2085840

**Jira card:** [JUJU-7081](https://warthogs.atlassian.net/browse/JUJU-7081)

jujubot authored Nov 11, 2024 · 2 parents 5043424 + e66900c · commit 8452534

**state/watcher/txnwatcher.go** (9 additions, 0 deletions):
```diff
@@ -402,6 +402,15 @@ func (w *TxnWatcher) init() (bool, error) {
 		{"pipeline", []bson.D{
 			{{"$changeStream", cs}},
 			{{"$match", match}},
+			{{"$project", bson.M{
+				"operationType":          1,
+				"documentKey":            1,
+				"ns":                     1,
+				"updateDescription":      1,
+				"txnNumber":              1,
+				"fullDocument._id":       1,
+				"fullDocument.txn-revno": 1,
+			}}},
 		}},
 		{"cursor", bson.D{{"batchSize", 10}}},
 		{"readConcern", bson.D{{"level", "majority"}}},
```
