Replies: 2 comments 1 reply
-
I was using _id._data as resumeToken, but it shoud be the whole _id |
Beta Was this translation helpful? Give feedback.
1 reply
-
export const changeStreamListen = async <T extends any>(
model: Model<T>,
fn: (data: ChangeStreamData<T>) => void,
) => {
const name = model.collection.name;
const key = `resumetoken:${name}`;
const resumeToken = await RedisCache.get(key);
const clusterTime = await RedisCache.get(`clustertime:${name}`);
const getStartAtOperationTime = () => {
if (clusterTime) {
return Timestamp.fromString(clusterTime.$timestamp);
}
return null;
};
const getResumeOptions = () => {
if (resumeToken) {
return {
resumeAfter: resumeToken,
// startAfter: resumeToken,
// startAtOperationTime,
};
}
const startAtOperationTime = getStartAtOperationTime();
if (startAtOperationTime) {
return {
startAtOperationTime,
};
}
return {};
};
const stream = model.watch([], {
fullDocument: 'updateLookup',
// fullDocumentBeforeChange: 'whenAvailable',
// fullDocumentBeforeChange: 'required',
// batchSize: 100,
...getResumeOptions(),
});
stream.on('change', changeStreamMiddleware(name, fn)).on('error', (err) => {
// eslint-disable-next-line
console.log('change error:', err);
Sentry.setExtra('error', err);
Sentry.captureException(err);
});
}; |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
We are trying to make our change stream more resilient in production
Imagine that we have 1 pod that watch streams of a few collections
what if the pod crashes?
can we resume from where we stopped ?
We are trying to implement this using resumeToken
the problem is that we do not receive the events (insert, updates), that we missed until our pod was not working
what is the recommendation for this?
Beta Was this translation helpful? Give feedback.
All reactions