Skip to content

Commit

Permalink
feat: onedrive: incremental sync for files
Browse files Browse the repository at this point in the history
  • Loading branch information
amuwal committed Dec 20, 2024
1 parent 019ce08 commit 3fa218d
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 36 deletions.
2 changes: 2 additions & 0 deletions packages/api/src/filestorage/file/file.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { SharepointFileMapper } from './services/sharepoint/mappers';
import { SyncService } from './sync/sync.service';
import { GoogleDriveQueueProcessor } from './services/googledrive/processor';
import { FolderModule } from '../folder/folder.module';
import { OnedriveQueueProcessor } from './services/onedrive/processor';

@Module({
imports: [FolderModule],
Expand All @@ -43,6 +44,7 @@ import { FolderModule } from '../folder/folder.module';
DropboxFileMapper,
GoogleDriveService,
GoogleDriveQueueProcessor,
OnedriveQueueProcessor,
],
exports: [SyncService, ServiceRegistry],
})
Expand Down
237 changes: 201 additions & 36 deletions packages/api/src/filestorage/file/services/onedrive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import axios, { AxiosRequestConfig, AxiosResponse } from 'axios';
import { ServiceRegistry } from '../registry.service';
import { OnedriveFileOutput } from './types';
import { OnedriveService as OnedriveFolderService } from '@filestorage/folder/services/onedrive';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';
import { Connection } from '@@core/connections/@utils/types';
import { UnifiedFilestorageFileOutput } from '@filestorage/file/types/model.unified';
import { BullQueueService } from '@@core/@core-services/queues/shared.service';
@Injectable()
export class OnedriveService implements IFileService {
private readonly MAX_RETRIES: number = 6;
Expand All @@ -21,7 +25,9 @@ export class OnedriveService implements IFileService {
private logger: LoggerService,
private cryptoService: EncryptionService,
private registry: ServiceRegistry,
private ingestService: IngestDataService,
private onedriveFolderService: OnedriveFolderService,
private bullQueueService: BullQueueService,
) {
this.logger.setContext(
`${FileStorageObject.file.toUpperCase()}:${OnedriveService.name}`,
Expand All @@ -31,9 +37,13 @@ export class OnedriveService implements IFileService {

// todo: add addFile method

async sync(data: SyncParam): Promise<ApiResponse<OnedriveFileOutput[]>> {
async sync(
data: SyncParam,
deltaLink?: string,
): Promise<ApiResponse<OnedriveFileOutput[]>> {
try {
const { linkedUserId, id_folder } = data;
const { linkedUserId, custom_field_mappings, ingestParams, id_folder } =
data;

const connection = await this.prisma.connections.findFirst({
where: {
Expand All @@ -43,57 +53,103 @@ export class OnedriveService implements IFileService {
},
});

const foldersToSync = ['root'];
// if id_folder is provided, sync only the files in the specified folder
if (id_folder) {
const folder = await this.prisma.fs_folders.findUnique({
where: {
id_fs_folder: id_folder as string,
},
});
if (folder && folder.remote_id !== 'root') {
foldersToSync.push(folder.remote_id);
}
} else {
const folders = await this.prisma.fs_folders.findMany({
where: {
id_connection: connection.id_connection,
remote_was_deleted: false,
},
select: {
remote_id: true,
},
});
foldersToSync.push(...folders.map((folder) => folder.remote_id));
if (folder) {
const files = await this.syncFolder(connection, folder.remote_id);
return {
data: files,
message: 'OneDrive files retrieved from specified folder',
statusCode: 200,
};
}
}

const allFiles: OnedriveFileOutput[] = [];

// Process folders in batches
for (let i = 0; i < foldersToSync.length; i += this.BATCH_SIZE) {
const batch = foldersToSync.slice(i, i + this.BATCH_SIZE);
// if deltaLink is provided
if (deltaLink) {
this.logger.log(
`Syncing OneDrive files from deltaLink: ${deltaLink}`,
'onedrive files sync',
);

// Synchronize all folders in the current batch concurrently
const filePromises = batch.map(async (folderId) => {
const files = await this.syncFolder(connection, folderId);
return files;
});
let files: OnedriveFileOutput[] = [];
let nextDeltaLink: string | null = null;
try {
const { files: batchFiles, nextDeltaLink: batchNextDeltaLink } =
await this.getFilesToSync(connection, deltaLink, 10);

files = batchFiles;
nextDeltaLink = batchNextDeltaLink;
} catch (error: any) {
if (error.response?.status === 410) {
// Delta token expired, start fresh sync
const newDeltaLink = `${connection.account_url}/v1.0/me/drive/root/delta?$top=1000`;
return this.sync(data, newDeltaLink);
}
await this.bullQueueService
.getSyncJobsQueue()
.add('fs_file_onedrive', {
...data,
deltaLink: deltaLink,
connectionId: connection.id_connection,
});

this.logger.error(
`Got 410 error while syncing OneDrive files. Added sync from /delta endpoint to queue to retry.`,
error,
);
}

// Await all promises for the current batch
const batchFiles = await Promise.all(filePromises);
const flatBatchFiles = batchFiles.flat();
allFiles.push(...flatBatchFiles);
if (files.length > 0) {
const ingestedFiles = await this.ingestFiles(
files,
connection,
custom_field_mappings,
ingestParams,
);
this.logger.log(
`Ingested ${ingestedFiles.length} files from OneDrive.`,
'onedrive files ingestion',
);
}

this.logger.log(
`Batch ${i / this.BATCH_SIZE + 1} completed: got ${
flatBatchFiles.length
} files.`,
);
// more files to sync
if (nextDeltaLink) {
await this.bullQueueService
.getThirdPartyDataIngestionQueue()
.add('fs_file_onedrive', {
...data,
deltaLink: nextDeltaLink,
connectionId: connection.id_connection,
});
} else {
this.logger.log(
`No more files to sync from OneDrive.`,
'onedrive files sync',
);
}
} else {
const lastSyncTime = await this.getLastSyncTime(connection);
const deltaLink = lastSyncTime
? `${
connection.account_url
}/v1.0/me/drive/root/delta?$top=1000&token=${lastSyncTime.toISOString()}`
: `${connection.account_url}/v1.0/me/drive/root/delta?$top=1000`; // if no last sync time, get all files

await this.sync(data, deltaLink);
}

this.logger.log(`Synced ${allFiles.length} files for onedrive.`);
return {
data: allFiles,
message: "OneDrive's files retrieved from root and specified folder",
data: [],
message: 'OneDrive files retrieved',
statusCode: 200,
};
} catch (error) {
Expand All @@ -105,6 +161,115 @@ export class OnedriveService implements IFileService {
throw error;
}
}

private async getFilesToSync(
connection: Connection,
deltaLink: string,
batchSize: number, // number of times to call the API
) {
const files: OnedriveFileOutput[] = [];
let nextDeltaLink: string | null = deltaLink;

for (let i = 0; i < batchSize; i++) {
const resp = await this.makeRequestWithRetry({
timeout: 30000,
method: 'get',
url: deltaLink,
headers: {
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`,
},
});

const batchFiles = resp.data.value?.filter((elem: any) => !elem.folder);
files.push(...batchFiles);
nextDeltaLink = resp.data['@odata.nextLink'];

if (!resp.data.value?.length) {
nextDeltaLink = null;
break;
}
}

return { files, nextDeltaLink };
}

async processBatch(job: any) {
const {
linkedUserId,
deltaLink,
connectionId,
custom_field_mappings,
ingestParams,
} = job.data;

// Call the sync method with the pageToken and other job data
await this.sync(
{
linkedUserId,
custom_field_mappings,
ingestParams,
},
deltaLink,
);
}

private async ingestFiles(
files: OnedriveFileOutput[],
connection: Connection,
customFieldMappings?: {
slug: string;
remote_id: string;
}[],
extraParams?: { [key: string]: any },
) {
// Sort files by lastModifiedDateTime in descending order (newest first)
const sortedFiles = [...files].sort((a, b) => {
const dateA = new Date(a.lastModifiedDateTime).getTime();
const dateB = new Date(b.lastModifiedDateTime).getTime();
return dateB - dateA;
});

// Deduplicate files by remote_id, keeping only the first occurrence (which will be the latest version)
const uniqueFiles = sortedFiles.reduce((acc, file) => {
if (!acc.has(file.id)) {
acc.set(file.id, file);
}
return acc;
}, new Map<string, OnedriveFileOutput>());

this.logger.log(
`Deduplicating ${files.length} delta files to ${uniqueFiles.size} unique files`,
'onedrive files ingestion',
);

return this.ingestService.ingestData<
UnifiedFilestorageFileOutput,
OnedriveFileOutput
>(
Array.from(uniqueFiles.values()),
'onedrive',
connection.id_connection,
'filestorage',
'file',
customFieldMappings,
extraParams,
);
}

private async getLastSyncTime(connection: any) {
const lastSyncTime = await this.prisma.fs_files.findFirst({
where: {
id_connection: connection.id_connection,
},
orderBy: {
remote_modified_at: 'desc',
},
});
return lastSyncTime?.remote_modified_at;
}

private async syncFolder(
connection: any,
folderId: string,
Expand Down
21 changes: 21 additions & 0 deletions packages/api/src/filestorage/file/services/onedrive/processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { Process, Processor } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Job } from 'bull';
import { Queues } from '@@core/@core-services/queues/types';
import { OnedriveService } from '.';

@Injectable()
@Processor(Queues.THIRD_PARTY_DATA_INGESTION)
export class OnedriveQueueProcessor {
constructor(private readonly onedriveService: OnedriveService) {}

@Process({ name: 'fs_file_onedrive', concurrency: 1 })
async handleOneDriveSync(job: Job): Promise<void> {
try {
await this.onedriveService.processBatch(job);
} catch (error) {
console.error(`Failed to process OneDrive sync job: ${error.message}`);
throw error;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,11 @@ export class OnedriveService implements IFolderService {
continue;
}

// handle 410 gone errors
if (error.response?.status === 410 && config.url.includes('delta')) {
// todo: handle 410 gone errors
}

throw error;
}
}
Expand Down

0 comments on commit 3fa218d

Please sign in to comment.