Skip to content

Commit

Permalink
Support for new ADaaS architecture, attachments streaming, gzipping f…
Browse files Browse the repository at this point in the history
…iles (#6)
  • Loading branch information
radovan-jorgic authored Nov 8, 2024
1 parent da1850b commit 05206f2
Show file tree
Hide file tree
Showing 55 changed files with 7,607 additions and 1,942 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,4 @@ dist
.pnp.*

.npmrc
.idea
298 changes: 182 additions & 116 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# ADaaS Library

Typescript ADaaS Library (@devrev/ts-adaas) provides:
## Release Notes

- type definitions for ADaaS control protocol,
- an adapter for ADaaS control protocol,
- helpers for uploading artifacts and manage the state for ADaaS snap-in.
#### v1.0.0

## Release Notes
- Allow extractions to use full lambda runtime and gracefully handle execution context timeout.
- Simplified metadata and data normalization and uploading with repo implementation.
- Default handling of attachment extraction phase in ADaaS SDK library.
- Reduced file size, streamlined process by gzip compression.
- Bug fixes and improvements in error handling.

#### v0.0.3

Expand All @@ -25,155 +27,219 @@ Typescript ADaaS Library (@devrev/ts-adaas) provides:
- Adapter for ADaaS control protocol with helper functions
- Uploader for uploading artifacts

## Usage
# Overview

Create a new ADaaS adapter on each ADaaS snap-in invocation:
The ADaaS (Airdrop-as-a-Service) Library for TypeScript helps developers build Snap-ins that integrate with DevRev’s ADaaS platform. This library simplifies the workflow for handling data extraction, event-driven actions, state management, and artifact handling.

```javascript
const adapter = new Adapter(event: AirdropEvent);
```
## Features

Adapter class provides:
- Type Definitions: Structured types for ADaaS control protocol
- Event Management: Easily emit events for different extraction phases
- State Handling: Update and access state in real-time within tasks
- Artifact Management: Supports batched storage of artifacts (2000 items per batch)
- Error & Timeout Support: Error handling and timeout management for long-running tasks

- helper function to emit response,
- automatic emit event if ADaaS snap-in invocation runs out of time,
- setter for updating ADaaS snap-in state and adding artifacts to the return ADaaS message.
# Installation

### Phases of Airdrop Extraction
```bash
npm install @devrev/ts-adaas
```

Each ADaaS snap-in must handle all the phases of ADaaS extraction.
# Usage

ADaaS library provides type definitions to ensure ADaaS snap-ins are compatible with ADaaS control protocol.
ADaaS Snap-ins are composed of several phases, each with unique requirements for initialization, data extraction, and error handling. The ADaaS library exports processTask to structure the work within each phase. The processTask function accepts task and onTimeout handlers, giving access to the adapter to streamline state updates, upload of extracted data, and event emission.

```javascript
async run() {
switch (this.event.payload.event_type) {
case EventType.ExtractionExternalSyncUnitsStart: {
### ADaaS Snap-in Invocation

// extract available External Sync Units (projects, organizations, ...)
Each ADaaS snap-in must handle all the phases of ADaaS extraction. In a Snap-in, you typically define a `run` function that iterates over events and invokes workers per extraction phase.

await this.adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, {
external_sync_units: externalSyncUnits,
});
break;
}

case EventType.ExtractionMetadataStart: {
```typescript
import { AirdropEvent, EventType, spawn } from '@devrev/ts-adaas';

// provide mappings of domain objects by provioding initial_domain_mapping.json file
// update ADaaS snap-in state
interface DummyExtractorState {
issues: { completed: boolean };
users: { completed: boolean };
attachments: { completed: boolean };
}

await this.adapter.emit(ExtractorEventType.ExtractionMetadataDone);
const initialState: DummyExtractorState = {
issues: { completed: false },
users: { completed: false },
attachments: { completed: false },
};

function getWorkerPerExtractionPhase(event: AirdropEvent) {
let path;
switch (event.payload.event_type) {
case EventType.ExtractionExternalSyncUnitsStart:
path = __dirname + '/workers/external-sync-units-extraction';
break;
}

case EventType.ExtractionDataStart: {

// extract Data
// upload Data
// update ADaaS snap-in state
// approximate progress done

await this.adapter.emit(ExtractorEventType.ExtractionDataContinue, {
progress: 10,
});

case EventType.ExtractionMetadataStart:
path = __dirname + '/workers/metadata-extraction';
break;
}

case EventType.ExtractionDataContinue: {
await this.processExtractionData();

// extract Data
// upload Data
// update ADaaS snap-in state
// approximate progress done

await this.adapter.emit(ExtractorEventType.ExtractionDataDone, {
progress: 100,
});
case EventType.ExtractionDataStart:
case EventType.ExtractionDataContinue:
path = __dirname + '/workers/data-extraction';
break;
}

case EventType.ExtractionDataDelete: {
}
return path;
}

// if an extraction has any side-effects to 3rd party systems cleanup should be done here.
const run = async (events: AirdropEvent[]) => {
for (const event of events) {
const file = getWorkerPerExtractionPhase(event);
await spawn<DummyExtractorState>({
event,
initialState,
workerPath: file,
options: {
isLocalDevelopment: true,
},
});
}
};

await this.adapter.emit(ExtractorEventType.ExtractionDataDeleteDone);
break;
}
export default run;
```

case EventType.ExtractionAttachmentsStart: {
## Extraction Phases

// extract Attachments
// upload Attachments
// update ADaaS snap-in state
The ADaaS snap-in extraction lifecycle consists of three main phases: External Sync Units Extraction, Metadata Extraction, and Data Extraction. Each phase is defined in a separate file and is responsible for fetching the respective data.

await this.adapter.emit(ExtractorEventType.ExtractionAttachmentsContinue);
break;
}
### 1. External Sync Units Extraction

case EventType.ExtractionAttachmentsContinue: {
This phase is defined in `external-sync-units-extraction.ts` and is responsible for fetching the external sync units.

```typescript
import {
ExternalSyncUnit,
ExtractorEventType,
processTask,
} from '@devrev/ts-adaas';

