Skip to content

Commit

Permalink
refactor: googledrive: use changes api for incremental syncs
Browse files Browse the repository at this point in the history
  • Loading branch information
amuwal committed Dec 22, 2024
1 parent 3ddba56 commit 848e7ad
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 154 deletions.
1 change: 1 addition & 0 deletions packages/api/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ model fs_drives {
name String?
remote_created_at DateTime? @db.Timestamptz(6)
remote_id String?
remote_cursor String?
created_at DateTime @db.Timestamptz(6)
modified_at DateTime @db.Timestamptz(6)
id_connection String @db.Uuid
Expand Down
1 change: 1 addition & 0 deletions packages/api/scripts/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ CREATE TABLE fs_drives
name text NULL,
remote_created_at timestamp with time zone NULL,
remote_id text NULL,
remote_cursor text NULL,
created_at timestamp with time zone NOT NULL,
modified_at timestamp with time zone NOT NULL,
id_connection uuid NOT NULL,
Expand Down
10 changes: 10 additions & 0 deletions packages/api/src/filestorage/drive/types/model.unified.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,14 @@ export class UnifiedFilestorageDriveOutput extends UnifiedFilestorageDriveInput
})
@IsOptional()
modified_at?: Date;

@ApiPropertyOptional({
type: String,
nullable: true,
example: 'next_page_token_123',
description: 'The cursor used for pagination with the 3rd party provider',
})
@IsString()
@IsOptional()
remote_cursor?: string;
}
183 changes: 124 additions & 59 deletions packages/api/src/filestorage/file/services/googledrive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class GoogleDriveService implements IFileService {
}, []);

