Skip to content

Commit

Permalink
feat: use event emitter for bulk upload progress
Browse files Browse the repository at this point in the history
  • Loading branch information
DaniAkash committed May 2, 2024
1 parent fd1c824 commit a95725d
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 26 deletions.
12 changes: 1 addition & 11 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
"@parcel/packager-ts": "^2.11.0",
"@parcel/transformer-typescript-types": "^2.11.0",
"@types/async": "^3.2.24",
"@types/cli-progress": "^3.11.5",
"@types/google-protobuf": "^3.15.12",
"@types/js-yaml": "^4.0.9",
"@types/lodash": "^4.17.0",
Expand Down Expand Up @@ -67,7 +66,6 @@
"async": "^3.2.5",
"chalk": "^5.3.0",
"clarifai-nodejs-grpc": "^10.3.2",
"cli-progress": "^3.12.0",
"csv-parse": "^5.5.5",
"from-protobuf-object": "^1.0.2",
"google-protobuf": "^3.21.2",
Expand Down
8 changes: 7 additions & 1 deletion src/client/dataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { UserError } from "../errors";
import { ClarifaiUrl, ClarifaiUrlHelper } from "../urls/helper";
import { AuthConfig } from "../utils/types";
import { Lister } from "./lister";
import { Input } from "./input";
import { Input, InputBulkUpload } from "./input";
import {
DeleteDatasetVersionsRequest,
ListDatasetVersionsRequest,
Expand Down Expand Up @@ -139,11 +139,13 @@ export class Dataset extends Lister {
inputType,
labels = false,
batchSize = this.batchSize,
uploadProgressEmitter,
}: {
folderPath: string;
inputType: "image" | "text";
labels: boolean;
batchSize?: number;
uploadProgressEmitter?: InputBulkUpload;
}): Promise<void> {
if (["image", "text"].indexOf(inputType) === -1) {
throw new UserError("Invalid input type");
Expand All @@ -166,6 +168,7 @@ export class Dataset extends Lister {
await this.input.bulkUpload({
inputs: inputProtos,
batchSize: batchSize,
uploadProgressEmitter,
});
}

Expand All @@ -175,12 +178,14 @@ export class Dataset extends Lister {
csvType,
labels = true,
batchSize = 128,
uploadProgressEmitter,
}: {
csvPath: string;
inputType?: "image" | "text" | "video" | "audio";
csvType: "raw" | "url" | "file";
labels?: boolean;
batchSize?: number;
uploadProgressEmitter?: InputBulkUpload;
}): Promise<void> {
if (!["image", "text", "video", "audio"].includes(inputType)) {
throw new UserError(
Expand Down Expand Up @@ -209,6 +214,7 @@ export class Dataset extends Lister {
await this.input.bulkUpload({
inputs: inputProtos,
batchSize: batchSize,
uploadProgressEmitter,
});
}
}
50 changes: 39 additions & 11 deletions src/client/input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import {
Input as GrpcInput,
Image,
Point,
Polygon,
Region,
RegionInfo,
Text,
Video,
} from "clarifai-nodejs-grpc/proto/clarifai/api/resources_pb";
import { AuthConfig, Polygon } from "../utils/types";
import { AuthConfig, Polygon as PolygonType } from "../utils/types";
import { Lister } from "./lister";
import { Buffer } from "buffer";
import fs from "fs";
Expand All @@ -41,9 +42,9 @@ import { StatusCode } from "clarifai-nodejs-grpc/proto/clarifai/api/status/statu
import os from "os";
import chunk from "lodash/chunk";
import { Status } from "clarifai-nodejs-grpc/proto/clarifai/api/status/status_pb";
import cliProgress from "cli-progress";
import async from "async";
import { MAX_RETRIES } from "../constants/dataset";
import { EventEmitter } from "events";

interface CSVRecord {
inputid: string;
Expand All @@ -53,6 +54,29 @@ interface CSVRecord {
geopoints: string;
}

/**
 * Event map for bulk-upload progress reporting: each key is an event name and
 * its value is the payload type delivered with that event.
 */
interface UploadEvents {
/** Payload of the `start` event. */
start: ProgressEvent;
/** Payload of the `progress` event. */
progress: ProgressEvent;
/** Payload of the `error` event. */
error: ErrorEvent;
/** Payload of the `end` event. */
end: ProgressEvent;
}

/**
 * Progress snapshot carried by the `start`, `progress` and `end` upload events.
 *
 * As emitted by `bulkUpload` in this file, both fields count batches (chunks of
 * inputs), not individual inputs.
 *
 * NOTE(review): this shares its name with the DOM global `ProgressEvent`;
 * presumably the local declaration is the one intended here — confirm, and
 * consider a more specific name.
 */
interface ProgressEvent {
/** Number of units completed so far. */
current: number;
/** Total number of units expected. */
total: number;
}

/**
 * Payload declared for the `error` upload event.
 *
 * NOTE(review): the `bulkUpload` call site visible in this file emits `error`
 * without any payload — confirm whether listeners can rely on this shape.
 */
interface ErrorEvent {
/** The error that caused the upload to fail. */
error: Error;
}

/**
 * An `EventEmitter` augmented with `emit`/`on` signatures keyed by an event map.
 *
 * @typeParam T - Event map whose keys are event names and whose values are the
 * payload types for those events.
 *
 * NOTE(review): because this is an intersection with `EventEmitter`, the
 * untyped base `emit`/`on` signatures presumably remain callable as overloads,
 * so calls that omit or mistype the payload may still compile — confirm.
 */
type BulkUploadEventEmitter<T> = EventEmitter & {
// Emits `event` with the payload type declared for it in `T`.
emit<K extends keyof T>(event: K, payload: T[K]): boolean;
// Registers a listener that receives the payload type declared for `event` in `T`.
on<K extends keyof T>(event: K, listener: (payload: T[K]) => void): this;
};

/**
 * Emitter type accepted by the bulk-upload methods as `uploadProgressEmitter`;
 * carries the `start`, `progress`, `error` and `end` events of `UploadEvents`.
 */
export type InputBulkUpload = BulkUploadEventEmitter<UploadEvents>;

/**
* Inputs is a class that provides access to Clarifai API endpoints related to Input information.
* @noInheritDoc
Expand Down Expand Up @@ -738,7 +762,7 @@ export class Input extends Lister {
}: {
inputId: string;
label: string;
polygons: Polygon[];
polygons: PolygonType[];
}): Annotation {
const polygonsSchema = z.array(z.array(z.tuple([z.number(), z.number()])));
try {
Expand Down Expand Up @@ -1009,19 +1033,18 @@ export class Input extends Lister {
bulkUpload({
inputs,
batchSize: providedBatchSize = 128,
uploadProgressEmitter,
}: {
inputs: GrpcInput[];
batchSize?: number;
uploadProgressEmitter?: InputBulkUpload;
}): Promise<void> {
const batchSize = Math.min(128, providedBatchSize);
const chunkedInputs = chunk(inputs, batchSize);

const progressBar = new cliProgress.SingleBar(
{},
cliProgress.Presets.shades_classic,
);

progressBar.start(chunkedInputs.length, 0);
let currentProgress = 0;
const total = chunkedInputs.length;
uploadProgressEmitter?.emit("start", { current: currentProgress, total });

return new Promise<void>((resolve, reject) => {
async.mapLimit(
Expand All @@ -1033,7 +1056,11 @@ export class Input extends Lister {
this.retryUploads({
failedInputs,
}).finally(() => {
progressBar.increment();
currentProgress++;
uploadProgressEmitter?.emit("progress", {
current: currentProgress,
total,
});
callback(null, failedInputs);
});
})
Expand All @@ -1044,9 +1071,10 @@ export class Input extends Lister {
(err) => {
if (err) {
console.error("Error processing batches", err);
uploadProgressEmitter?.emit("error");
reject(err);
}
progressBar.stop();
uploadProgressEmitter?.emit("end", { current: total, total });
console.log("All inputs processed");
resolve();
},
Expand Down
40 changes: 39 additions & 1 deletion tests/client/search.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import path from "path";
import { getSchema } from "../../src/schema/search";
import { z } from "zod";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
import { App, Dataset, Input, Search, User } from "../../src/index";
import { Hit } from "clarifai-nodejs-grpc/proto/clarifai/api/resources_pb";
import EventEmitter from "events";

const NOW = Date.now().toString() + "-search";
const CREATE_APP_USER_ID = import.meta.env.VITE_CLARIFAI_USER_ID;
Expand Down Expand Up @@ -197,11 +198,48 @@ describe("Search", () => {
},
datasetId: datasetObj.id,
});
const eventEmitter = new EventEmitter();
const eventHandler = {
start: (...args: unknown[]) => console.log("start", args),
progress: (...args: unknown[]) => console.log("progress", args),
end: (...args: unknown[]) => console.log("end", args),
error: (...args: unknown[]) => console.log("error", args),
};
const startSpy = vi.spyOn(eventHandler, "start");
const progressSpy = vi.spyOn(eventHandler, "progress");
const endSpy = vi.spyOn(eventHandler, "end");
const errorSpy = vi.spyOn(eventHandler, "error");
eventEmitter.on("start", (start) => {
eventHandler.start(start);
});
eventEmitter.on("progress", (progress) => {
eventHandler.progress(progress);
});
eventEmitter.on("end", (progress) => {
eventHandler.end(progress);
});
eventEmitter.on("error", (error) => {
eventHandler.error(error);
});
await dataset.uploadFromFolder({
folderPath: DATASET_IMAGES_DIR,
inputType: "image",
labels: false,
uploadProgressEmitter: eventEmitter,
});
expect(startSpy).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ current: 0, total: 1 }),
);
expect(progressSpy).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ current: 1, total: 1 }),
);
expect(endSpy).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ current: 1, total: 1 }),
);
expect(errorSpy).not.toHaveBeenCalled();
}, 50000);

it("should get expected hits for filters", async () => {
Expand Down
1 change: 1 addition & 0 deletions vitest.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export default defineConfig({
test: {
coverage: {
reporter: ["text", "json", "html", "clover", "json-summary"],
include: ["src/**/*"],
},
},
});

0 comments on commit a95725d

Please sign in to comment.