const externalSyncUnits: ExternalSyncUnit[] = [
{
id: 'devrev',
name: 'devrev',
description: 'Demo external sync unit',
item_count: 2,
item_type: 'issues',
},
];

processTask({
task: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, {
external_sync_units: externalSyncUnits,
});
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsError, {
error: {
message: 'Failed to extract external sync units. Lambda timeout.',
},
});
},
});
```

// extract Attachments
// upload Attachments
// update ADaaS snap-in state
### 2. Metadata Extraction

await this.adapter.emit(ExtractorEventType.ExtractionAttachmentsDone);
break;
}
This phase is defined in `metadata-extraction.ts` and is responsible for fetching the metadata.

case EventType.ExtractionAttachmentsDelete: {
```typescript
import { ExtractorEventType, processTask } from '@devrev/ts-adaas';
import externalDomainMetadata from '../dummy-extractor/external_domain_metadata.json';

const repos = [{ itemType: 'external_domain_metadata' }];

processTask({
task: async ({ adapter }) => {
adapter.initializeRepos(repos);
await adapter
.getRepo('external_domain_metadata')
?.push([externalDomainMetadata]);
await adapter.emit(ExtractorEventType.ExtractionMetadataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionMetadataError, {
error: { message: 'Failed to extract metadata. Lambda timeout.' },
});
},
});
```

// if an extraction has any side-effects to 3rd party systems cleanup should be done here.
### 3. Data Extraction

await this.adapter.emit(ExtractorEventType.ExtractionAttachmentsDeleteDone);
break;
}
This phase is defined in `data-extraction.ts` and is responsible for fetching the data. In this phase also attachments metadata is extracted.

default: {
console.log('Event not supported' + JSON.stringify(this.event));
```typescript
import { EventType, ExtractorEventType, processTask } from '@devrev/ts-adaas';
import { normalizeAttachment, normalizeIssue, normalizeUser } from '../dummy-extractor/data-normalization';

