Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): request abort signal support #112

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/client/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@fal-ai/client",
"description": "The fal.ai client for JavaScript and TypeScript",
"version": "1.2.0-alpha.5",
"version": "1.2.0-alpha.6",
"license": "MIT",
"repository": {
"type": "git",
Expand Down
3 changes: 3 additions & 0 deletions libs/client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ export function createFalClient(userConfig: Config = {}): FalClient {
...config,
responseHandler: resultResponseHandler,
},
options: {
signal: options.abortSignal,
},
});
},
subscribe: async (endpointId, options) => {
Expand Down
24 changes: 21 additions & 3 deletions libs/client/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ type BaseQueueOptions = {
* The unique identifier for the enqueued request.
*/
requestId: string;

/**
* The signal to abort the request.
*/
abortSignal?: AbortSignal;
};

export type QueueStatusOptions = BaseQueueOptions & {
Expand Down Expand Up @@ -246,11 +251,14 @@ export const createQueueClient = ({
},
input: input as Input,
config,
options: {
signal: options.abortSignal,
},
});
},
async status(
endpointId: string,
{ requestId, logs = false }: QueueStatusOptions,
{ requestId, logs = false, abortSignal }: QueueStatusOptions,
): Promise<QueueStatus> {
const appId = parseEndpointId(endpointId);
const prefix = appId.namespace ? `${appId.namespace}/` : "";
Expand All @@ -262,6 +270,9 @@ export const createQueueClient = ({
path: `/requests/${requestId}/status`,
}),
config,
options: {
signal: abortSignal,
},
});
},

Expand Down Expand Up @@ -379,6 +390,7 @@ export const createQueueClient = ({
const requestStatus = await ref.status(endpointId, {
requestId,
logs: options.logs ?? false,
abortSignal: options.abortSignal,
});
if (options.onQueueUpdate) {
options.onQueueUpdate(requestStatus);
Expand All @@ -400,7 +412,7 @@ export const createQueueClient = ({

async result<Output>(
endpointId: string,
{ requestId }: BaseQueueOptions,
{ requestId, abortSignal }: BaseQueueOptions,
): Promise<Result<Output>> {
const appId = parseEndpointId(endpointId);
const prefix = appId.namespace ? `${appId.namespace}/` : "";
Expand All @@ -414,12 +426,15 @@ export const createQueueClient = ({
...config,
responseHandler: resultResponseHandler,
},
options: {
signal: abortSignal,
},
});
},

async cancel(
endpointId: string,
{ requestId }: BaseQueueOptions,
{ requestId, abortSignal }: BaseQueueOptions,
): Promise<void> {
const appId = parseEndpointId(endpointId);
const prefix = appId.namespace ? `${appId.namespace}/` : "";
Expand All @@ -430,6 +445,9 @@ export const createQueueClient = ({
path: `/requests/${requestId}/cancel`,
}),
config,
options: {
signal: abortSignal,
},
});
},
};
Expand Down
22 changes: 9 additions & 13 deletions libs/client/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* @returns the file extension or `bin` if the content type is not recognized.
*/
function getExtensionFromContentType(contentType: string): string {
const [_, fileType] = contentType.split("/");

Check warning on line 48 in libs/client/src/storage.ts

View workflow job for this annotation

GitHub Actions / build

'_' is assigned a value but never used
return fileType.split(/[-;]/)[0] ?? "bin";
}

Expand Down Expand Up @@ -105,7 +105,7 @@
uploadUrl: string,
chunk: Blob,
config: RequiredConfig,
tries: number = 3,
tries = 3,
): Promise<MultipartObject> {
if (tries === 0) {
throw new Error("Part upload failed, retries exhausted");
Expand Down Expand Up @@ -142,21 +142,17 @@

const responses: MultipartObject[] = [];

try {
for (let i = 0; i < chunks; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize, file.size);
for (let i = 0; i < chunks; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize, file.size);

const chunk = file.slice(start, end);
const chunk = file.slice(start, end);

const partNumber = i + 1;
// {uploadUrl}/{part_number}?uploadUrlParams=...
const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`;
const partNumber = i + 1;
// {uploadUrl}/{part_number}?uploadUrlParams=...
const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`;

responses.push(await partUploadRetries(partUploadUrl, chunk, config));
}
} catch (error) {
throw error;
responses.push(await partUploadRetries(partUploadUrl, chunk, config));
}

// Complete the upload
Expand Down Expand Up @@ -221,7 +217,7 @@
} else if (input instanceof Blob) {
return await ref.upload(input);
} else if (isPlainObject(input)) {
const inputObject = input as Record<string, any>;

Check warning on line 220 in libs/client/src/storage.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
const promises = Object.entries(inputObject).map(
async ([key, value]): Promise<KeyValuePair> => {
return [key, await ref.transformInput(value)];
Expand Down
17 changes: 17 additions & 0 deletions libs/client/src/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@
* support streaming.
*/
readonly connectionMode?: StreamingConnectionMode;

/**
* The signal to abort the request.
*/
readonly signal?: AbortSignal;
};

const EVENT_STREAM_TIMEOUT = 15 * 1000;

type FalStreamEventType = "data" | "error" | "done";

type EventHandler<T = any> = (event: T) => void;

Check warning on line 77 in libs/client/src/streaming.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

/**
* The class representing a streaming response. With t
Expand Down Expand Up @@ -129,6 +134,14 @@
reject(error);
});
});
// if a abort signal was passed, sync it with the internal one
if (options.signal) {
options.signal.addEventListener("abort", () => {
this.abortController.abort();
});
}

// start the streaming request
this.start().catch(this.handleError);
}

Expand Down Expand Up @@ -232,7 +245,7 @@
this.emit("data", parsedData);

// also emit 'message'for backwards compatibility
this.emit("message" as any, parsedData);

Check warning on line 248 in libs/client/src/streaming.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
} catch (e) {
this.emit("error", e);
}
Expand Down Expand Up @@ -268,7 +281,7 @@
return;
};

private handleError = (error: any) => {

Check warning on line 284 in libs/client/src/streaming.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
// In case AbortError is thrown but the signal is marked as aborted
// it means the user called abort() and we should not emit an error
// as it's expected behavior
Expand Down Expand Up @@ -345,6 +358,10 @@

/**
* Gets the `AbortSignal` instance that can be used to listen for abort events.
*
* **Note:** this signal is internal to the `FalStream` instance. If you pass your
* own abort signal, the `FalStream` will listen to it and abort it appropriately.
*
* @returns the `AbortSignal` instance.
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
*/
Expand Down
5 changes: 5 additions & 0 deletions libs/client/src/types/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ export type RunOptions<Input> = {
* The HTTP method, defaults to `post`;
*/
readonly method?: "get" | "post" | "put" | "delete" | string;

/**
* The abort signal to cancel the request.
*/
readonly abortSignal?: AbortSignal;
};

export type UrlOptions = {
Expand Down
Loading