Skip to content

Commit

Permalink
refactor: optimize files sync for onedrive
Browse files Browse the repository at this point in the history
  • Loading branch information
amuwal committed Dec 18, 2024
1 parent 85f26df commit 78f8c71
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 36 deletions.
64 changes: 34 additions & 30 deletions packages/api/src/@core/sync/sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,15 +431,20 @@ export class CoreSyncService {
integrationId: provider,
linkedUserId: linkedUserId,
}),
];
if (provider == 'googledrive') {
tasks.push(() =>
() =>
this.registry.getService('filestorage', 'file').syncForLinkedUser({
integrationId: provider,
linkedUserId: linkedUserId,
}),
);
}
];
// if (provider == 'googledrive') {
// tasks.push(() =>
// this.registry.getService('filestorage', 'file').syncForLinkedUser({
// integrationId: provider,
// linkedUserId: linkedUserId,
// }),
// );
// }
for (const task of tasks) {
try {
await task();
Expand All @@ -456,30 +461,30 @@ export class CoreSyncService {
},
});

const folders = await this.prisma.fs_folders.findMany({
where: {
id_connection: connection.id_connection,
},
});
if (provider !== 'googledrive') {
const filesTasks = folders.map(
(folder) => async () =>
this.registry.getService('filestorage', 'file').syncForLinkedUser({
integrationId: provider,
linkedUserId: linkedUserId,
id_folder: folder.id_fs_folder,
}),
);

for (const task of filesTasks) {
try {
await task();
} catch (error) {
console.log(error);
this.logger.error(`File Task failed: ${error.message}`, error);
}
}
}
// const folders = await this.prisma.fs_folders.findMany({
// where: {
// id_connection: connection.id_connection,
// },
// });
// if (provider !== 'googledrive') {
// const filesTasks = folders.map(
// (folder) => async () =>
// this.registry.getService('filestorage', 'file').syncForLinkedUser({
// integrationId: provider,
// linkedUserId: linkedUserId,
// id_folder: folder.id_fs_folder,
// }),
// );

// for (const task of filesTasks) {
// try {
// await task();
// } catch (error) {
// console.log(error);
// this.logger.error(`File Task failed: ${error.message}`, error);
// }
// }
// }
}

async handleEcommerceSync(provider: string, linkedUserId: string) {
Expand Down Expand Up @@ -539,7 +544,6 @@ export class CoreSyncService {
}
}


// we must have a sync_jobs table with 7 (verticals) rows, one of each is syncing details
async getSyncStatus(vertical: string) {
try {
Expand Down
36 changes: 32 additions & 4 deletions packages/api/src/filestorage/file/services/onedrive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { OnedriveFileOutput } from './types';
export class OnedriveService implements IFileService {
private readonly MAX_RETRIES: number = 5;
private readonly INITIAL_BACKOFF_MS: number = 1000;
private readonly BATCH_SIZE: number = 20;

constructor(
private prisma: PrismaService,
Expand Down Expand Up @@ -51,16 +52,43 @@ export class OnedriveService implements IFileService {
if (folder && folder.remote_id !== 'root') {
foldersToSync.push(folder.remote_id);
}
} else {
const folders = await this.prisma.fs_folders.findMany({
where: {
id_connection: connection.id_connection,
},
select: {
remote_id: true,
},
});
foldersToSync.push(...folders.map((folder) => folder.remote_id));
}

const allFiles: OnedriveFileOutput[] = [];

for (const folderId of foldersToSync) {
const files = await this.syncFolder(connection, folderId);
allFiles.push(...files);
// Process folders in batches
for (let i = 0; i < foldersToSync.length; i += this.BATCH_SIZE) {
const batch = foldersToSync.slice(i, i + this.BATCH_SIZE);

// Synchronize all folders in the current batch concurrently
const filePromises = batch.map(async (folderId) => {
const files = await this.syncFolder(connection, folderId);
return files;
});

// Await all promises for the current batch
const batchFiles = await Promise.all(filePromises);
const flatBatchFiles = batchFiles.flat();
allFiles.push(...flatBatchFiles);

this.logger.log(
`Batch ${i / this.BATCH_SIZE + 1} completed: got ${
flatBatchFiles.length
} files.`,
);
}

this.logger.log(`Synced OneDrive files from root and specified folder!`);
this.logger.log(`Synced ${allFiles.length} files for onedrive.`);
return {
data: allFiles,
message: "OneDrive's files retrieved from root and specified folder",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ export class OnedriveService implements IFolderService {
const folders: OnedriveFolderOutput[] =
await this.iterativeGetOnedriveFolders('root', linkedUserId);

this.logger.log(`${folders.length} OneDrive folders found`);
this.logger.log('OneDrive folders synced successfully.');
this.logger.log(
`${folders.length} OneDrive folders synced successfully.`,
);

return {
data: folders,
Expand Down
6 changes: 6 additions & 0 deletions packages/api/swagger/swagger-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,12 @@ paths:
responses:
'200':
description: Pull frequency updated successfully
'201':
description: ''
content:
application/json:
schema:
type: object
tags: *ref_5
x-speakeasy-group: sync
get:
Expand Down

0 comments on commit 78f8c71

Please sign in to comment.