Skip to content

Commit

Permalink
refactor: optimize googledrive sync
Browse files Browse the repository at this point in the history
  • Loading branch information
amuwal committed Dec 22, 2024
1 parent 848e7ad commit fee0ae2
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 103 deletions.
2 changes: 1 addition & 1 deletion packages/api/src/filestorage/drive/sync/sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class SyncService implements OnModuleInit, IBaseSync {
this.registry.registerService('filestorage', 'drive', this);
}
onModuleInit() {
//
//
}

@Cron('0 */8 * * *') // every 8 hours
Expand Down
202 changes: 138 additions & 64 deletions packages/api/src/filestorage/file/services/googledrive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,27 @@ export class GoogleDriveService implements IFileService {
async ingestFiles(
sourceData: GoogleDriveFileOutput[],
connectionId: string,
drive: ReturnType<typeof google.drive>,
remote_cursor?: string,
customFieldMappings?: {
slug: string;
remote_id: string;
}[],
extraParams?: { [key: string]: any },
): Promise<UnifiedFilestorageFileOutput[]> {
// Extract all permissions from the files
const allPermissions: GoogledrivePermissionOutput[] = sourceData.reduce<
GoogledrivePermissionOutput[]
>((accumulator, file) => {
if (file.permissions?.length) {
accumulator.push(...file.permissions);
}
return accumulator;
}, []);
const permissionsIds: string[] = Array.from(
new Set(
sourceData.reduce<string[]>((accumulator, file) => {
if (file.permissionIds?.length) {
accumulator.push(...file.permissionIds);
}
return accumulator;
}, []),
),
);

if (allPermissions.length === 0) {
if (permissionsIds.length === 0) {
this.logger.log('No permissions found in the provided files.');
return this.ingestService.ingestData<
UnifiedFilestorageFileOutput,
Expand All @@ -72,11 +76,10 @@ export class GoogleDriveService implements IFileService {
);
}

// Remove duplicate permissions based on 'id'
const uniquePermissions: GoogledrivePermissionOutput[] = Array.from(
new Map(
allPermissions.map((permission) => [permission.id, permission]),
).values(),
const uniquePermissions = await this.fetchPermissions(
permissionsIds,
sourceData,
drive,
);

// Ingest permissions using the ingestService
Expand Down Expand Up @@ -105,9 +108,9 @@ export class GoogleDriveService implements IFileService {

// Update each file's permissions with the synced permission IDs
sourceData.forEach((file) => {
if (file.permissions?.length) {
file.permissions = file.permissions
.map((permission) => permissionIdMap.get(permission.id))
if (file.permissionIds?.length) {
file.internal_permissions = file.permissionIds
.map((permissionId) => permissionIdMap.get(permissionId))
.filter(
(permissionId): permissionId is string =>
permissionId !== undefined,
Expand All @@ -133,6 +136,17 @@ export class GoogleDriveService implements IFileService {
`Ingested a batch of ${syncedFiles.length} googledrive files.`,
);

if (remote_cursor) {
await this.prisma.fs_drives.updateMany({
where: {
id_connection: connectionId,
},
data: {
remote_cursor: remote_cursor,
},
});
}

return syncedFiles;
}

Expand Down Expand Up @@ -175,6 +189,8 @@ export class GoogleDriveService implements IFileService {
await this.ingestFiles(
files,
connection.id_connection,
drive,
null,
custom_field_mappings,
ingestParams,
);
Expand All @@ -194,12 +210,14 @@ export class GoogleDriveService implements IFileService {
}
} else {
// incremental sync using changes api
const { filesToSync, moreChangesToFetch } =
const { filesToSync, moreChangesToFetch, remote_cursor } =
await this.getFilesToSyncFromChangesApi(drive, connection);

await this.ingestFiles(
filesToSync,
connection.id_connection,
drive,
remote_cursor,
custom_field_mappings,
ingestParams,
);
Expand Down Expand Up @@ -232,73 +250,84 @@ export class GoogleDriveService implements IFileService {
async getFilesToSyncFromChangesApi(
drive: ReturnType<typeof google.drive>,
connection: any,
maxApiCalls = 10, // number of times we use nextPageToken
) {
let moreChangesToFetch = false; // becomes true if there are more changes to fetch in any drive
const filesToSync: GoogleDriveFileOutput[] = [];
let apiCallCount = 0;
let pageToken: string | undefined;
let newRemoteCursor: string | undefined;

const remoteCursor = await this.getRemoteCursor(connection);

const response = await this.rateLimitedRequest(() =>
drive.changes.list({
pageToken: remoteCursor,
supportsAllDrives: true,
includeItemsFromAllDrives: true,
pageSize: 1000,
fields:
'nextPageToken, newStartPageToken, changes(file(id,name,mimeType,createdTime,modifiedTime,size,parents,webViewLink,driveId,trashed,permissions))',
}),
);
// Get initial cursor
pageToken = await this.getRemoteCursor(drive, connection.id_connection);

const { changes, nextPageToken, newStartPageToken } = response.data;
do {
const response = await this.rateLimitedRequest(() =>
drive.changes.list({
pageToken,
supportsAllDrives: true,
includeItemsFromAllDrives: true,
pageSize: 1000,
fields:
'nextPageToken, newStartPageToken, changes(file(id,name,mimeType,createdTime,modifiedTime,size,parents,webViewLink,driveId,trashed,permissionIds))',
}),
);

const batchFiles = changes
.filter(
(change) =>
change.file?.mimeType !== 'application/vnd.google-apps.folder',
)
.map((change) => change.file);
const { changes, nextPageToken, newStartPageToken } = response.data;

filesToSync.push(...(batchFiles as GoogleDriveFileOutput[]));
const batchFiles = changes
.filter(
(change) =>
change.file?.mimeType !== 'application/vnd.google-apps.folder',
)
.map((change) => change.file);

if (nextPageToken) {
moreChangesToFetch = true;
}
filesToSync.push(...(batchFiles as GoogleDriveFileOutput[]));

const nextCursor = newStartPageToken ? newStartPageToken : nextPageToken;
// Update pageToken for next iteration
pageToken = nextPageToken;
apiCallCount++;

// all drives share the same cursor (might update this in the future)
await this.prisma.fs_drives.updateMany({
where: {
id_connection: connection.id_connection,
},
data: {
remote_cursor: nextCursor,
},
});
if (!nextPageToken || apiCallCount >= maxApiCalls) {
newRemoteCursor = newStartPageToken || nextPageToken;
}
} while (pageToken && apiCallCount < maxApiCalls);

return {
filesToSync,
moreChangesToFetch,
moreChangesToFetch: !!pageToken, // true if there's still a next page
remote_cursor: newRemoteCursor,
};
}

private async getRemoteCursor(connection: any) {
private async getRemoteCursor(
drive: ReturnType<typeof google.drive>,
connectionId: string,
): Promise<string> {
const internalDrive = await this.prisma.fs_drives.findFirst({
where: {
id_connection: connection.id_connection,
},
select: {
remote_cursor: true,
id_fs_drive: true,
},
where: { id_connection: connectionId }, // all drives share the same cursor for now
select: { id_fs_drive: true, remote_cursor: true },
});
return internalDrive?.remote_cursor;
let remoteCursor = internalDrive?.remote_cursor;
if (!remoteCursor) {
const startPageToken = await this.rateLimitedRequest(() =>
drive.changes
.getStartPageToken({ supportsAllDrives: true }) // one cursor for all drives
.then((response) => response.data.startPageToken),
);
remoteCursor = startPageToken;

await this.prisma.fs_drives.updateMany({
where: { id_connection: connectionId },
data: { remote_cursor: remoteCursor },
});
}
return remoteCursor;
}

async getFilesToSync(
drive: any,
pageToken?: string,
pages = 20, // number of times we use nextPageToken
pages = 5, // number of times we use nextPageToken
) {
interface DriveResponse {
data: {
Expand All @@ -316,7 +345,7 @@ export class GoogleDriveService implements IFileService {
drive.files.list({
q: 'mimeType != "application/vnd.google-apps.folder"',
fields:
'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissions, trashed)',
'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissionIds, trashed)',
pageSize: BATCH_SIZE,
pageToken: nextPageToken,
includeItemsFromAllDrives: true,
Expand Down Expand Up @@ -353,6 +382,51 @@ export class GoogleDriveService implements IFileService {
);
}

/**
 * Fetches the permission resource for every distinct permission ID that
 * appears in `files[].permissionIds`.
 *
 * A permission can only be read through a file that carries it, so each
 * permission ID is paired with the first file in `files` that references it;
 * that single file ID is enough to issue `permissions.get`.
 *
 * @param permissionIds Permission IDs collected by the caller. This method
 *   does not read it: the IDs are re-derived from `files` so that each one
 *   is guaranteed to have a file ID to query with.
 * @param files Files whose `permissionIds` should be resolved.
 * @param drive Google Drive client used for the lookups.
 * @returns One permission per distinct ID, ordered by first appearance in
 *   `files`. Rejects if any single lookup rejects (`Promise.all` semantics).
 */
private async fetchPermissions(
  permissionIds: string[],
  files: GoogleDriveFileOutput[],
  drive: ReturnType<typeof google.drive>,
): Promise<GoogledrivePermissionOutput[]> {
  // Upper bound on lookups in flight at the same time.
  const permissionBatchSize = 10;

  // permissionId -> one fileId that exposes it (first occurrence wins).
  const fileIdByPermissionId = new Map<string, string>();
  for (const file of files) {
    for (const permissionId of file.permissionIds ?? []) {
      if (!fileIdByPermissionId.has(permissionId)) {
        fileIdByPermissionId.set(permissionId, file.id);
      }
    }
  }

  const lookups = Array.from(fileIdByPermissionId.entries());
  const permissions: GoogledrivePermissionOutput[] = [];

  for (
    let start = 0;
    start < lookups.length;
    start += permissionBatchSize
  ) {
    const batch = lookups.slice(start, start + permissionBatchSize);

    // Route every call through rateLimitedRequest, like the other Drive
    // requests in this service, instead of calling the API directly.
    // NOTE(review): no `fields` mask is sent, so only the API's default
    // permission fields come back — confirm that is enough for the
    // permission mapper.
    const responses = await Promise.all(
      batch.map(([permissionId, fileId]) =>
        this.rateLimitedRequest(() =>
          drive.permissions.get({
            permissionId,
            fileId,
            supportsAllDrives: true,
          }),
        ),
      ),
    );

    for (const response of responses) {
      permissions.push(
        response.data as unknown as GoogledrivePermissionOutput,
      );
    }
  }

  return permissions;
}

private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> {
let attempt = 0;
let backoff = INITIAL_BACKOFF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class GoogleDriveFileMapper implements IFileMapper {
file_url: file.webViewLink || file.webContentLink || null,
mime_type: file.mimeType || null,
size: file.size || null,
permissions: file.permissions,
permissions: file.internal_permissions,
shared_link: null,
...opts,
field_mappings,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export type GoogleDriveFileInput = Partial<GoogleDriveFileOutput>
export type GoogleDriveFileInput = Partial<GoogleDriveFileOutput>;

export interface GoogleDriveFileOutput {
kind?: string;
Expand Down Expand Up @@ -105,4 +105,7 @@ export interface GoogleDriveFileOutput {
};
sha1Checksum?: string;
sha256Checksum?: string;

// Internal fields
internal_permissions?: string[]; // Permissions ID in panora db
}
Loading

0 comments on commit fee0ae2

Please sign in to comment.