Skip to content

Commit

Permalink
feat: ondrive: integrate permissions with files and folder sync
Browse files Browse the repository at this point in the history
  • Loading branch information
amuwal committed Dec 20, 2024
1 parent 7c8e18e commit b21446f
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export class IngestDataService {
ingestParams,
);
} catch (syncError) {
console.log(syncError, 'syncError in ingest-data.service.ts');
this.logger.error(
`Error syncing ${integrationId} ${commonObject}: ${syncError.message}`,
syncError,
Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/filestorage/drive/services/onedrive/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ export interface IdentitySet {
readonly phone?: Identity;
/** Identity representing a user. */
readonly user?: Identity;
/** Identity representing a group. */
readonly group?: Identity;
}

/**
Expand Down
197 changes: 155 additions & 42 deletions packages/api/src/filestorage/file/services/onedrive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import { IngestDataService } from '@@core/@core-services/unification/ingest-data
import { Connection } from '@@core/connections/@utils/types';
import { UnifiedFilestorageFileOutput } from '@filestorage/file/types/model.unified';
import { BullQueueService } from '@@core/@core-services/queues/shared.service';
import { OnedrivePermissionOutput } from '@filestorage/permission/services/onedrive/types';
import { UnifiedFilestoragePermissionOutput } from '@filestorage/permission/types/model.unified';

@Injectable()
export class OnedriveService implements IFileService {
private readonly MAX_RETRIES: number = 6;
Expand Down Expand Up @@ -109,12 +112,16 @@ export class OnedriveService implements IFileService {
}

if (files.length > 0) {
// Assign and ingest permissions for the ingested files
await this.ingestPermissionsForFiles(files, connection);

const ingestedFiles = await this.ingestFiles(
files,
connection,
custom_field_mappings,
ingestParams,
);

this.logger.log(
`Ingested ${ingestedFiles.length} files from OneDrive.`,
'onedrive files ingestion',
Expand All @@ -137,7 +144,9 @@ export class OnedriveService implements IFileService {
);
}
} else {
const lastSyncTime = await this.getLastSyncTime(connection);
const lastSyncTime = await this.getLastSyncTime(
connection.id_connection,
);
const deltaLink = lastSyncTime
? `${
connection.account_url
Expand Down Expand Up @@ -258,29 +267,138 @@ export class OnedriveService implements IFileService {
);
}

private async getLastSyncTime(connection: {
id_connection: string;
}): Promise<Date | null> {
const lastSyncTime = await this.prisma.fs_files.findFirst({
where: {
id_connection: connection.id_connection,
},
orderBy: {
remote_modified_at: {
sort: 'desc',
nulls: 'last',
},
},
select: {
remote_modified_at: true,
},
});
/**
* Ingests and assigns permissions for files.
* @param allFiles - Array of OnedriveFileOutput to process.
* @param connection - The connection object.
* @returns The updated array of OnedriveFileOutput with ingested permissions.
*/
private async ingestPermissionsForFiles(
allFiles: OnedriveFileOutput[],
connection: Connection,
): Promise<OnedriveFileOutput[]> {
const allPermissions: OnedrivePermissionOutput[] = [];
const fileIdToRemotePermissionIdMap: Map<string, string[]> = new Map();
const batchSize = 100; // simultaneous requests

const files = allFiles.filter((f) => !f.deleted);

for (let i = 0; i < files.length; i += batchSize) {
const batch = files.slice(i, i + batchSize);
const permissions = await Promise.all(
batch.map(async (file) => {
const permissionConfig: AxiosRequestConfig = {
timeout: 30000,
method: 'get',
url: `${connection.account_url}/v1.0/me/drive/items/${file.id}/permissions`,
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`,
},
};

this.logger.log(
`Last file sync time: ${lastSyncTime?.remote_modified_at}`,
'onedrive files sync',
const resp = await this.makeRequestWithRetry(permissionConfig);
const permissions = resp.data.value;
fileIdToRemotePermissionIdMap.set(
file.id,
permissions.map((p) => p.id),
);
return permissions;
}),
);

allPermissions.push(...permissions.flat());
}

const uniquePermissions = Array.from(
new Map(
allPermissions.map((permission) => [permission.id, permission]),
).values(),
);
return lastSyncTime?.remote_modified_at ?? null;

await this.assignUserAndGroupIdsToPermissions(uniquePermissions);

const syncedPermissions = await this.ingestService.ingestData<
UnifiedFilestoragePermissionOutput,
OnedrivePermissionOutput
>(
uniquePermissions,
'onedrive',
connection.id_connection,
'filestorage',
'permission',
);

this.logger.log(`Ingested ${allPermissions.length} permissions for files.`);

const permissionIdMap: Map<string, string> = new Map(
syncedPermissions.map((permission) => [
permission.remote_id,
permission.id_fs_permission,
]),
);

files.forEach((file) => {
if (fileIdToRemotePermissionIdMap.has(file.id)) {
file.internal_permissions = fileIdToRemotePermissionIdMap
.get(file.id)
?.map((permission) => permissionIdMap.get(permission))
.filter((id) => id !== undefined);
}
});

return allFiles;
}

private async assignUserAndGroupIdsToPermissions(
permissions: OnedrivePermissionOutput[],
): Promise<void> {
const userLookupCache: Map<string, string> = new Map();
const groupLookupCache: Map<string, string> = new Map();

for (const permission of permissions) {
if (permission.grantedToV2?.user?.id) {
const remote_user_id = permission.grantedToV2.user.id;
if (userLookupCache.has(remote_user_id)) {
permission.internal_user_id = userLookupCache.get(remote_user_id);
continue;
}
const user = await this.prisma.fs_users.findFirst({
where: {
remote_id: remote_user_id,
},
select: {
id_fs_user: true,
},
});
if (user) {
permission.internal_user_id = user.id_fs_user;
userLookupCache.set(remote_user_id, user.id_fs_user);
}
}

if (permission.grantedToV2?.group?.id) {
const remote_group_id = permission.grantedToV2.group.id;
if (groupLookupCache.has(remote_group_id)) {
permission.internal_group_id = groupLookupCache.get(remote_group_id);
continue;
}
const group = await this.prisma.fs_groups.findFirst({
where: {
remote_id: remote_group_id,
},
select: {
id_fs_group: true,
},
});
if (group) {
permission.internal_group_id = group.id_fs_group;
groupLookupCache.set(remote_group_id, group.id_fs_group);
}
}
}
}

private async syncFolder(
Expand All @@ -306,6 +424,8 @@ export class OnedriveService implements IFileService {
(elem: any) => !elem.folder, // files don't have a folder property
);

await this.ingestPermissionsForFiles(files, connection);

return files;
} catch (error: any) {
if (error.response?.status === 404) {
Expand All @@ -332,27 +452,15 @@ export class OnedriveService implements IFileService {
}
throw error;
}
}

// Add permissions (shared link is also included in permissions in OneDrive)
// await Promise.all(
// files.map(async (driveItem: OnedriveFileOutput) => {
// const permissionsConfig: AxiosRequestConfig = {
// method: 'get',
// url: `${connection.account_url}/v1.0/me/drive/items/${driveItem.id}/permissions`,
// headers: {
// 'Content-Type': 'application/json',
// Authorization: `Bearer ${this.cryptoService.decrypt(
// connection.access_token,
// )}`,
// },
// };

// const permissionsResp: AxiosResponse = await this.makeRequestWithRetry(
// permissionsConfig,
// );
// driveItem.permissions = permissionsResp.data.value;
// }),
// );
private async getLastSyncTime(connectionId: string): Promise<Date | null> {
const lastSync = await this.prisma.fs_files.findFirst({
where: { id_connection: connectionId },
orderBy: { remote_modified_at: { sort: 'desc', nulls: 'last' } },
});
this.logger.log(`Last sync time: ${lastSync?.remote_modified_at}`);
return lastSync ? lastSync.remote_modified_at : null;
}

async downloadFile(fileId: string, connection: any): Promise<Buffer> {
Expand Down Expand Up @@ -435,6 +543,11 @@ export class OnedriveService implements IFileService {
continue;
}

// handle 410 gone errors
if (error.response?.status === 410 && config.url.includes('delta')) {
// todo: handle 410 gone errors
}

throw error;
}
}
Expand Down
36 changes: 2 additions & 34 deletions packages/api/src/filestorage/file/services/onedrive/mappers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,37 +88,6 @@ export class OnedriveFileMapper implements IFileMapper {
}
}

const opts: any = {};
if (file.permissions?.length) {
const permissions = await this.coreUnificationService.unify<
OriginalPermissionOutput[]
>({
sourceObject: file.permissions,
targetType: FileStorageObject.permission,
providerName: 'onedrive',
vertical: 'filestorage',
connectionId,
customFieldMappings: [],
});
opts.permissions = permissions;

// shared link
if (file.permissions.some((p) => p.link)) {
const sharedLinks =
await this.coreUnificationService.unify<OriginalSharedLinkOutput>({
sourceObject: file.permissions.find((p) => p.link),
targetType: FileStorageObject.sharedlink,
providerName: 'onedrive',
vertical: 'filestorage',
connectionId,
customFieldMappings: [],
});
opts.shared_links = sharedLinks;
}
}

// todo: handle folder

return {
remote_id: file.id,
remote_data: file,
Expand All @@ -136,9 +105,8 @@ export class OnedriveFileMapper implements IFileMapper {
mime_type: file.file.mimeType,
size: file.size.toString(),
folder_id: null,
// permission: opts.permissions?.[0] || null,
permissions: null,
shared_link: opts.shared_links?.[0] || null,
permissions: file.internal_permissions,
shared_link: null,
field_mappings,
};
}
Expand Down
3 changes: 3 additions & 0 deletions packages/api/src/filestorage/file/services/onedrive/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ export interface OnedriveFileOutput {
readonly video?: Video;
/** WebDAV compatible URL for the item. */
readonly webDavUrl?: string;

// INTERNAL FIELDS
internal_permissions?: string[];
}

/**
Expand Down
Loading

0 comments on commit b21446f

Please sign in to comment.