Skip to content

Commit

Permalink
Merge pull request #68 from togethercomputer/yogish/upload-files
Browse files Browse the repository at this point in the history
Adding upload API as a helper function
  • Loading branch information
Nutlope authored Nov 5, 2024
2 parents 1f74ae7 + 469475e commit ed8622e
Show file tree
Hide file tree
Showing 6 changed files with 1,026 additions and 11 deletions.
500 changes: 500 additions & 0 deletions examples/sample_finetuning.jsonl

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions examples/uploading-files.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env -S npm run tsn -T

// Example of uploading a file
import { upload } from 'together-ai/lib/upload';

async function main() {
// Upload a file
const file = await upload('./examples/sample_finetuning.jsonl');
console.log('Uploaded file');
console.log(file);
}

main();
7 changes: 6 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@
"dependencies": {
"@types/node": "^18.11.18",
"@types/node-fetch": "^2.6.4",
"@types/parquetjs": "^0.10.6",
"@types/progress-stream": "^2.0.5",
"abort-controller": "^3.0.0",
"agentkeepalive": "^4.2.1",
"axios": "^1.7.7",
"form-data-encoder": "1.7.2",
"formdata-node": "^4.3.2",
"node-fetch": "^2.6.7"
"node-fetch": "^2.6.7",
"parquetjs": "^0.11.2",
"progress-stream": "^2.0.0"
},
"devDependencies": {
"@swc/core": "^1.3.102",
Expand Down
301 changes: 301 additions & 0 deletions src/lib/upload.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
// Upload file to server using /files API

import * as core from '../core';
import { isAxiosError } from 'axios';
import fs from 'fs';
import fetch from 'node-fetch';
import * as path from 'path';
import progress from 'progress-stream';
import readline from 'readline';
import pkg from 'parquetjs';
const { ParquetReader } = pkg;

export interface FileResponse {
id: string;
object: string;
type: 'jsonl' | 'parquet';
purpose: 'fine-tune';
filename: string;
bytes: number;
line_count: number;
processed: boolean;
}

export interface ErrorResponse {
message: string;
}

const failedUploadMessage = {
message: 'failed to upload file',
};

const baseURL = 'https://api.together.xyz/v1';
const MAX_FILE_SIZE = 4.8; // GB
const BYTES_PER_GB = 1024 * 1024 * 1024;
const MIN_SAMPLES = 1;
const PARQUET_EXPECTED_COLUMNS = ['input_ids', 'attention_mask', 'labels'];

export interface CheckFileResponse {
success: boolean;
message?: string;
}
export async function check_file(fileName: string): Promise<CheckFileResponse> {
const stat = fs.statSync(fileName);
if (stat.size == 0) {
return { success: false, message: `File is empty` };
}

if (stat.size > MAX_FILE_SIZE * BYTES_PER_GB) {
return { success: false, message: `File size exceeds the limit of ${MAX_FILE_SIZE} GB` };
}

const fileType = path.extname(fileName);
if (fileType !== '.jsonl' && fileType !== '.parquet') {
return {
success: false,
message: 'File type must be either .jsonl or .parquet',
};
}

if (fileType == '.jsonl') {
const jsonlCheck = await check_jsonl(fileName);
if (jsonlCheck) {
return { success: false, message: jsonlCheck };
}
}

if (fileType == '.parquet') {
const parquetCheck = await check_parquet(fileName);
if (parquetCheck) {
return { success: false, message: parquetCheck };
}
}

return { success: true };
}

export async function check_parquet(fileName: string): Promise<string | undefined> {
try {
const reader = await ParquetReader.openFile(fileName);
const cursor = reader.getCursor();
let record = null;

const fieldNames = Object.keys(reader.schema.fields);
if (!('input_ids' in fieldNames)) {
return `Parquet file ${fileName} does not contain the 'input_ids' column.`;
}

for (const fieldName in fieldNames) {
if (!PARQUET_EXPECTED_COLUMNS.includes(fieldName)) {
return `Parquet file ${fileName} contains unexpected column ${fieldName}. Only ${PARQUET_EXPECTED_COLUMNS.join(
', ',
)} are supported`;
}
}

const numRows = reader.getRowCount() as unknown as number;
if (numRows < MIN_SAMPLES) {
return `Parquet file ${fileName} contains only ${numRows} samples. Minimum of ${MIN_SAMPLES} samples are required`;
}

await reader.close();
} catch (err) {
return `failed to read parquet file ${fileName}`;
}

return undefined;
}

// return undefined if the file is valid, otherwise return an error message
export async function check_jsonl(fileName: string): Promise<string | undefined> {
const fileStream = fs.createReadStream(fileName);

const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity,
});