const issues = [
{ id: 'issue-1', created_date: '1999-12-25T01:00:03+01:00', ... },
{ id: 'issue-2', created_date: '1999-12-27T15:31:34+01:00', ... },
];

const users = [
{ id: 'user-1', created_date: '1999-12-25T01:00:03+01:00', ... },
{ id: 'user-2', created_date: '1999-12-27T15:31:34+01:00', ... },
];

const attachments = [
{ url: 'https://app.dev.devrev-eng.ai/favicon.ico', id: 'attachment-1', ... },
{ url: 'https://app.dev.devrev-eng.ai/favicon.ico', id: 'attachment-2', ... },
];

const repos = [
{ itemType: 'issues', normalize: normalizeIssue },
{ itemType: 'users', normalize: normalizeUser },
{ itemType: 'attachments', normalize: normalizeAttachment },
];

processTask({
task: async ({ adapter }) => {
adapter.initializeRepos(repos);

if (adapter.event.payload.event_type === EventType.ExtractionDataStart) {
await adapter.getRepo('issues')?.push(issues);
await adapter.emit(ExtractorEventType.ExtractionDataProgress, { progress: 50 });
} else {
await adapter.getRepo('users')?.push(users);
await adapter.getRepo('attachments')?.push(attachments);
await adapter.emit(ExtractorEventType.ExtractionDataDone, { progress: 100 });
}
}
}
},
onTimeout: async ({ adapter }) => {
await adapter.postState();
await adapter.emit(ExtractorEventType.ExtractionDataProgress, { progress: 50 });
},
});
```

## Uploading artifacts
## 4. Attachments Streaming

Create a new Uploader class for uploading artifacts:
The ADaaS library handles attachments streaming to improve efficiency and reduce complexity for developers. During the extraction phase, developers need only to provide metadata in a specific format for each attachment, and the library manages the streaming process.

```javascript
const upload = new Uploader(
event.execution_metadata.devrev_endpoint,
event.context.secrets.service_account_token
);
```
The Snap-in should provide attachment metadata following the `NormalizedAttachment` interface:

Files with extracted domain objects must be in JSONL (JSON Lines) format. Data files should contain 2000 - 5000 records each.

```javascript
const entity = 'users';
const { artifact, error } = await this.uploader.upload(
`extractor_${entity}_${i}.jsonl`,
entity,
data
);
if (error) {
return error;
} else {
await this.adapter.update({ artifact });
```typescript
export interface NormalizedAttachment {
url: string;
id: string;
file_name: string;
author_id: string;
parent_id: string;
}
```

Each uploaded file must be attached to ADaaS adapter as soon as it is uploaded to ensure it is included in the ADaaS response message in case of a lambda timeout.
## Artifact Uploading and State Management

## Updating ADaaS snap-in state
The ADaaS library provides a repository management system to handle artifacts in batches. The `initializeRepos` function initializes the repositories, and the `push` function uploads the artifacts to the repositories. The `postState` function is used to post the state of the extraction task.

ADaaS snap-ins keep their own state between sync runs, between the states of a particular sync run and between invocations within a particular state.
State management is crucial for ADaaS Snap-ins to maintain the state of the extraction task. The `postState` function is used to post the state of the extraction task. The state is stored in the adapter and can be retrieved using the `adapter.state` property.

By managing its own state, the ADaaS snap-in keeps track of the process of extraction (what items have already been extracted and where to continue), the times of the last successful sync run and keeps record of progress of the extraction.
## Timeout Handling

```typescript
async update({ artifacts, extractor_state}: AdapterUpdateParams)
```
The ADaaS library provides a timeout handler to handle timeouts in long-running tasks. The `onTimeout` handler is called when the task exceeds the timeout limit. The handler can be used to post the state of the extraction task and emit an event when a timeout occurs.
Loading

0 comments on commit 05206f2

Please sign in to comment.