Skip to content

Commit

Permalink
fix: add retries to read ops
Browse files Browse the repository at this point in the history
  • Loading branch information
adrians5j committed Aug 16, 2024
1 parent d95d4e6 commit 307efff
Showing 1 changed file with 91 additions and 10 deletions.
101 changes: 91 additions & 10 deletions packages/migrations/src/migrations/5.39.6/001/ddb-es/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ const createInitialStatus = (): MigrationStatus => {
};
};

const BATCH_WRITE_MAX_CHUNK = 20;
let BATCH_WRITE_MAX_CHUNK = 20;
if (process.env.WEBINY_MIGRATION_5_39_6_001_BATCH_WRITE_MAX_CHUNK) {
BATCH_WRITE_MAX_CHUNK = parseInt(process.env.WEBINY_MIGRATION_5_39_6_001_BATCH_WRITE_MAX_CHUNK);
}

(async () => {
const logger = createPinoLogger(
Expand Down Expand Up @@ -194,12 +197,28 @@ const BATCH_WRITE_MAX_CHUNK = 20;
// Get the oldest revision's `createdOn` value. We use that to set the entry-level `createdOn` value.
const createdOn = await getOldestRevisionCreatedOn({
entry: item,
entryEntity: ddbEntryEntity
entryEntity: ddbEntryEntity,
retryOptions: {
onFailedAttempt: error => {
logger.warn(
{ error, item },
`getOldestRevisionCreatedOn attempt #${error.attemptNumber} failed: ${error.message}`
);
}
}
});

const firstLastPublishedOnByFields = await getFirstLastPublishedOnBy({
entry: item,
entryEntity: ddbEntryEntity
entryEntity: ddbEntryEntity,
retryOptions: {
onFailedAttempt: error => {
logger.warn(
{ error, item },
`getFirstLastPublishedOnBy attempt #${error.attemptNumber} failed: ${error.message}`
);
}
}
});

assignNewMetaFields(item, {
Expand All @@ -226,7 +245,15 @@ const BATCH_WRITE_MAX_CHUNK = 20;
try {
const fallbackIdentity = await getFallbackIdentity({
entity: ddbEntryEntity,
tenant: item.tenant
tenant: item.tenant,
retryOptions: {
onFailedAttempt: error => {
logger.warn(
{ error, item },
`getFallbackIdentity attempt #${error.attemptNumber} failed: ${error.message}`
);
}
}
});

ensureAllNonNullableValues(item, {
Expand Down Expand Up @@ -273,9 +300,20 @@ const BATCH_WRITE_MAX_CHUNK = 20;
/**
* Get all the records from DynamoDB Elasticsearch.
*/
const ddbEsRecords = await batchReadAll<DynamoDbElasticsearchRecord>({
table: ddbEsEntryEntity.table,
items: Object.values(ddbEsItemsToBatchRead)
const executeBatchReadAll = () => {
return batchReadAll<DynamoDbElasticsearchRecord>({
table: ddbEsEntryEntity.table,
items: Object.values(ddbEsItemsToBatchRead)
});
};

const ddbEsRecords = await executeWithRetry(executeBatchReadAll, {
onFailedAttempt: error => {
logger.warn(
{ error, items: Object.values(ddbEsItemsToBatchRead) },
`[DDB-ES Table] Batch (ddbEsItemsToBatchRead) read attempt #${error.attemptNumber} failed: ${error.message}`
);
}
});

for (const ddbEsRecord of ddbEsRecords) {
Expand All @@ -294,12 +332,34 @@ const BATCH_WRITE_MAX_CHUNK = 20;
// Get the oldest revision's `createdOn` value. We use that to set the entry-level `createdOn` value.
const createdOn = await getOldestRevisionCreatedOn({
entry: { ...decompressedData, PK: ddbEsRecord.PK },
entryEntity: ddbEntryEntity
entryEntity: ddbEntryEntity,
retryOptions: {
onFailedAttempt: error => {
logger.warn(
{
error,
item: { ...decompressedData, PK: ddbEsRecord.PK }
},
`[DDB-ES Table] getOldestRevisionCreatedOn attempt #${error.attemptNumber} failed: ${error.message}`
);
}
}
});

const firstLastPublishedOnByFields = await getFirstLastPublishedOnBy({
entry: { ...decompressedData, PK: ddbEsRecord.PK },
entryEntity: ddbEntryEntity
entryEntity: ddbEntryEntity,
retryOptions: {
onFailedAttempt: error => {
logger.warn(
{
error,
item: { ...decompressedData, PK: ddbEsRecord.PK }
},
`[DDB-ES Table] getFirstLastPublishedOnBy attempt #${error.attemptNumber} failed: ${error.message}`
);
}
}
});

assignNewMetaFields(decompressedData, {
Expand All @@ -321,7 +381,15 @@ const BATCH_WRITE_MAX_CHUNK = 20;
try {
const fallbackIdentity = await getFallbackIdentity({
entity: ddbEntryEntity,
tenant: decompressedData.tenant
tenant: decompressedData.tenant,
retryOptions: {
onFailedAttempt: error => {
logger.warn(
{ error, item: ddbEntryEntity },
`[DDB-ES Table] getFallbackIdentity attempt #${error.attemptNumber} failed: ${error.message}`
);
}
}
});

ensureAllNonNullableValues(decompressedData, {
Expand Down Expand Up @@ -482,6 +550,19 @@ const BATCH_WRITE_MAX_CHUNK = 20;

// Continue further scanning.
return true;
},
{
retry: {
onFailedAttempt: error => {
logger.warn(
{
lastEvaluatedKey: status.lastEvaluatedKey,
error
},
`ddbScanWithCallback attempt #${error.attemptNumber} failed: ${error.message}`
);
}
}
}
);

Expand Down

0 comments on commit 307efff

Please sign in to comment.