let errors: string[] = [];
let lineNumber = 1;
for await (const line of rl) {
try {
// do not proceed if there are too many errors
if (errors.length > 20) {
break;
}
const parsedLine = JSON.parse(line);

if (typeof parsedLine !== 'object') {
errors.push(`Line number ${lineNumber} is not a valid JSON object`);
continue;
}

if (!('text' in parsedLine)) {
errors.push(
`Missing 'text' field was found on line ${lineNumber} of the the input file. Expected format: {'text': 'my sample string'}.`,
);
continue;
}

if (typeof parsedLine['text'] !== 'string') {
errors.push(`'Invalid value type for "text" key on line ${lineNumber}. Expected string`);
continue;
}
} catch (error) {
errors.push(`Error parsing line number ${lineNumber}`);
}
lineNumber += 1;
}

lineNumber -= 1;
if (lineNumber < MIN_SAMPLES) {
errors.push(`Processing ${fileName} resulted in only ${lineNumber - 1} samples.`);
}

if (errors.length > 0) {
return errors.join('\n');
}

return undefined;
}

export async function upload(fileName: string, check: boolean = true): Promise<FileResponse | ErrorResponse> {
let purpose = 'fine-tune';
if (!fs.existsSync(fileName)) {
return {
message: 'File does not exists',
};
}

const fileType = path.extname(fileName);
if (fileType !== '.jsonl' && fileType !== '.parquet') {
return {
message: 'File type must be either .jsonl or .parquet',
};
}

if (check) {
const checkFile = await check_file(fileName);
if (!checkFile.success) {
return {
message: checkFile.message || `verification of ${fileName} failed with some unknown reason`,
};
}
}

// steps to do
// 1. check if file exists
// 2. get signed upload url
// 3. upload file
const baseUrl = core.readEnv('TOGETHER_API_BASE_URL') || 'https://api.together.ai/v1';
const apiKey =
core.readEnv('TOGETHER_API_KEY') || '3c24363cf5506cb56b48e7e99de5e182a1a544965f3d9f38833a6db35d6f7aad';

if (!apiKey) {
return {
message: 'API key is required',
};
}

const getSigned = baseURL + '/files';

try {
const params = new URLSearchParams({
file_name: fileName,
purpose: purpose,
});
const fullUrl = `${getSigned}?${params}`;
const r = await fetch(fullUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
Authorization: `Bearer ${apiKey}`,
},
redirect: 'manual',
body: params.toString(),
});

if (r.status !== 302) {
return failedUploadMessage;
}

const uploadUrl = r.headers.get('location') || '';
if (!uploadUrl || uploadUrl === '') {
return failedUploadMessage;
}
const fileId = r.headers.get('x-together-file-id') || '';
if (!fileId || fileId === '') {
return failedUploadMessage;
}

const fileStream = fs.createReadStream(fileName);
const fileSize = fs.statSync(fileName).size;

const progressStream = progress({
length: fileSize,
time: 100, // Emit progress events every 100ms
});

// Listen to progress events and log them
progressStream.on('progress', (progress) => {
displayProgress(progress.percentage);
});

let uploadedBytes = 0;
// upload the file to uploadUrl
const uploadResponse = await fetch(uploadUrl, {
method: 'PUT',
headers: {
'Content-Type': 'application/octet-stream',
},
body: fileStream.pipe(progressStream),
});

displayProgress(100);
process.stdout.write('\n');

return {
id: fileId,
object: 'file',
type: 'jsonl',
purpose: 'fine-tune',
filename: fileName,
bytes: fileSize,
line_count: 0,
processed: true,
};
} catch (error) {
if (isAxiosError(error)) {
// handle axios error here
if (error.status) {
return {
message: `failed to upload file with status ${error.status}`,
};
}
}

return {
message: 'failed to upload file',
};
}
}

async function displayProgress(progress: number) {
const barWidth = 40; // Number of characters for the progress bar
const completedBars = Math.round((progress / 100) * barWidth);
let remainingBars = barWidth - completedBars;
if (remainingBars < 0) {
remainingBars = 0;
}
const progressBar = `[${'='.repeat(completedBars)}${' '.repeat(remainingBars)}] ${progress.toFixed(2)}%`;

// Clear the line and write progress
//process.stdout.clearLine(0); //clean entire line
process.stdout.cursorTo(0);
process.stdout.write(progressBar, () => {});
await sleep(2000);
}

async function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
7 changes: 7 additions & 0 deletions src/resources/files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ export class Files extends APIResource {
content(id: string, options?: Core.RequestOptions): Core.APIPromise<Response> {
return this._client.get(`/files/${id}/content`, { ...options, __binaryResponse: true });
}

/**
* Upload a file.
*/
upload(_: string): Promise<void> {
throw 'please use together-ai/lib/upload';
}
}

export interface FileObject {
Expand Down
Loading

0 comments on commit ed8622e

Please sign in to comment.