if (allPermissions.length === 0) {
this.logger.warn('No permissions found in the provided files.');
this.logger.log('No permissions found in the provided files.');
return this.ingestService.ingestData<
UnifiedFilestorageFileOutput,
GoogleDriveFileOutput
Expand Down Expand Up @@ -136,6 +136,11 @@ export class GoogleDriveService implements IFileService {
return syncedFiles;
}

/**
* Syncs files from Google Drive to the local database
* @param data - Parameters required for syncing
* @param pageToken - Used for continuation of initial sync
*/
async sync(data: SyncParam, pageToken?: string) {
const { linkedUserId, custom_field_mappings, ingestParams } = data;
const connection = await this.prisma.connections.findFirst({
Expand All @@ -154,64 +159,65 @@ export class GoogleDriveService implements IFileService {
});
const drive = google.drive({ version: 'v3', auth });

const rootDriveId = await drive.files
.get({
fileId: 'root',
fields: 'id',
})
.then((res) => res.data.id);

let query = "mimeType!='application/vnd.google-apps.folder'";
if (!pageToken) {
const lastSyncTime = await this.getLastSyncTime(connection.id_connection);
if (lastSyncTime) {
console.log(`Last sync time is ${lastSyncTime.toISOString()}`);
query += ` and modifiedTime >= '${lastSyncTime.toISOString()}'`;
const lastSyncTime = await this.getLastSyncTime(connection.id_connection);
const isFirstSync = !lastSyncTime || pageToken;
let syncCompleted = false;

if (isFirstSync) {
// Start or continuation of initial sync
const { filesToSync: files, nextPageToken } = await this.getFilesToSync(
drive,
pageToken,
);

// Process the files fetched in the current batch
if (files.length > 0) {
await this.ingestFiles(
files,
connection.id_connection,
custom_field_mappings,
ingestParams,
);
}
}

const { filesToSync, nextPageToken } = await this.getFilesToSync(
drive,
query,
pageToken,
);
if (nextPageToken) {
// Add the next pageToken to the queue
await this.bullQueueService
.getThirdPartyDataIngestionQueue()
.add('fs_file_googledrive', {
...data,
pageToken: nextPageToken,
connectionId: connection.id_connection,
});
} else {
syncCompleted = true;
}
} else {
// incremental sync using changes api
const { filesToSync, moreChangesToFetch } =
await this.getFilesToSyncFromChangesApi(drive, connection);

const files: GoogleDriveFileOutput[] = filesToSync.map((file) => ({
id: file.id!,
name: file.name!,
mimeType: file.mimeType!,
createdTime: file.createdTime!,
modifiedTime: file.modifiedTime!,
size: file.size!,
permissions: file.permissions,
parents: file.parents,
webViewLink: file.webViewLink,
driveId: file.driveId || rootDriveId,
trashed: file.trashed ?? false,
}));

// Process the files fetched in the current batch
if (files.length > 0) {
await this.ingestFiles(
files,
filesToSync,
connection.id_connection,
custom_field_mappings,
ingestParams,
);

if (moreChangesToFetch) {
await this.bullQueueService
.getThirdPartyDataIngestionQueue()
.add('fs_file_googledrive', {
...data,
});
} else {
syncCompleted = true;
}
}

if (nextPageToken) {
// Add the next pageToken to the queue
await this.bullQueueService
.getThirdPartyDataIngestionQueue()
.add('fs_file_googledrive', {
...data,
pageToken: nextPageToken,
connectionId: connection.id_connection,
});
} else {
if (syncCompleted) {
this.logger.log(
`No more files to sync for googledrive for linked user ${linkedUserId}.`,
`Googledrive files sync completed for user: ${linkedUserId}.`,
);
}

Expand All @@ -222,9 +228,75 @@ export class GoogleDriveService implements IFileService {
};
}

// For incremental syncs
async getFilesToSyncFromChangesApi(
drive: ReturnType<typeof google.drive>,
connection: any,
) {
let moreChangesToFetch = false; // becomes true if there are more changes to fetch in any drive
const filesToSync: GoogleDriveFileOutput[] = [];

const remoteCursor = await this.getRemoteCursor(connection);

const response = await this.rateLimitedRequest(() =>
drive.changes.list({
pageToken: remoteCursor,
supportsAllDrives: true,
includeItemsFromAllDrives: true,
pageSize: 1000,
fields:
'nextPageToken, newStartPageToken, changes(file(id,name,mimeType,createdTime,modifiedTime,size,parents,webViewLink,driveId,trashed,permissions))',
}),
);

const { changes, nextPageToken, newStartPageToken } = response.data;

const batchFiles = changes
.filter(
(change) =>
change.file?.mimeType !== 'application/vnd.google-apps.folder',
)
.map((change) => change.file);

filesToSync.push(...(batchFiles as GoogleDriveFileOutput[]));

if (nextPageToken) {
moreChangesToFetch = true;
}

const nextCursor = newStartPageToken ? newStartPageToken : nextPageToken;

// all drives share the same cursor (might update this in the future)
await this.prisma.fs_drives.updateMany({
where: {
id_connection: connection.id_connection,
},
data: {
remote_cursor: nextCursor,
},
});

return {
filesToSync,
moreChangesToFetch,
};
}

private async getRemoteCursor(connection: any) {
const internalDrive = await this.prisma.fs_drives.findFirst({
where: {
id_connection: connection.id_connection,
},
select: {
remote_cursor: true,
id_fs_drive: true,
},
});
return internalDrive?.remote_cursor;
}

async getFilesToSync(
drive: any,
query: string,
pageToken?: string,
pages = 20, // number of times we use nextPageToken
) {
Expand All @@ -242,7 +314,7 @@ export class GoogleDriveService implements IFileService {
do {
const filesResponse = await this.rateLimitedRequest<DriveResponse>(() =>
drive.files.list({
q: query,
q: 'mimeType != "application/vnd.google-apps.folder"',
fields:
'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissions, trashed)',
pageSize: BATCH_SIZE,
Expand All @@ -267,15 +339,8 @@ export class GoogleDriveService implements IFileService {
}

async processBatch(job: any) {
const {
linkedUserId,
query,
pageToken,
pageTokenFolders,
connectionId,
custom_field_mappings,
ingestParams,
} = job.data;
const { linkedUserId, pageToken, custom_field_mappings, ingestParams } =
job.data;

// Call the sync method with the pageToken and other job data
await this.sync(
Expand Down
Loading

0 comments on commit 848e7ad

Please sign in to comment.