Skip to content

Commit

Permalink
fix dropfile
Browse files Browse the repository at this point in the history
  • Loading branch information
e1arikawa committed Sep 29, 2024
1 parent e0995e8 commit cc503c7
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 41 deletions.
2 changes: 2 additions & 0 deletions lib/include/duckdb/web/io/web_filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ class WebFileSystem : public duckdb::FileSystem {
DataBuffer file_buffer);
/// Try to drop a specific file
bool TryDropFile(std::string_view file_name);
/// drop a specific file
void DropFile(std::string_view file_name);
/// Drop all files without references (including buffers)
void DropDanglingFiles();
/// Configure file statistics
Expand Down
4 changes: 4 additions & 0 deletions lib/js-stubs.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ addToLibrary({
duckdb_web_fs_file_sync: function (fileId) {
return globalThis.DUCKDB_RUNTIME.syncFile(Module, fileId);
},
duckdb_web_fs_file_drop_file__sig: 'vpi',
duckdb_web_fs_file_drop_file: function (fileName, fileNameLen) {
return globalThis.DUCKDB_RUNTIME.dropFile(Module, fileName, fileNameLen);
},
duckdb_web_fs_file_close__sig: 'vi',
duckdb_web_fs_file_close: function (fileId) {
return globalThis.DUCKDB_RUNTIME.closeFile(Module, fileId);
Expand Down
9 changes: 9 additions & 0 deletions lib/src/io/web_filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ RT_FN(void duckdb_web_fs_file_close(size_t file_id), {
auto &infos = GetLocalState();
infos.handles.erase(file_id);
});
RT_FN(void duckdb_web_fs_file_drop_file(const char *fileName, size_t pathLen), {});
RT_FN(void duckdb_web_fs_file_truncate(size_t file_id, double new_size), { GetOrOpen(file_id).Truncate(new_size); });
RT_FN(time_t duckdb_web_fs_file_get_last_modified_time(size_t file_id), {
auto &file = GetOrOpen(file_id);
Expand Down Expand Up @@ -455,6 +456,7 @@ void WebFileSystem::DropDanglingFiles() {
for (auto &[file_id, file] : files_by_id_) {
if (file->handle_count_ == 0) {
files_by_name_.erase(file->file_name_);
DropFile(file->file_name_);
if (file->data_url_.has_value()) {
files_by_url_.erase(file->data_url_.value());
}
Expand Down Expand Up @@ -483,6 +485,13 @@ bool WebFileSystem::TryDropFile(std::string_view file_name) {
return false;
}

/// drop a file
void WebFileSystem::DropFile(std::string_view file_name) {
DEBUG_TRACE();
std::string fileNameS = std::string{file_name};
duckdb_web_fs_file_drop_file(fileNameS.c_str(), fileNameS.size());
}

/// Write the global filesystem info
rapidjson::Value WebFileSystem::WriteGlobalFileInfo(rapidjson::Document &doc, uint32_t cache_epoch) {
DEBUG_TRACE();
Expand Down
21 changes: 16 additions & 5 deletions lib/src/webdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -912,18 +912,29 @@ arrow::Status WebDB::RegisterFileBuffer(std::string_view file_name, std::unique_
/// Drop all files
arrow::Status WebDB::DropFiles() {
file_page_buffer_->DropDanglingFiles();
pinned_web_files_.clear();
std::vector<std::string> files_to_drop;
for (const auto& [key, handle] : pinned_web_files_) {
files_to_drop.push_back(handle->GetName());
}
for (const auto& fileName : files_to_drop) {
arrow::Status status = DropFile(fileName);
if (!status.ok()) {
return arrow::Status::Invalid("Failed to drop file: " + fileName);
}
}
if (auto fs = io::WebFileSystem::Get()) {
fs->DropDanglingFiles();
}
return arrow::Status::OK();
}
/// Drop a file
arrow::Status WebDB::DropFile(std::string_view file_name) {
file_page_buffer_->TryDropFile(file_name);
pinned_web_files_.erase(file_name);
arrow::Status WebDB::DropFile(std::string_view fileName) {
file_page_buffer_->TryDropFile(fileName);
pinned_web_files_.erase(fileName);
if (auto fs = io::WebFileSystem::Get()) {
if (!fs->TryDropFile(file_name)) {
if (fs->TryDropFile(fileName)) {
fs->DropFile(fileName);
} else {
return arrow::Status::Invalid("file is in use");
}
}
Expand Down
6 changes: 5 additions & 1 deletion packages/duckdb-wasm/src/bindings/bindings_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,11 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && handle instanceof FileSystemFileHandle) {
// handle is an async handle, should convert to sync handle
const fileHandle: FileSystemFileHandle = handle as any;
handle = (await fileHandle.createSyncAccessHandle()) as any;
try {
handle = (await fileHandle.createSyncAccessHandle()) as any;
} catch (e: any) {
throw new Error( e.message + ":" + name );
}
}
const [s, d, n] = callSRet(
this.mod,
Expand Down
3 changes: 3 additions & 0 deletions packages/duckdb-wasm/src/bindings/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export interface DuckDBRuntime {
openFile(mod: DuckDBModule, fileId: number, flags: FileFlags): void;
syncFile(mod: DuckDBModule, fileId: number): void;
closeFile(mod: DuckDBModule, fileId: number): void;
dropFile(mod: DuckDBModule, fileNamePtr: number, fileNameLen:number): void;
getLastFileModificationTime(mod: DuckDBModule, fileId: number): number;
truncateFile(mod: DuckDBModule, fileId: number, newSize: number): void;
readFile(mod: DuckDBModule, fileId: number, buffer: number, bytes: number, location: number): number;
Expand All @@ -155,6 +156,7 @@ export interface DuckDBRuntime {
checkFile(mod: DuckDBModule, pathPtr: number, pathLen: number): boolean;
removeFile(mod: DuckDBModule, pathPtr: number, pathLen: number): void;

// Prepare a file handle that could only be acquired aschronously
prepareDBFileHandle?: (path: string, protocol: DuckDBDataProtocol) => Promise<PreparedDBFileHandle[]>;

// Call a scalar UDF function
Expand All @@ -177,6 +179,7 @@ export const DEFAULT_RUNTIME: DuckDBRuntime = {
openFile: (_mod: DuckDBModule, _fileId: number, flags: FileFlags): void => {},
syncFile: (_mod: DuckDBModule, _fileId: number): void => {},
closeFile: (_mod: DuckDBModule, _fileId: number): void => {},
dropFile: (_mod: DuckDBModule, _fileNamePtr: number, _fileNameLen:number): void => {},
getLastFileModificationTime: (_mod: DuckDBModule, _fileId: number): number => {
return 0;
},
Expand Down
50 changes: 35 additions & 15 deletions packages/duckdb-wasm/src/bindings/runtime_browser.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { StatusCode } from '../status';
import { addS3Headers, getHTTPUrl } from '../utils';
import {StatusCode} from '../status';
import {addS3Headers, getHTTPUrl} from '../utils';

import {
callSRet,
Expand All @@ -23,7 +23,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
_files: Map<string, any>;
_fileInfoCache: Map<number, DuckDBFileInfo>;
_globalFileInfo: DuckDBGlobalFileInfo | null;
_preparedHandles: Record<string, any>;
_preparedHandles: Record<string, FileSystemSyncAccessHandle>;

getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null;
getGlobalFileInfo(mod: DuckDBModule): DuckDBGlobalFileInfo | null;
Expand Down Expand Up @@ -93,7 +93,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
if (info == null) {
return null;
}
BROWSER_RUNTIME._globalFileInfo = { ...info, blob: null } as DuckDBGlobalFileInfo;
BROWSER_RUNTIME._globalFileInfo = { ...info, blob: null} as DuckDBGlobalFileInfo;

return BROWSER_RUNTIME._globalFileInfo;
} catch (e: any) {
Expand Down Expand Up @@ -137,13 +137,17 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
throw e;
});
const handle = await fileHandle.createSyncAccessHandle();
BROWSER_RUNTIME._preparedHandles[path] = handle;
return {
path,
handle,
fromCached: false,
};
try {
const handle = await fileHandle.createSyncAccessHandle();
BROWSER_RUNTIME._preparedHandles[path] = handle;
return {
path,
handle,
fromCached: false,
};
} catch (e: any) {
throw new Error(e.message + ":" + name);
}
};
const result: PreparedDBFileHandle[] = [];
for (const filePath of filePaths) {
Expand Down Expand Up @@ -485,9 +489,25 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
handle.flush();
handle.close();
BROWSER_RUNTIME._files.delete(file.fileName);
return handle.flush();
}
}
},
dropFile: (mod: DuckDBModule, fileNamePtr: number, fileNameLen: number) => {
const fileName = readString(mod, fileNamePtr, fileNameLen);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(fileName);
if (handle) {
BROWSER_RUNTIME._files.delete(fileName);
if (handle instanceof FileSystemSyncAccessHandle) {
try {
handle.flush();
handle.close();
} catch (e: any) {
throw new Error(`Cannot drop file with name: ${fileName}`);
}
}
if (handle instanceof Blob) {
// nothing
}
}
},
Expand Down Expand Up @@ -624,7 +644,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
const input = mod.HEAPU8.subarray(buf, buf + bytes);
return handle.write(input, { at: location });
return handle.write(input, {at: location});
}
}
return 0;
Expand Down
1 change: 1 addition & 0 deletions packages/duckdb-wasm/src/bindings/runtime_node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ export const NODE_RUNTIME: DuckDBRuntime & {
}
return 0;
},
dropFile: (mod: DuckDBModule, _fileNamePtr: number, _fileNameLen:number) => {},
truncateFile: (mod: DuckDBModule, fileId: number, newSize: number) => {
try {
const file = NODE_RUNTIME.resolveFileInfo(mod, fileId);
Expand Down
131 changes: 111 additions & 20 deletions packages/duckdb-wasm/test/opfs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export function testOPFS(baseDir: string, bundle: () => duckdb.DuckDBBundle): vo
removeFiles();
});

describe('Load Data', () => {
describe('Load Data in OPFS', () => {
it('Imporet Small Parquet file', async () => {
await conn.send(`CREATE TABLE stu AS SELECT * FROM "${baseDir}/uni/studenten.parquet"`);
await conn.send(`CHECKPOINT;`);
Expand All @@ -71,7 +71,7 @@ export function testOPFS(baseDir: string, bundle: () => duckdb.DuckDBBundle): vo
expect(table.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
});

it('Load Existing DB File in OPFS', async () => {
it('Load Existing DB File', async () => {
await conn.send(`CREATE TABLE tmp AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`);
await conn.send(`CHECKPOINT;`);
await conn.close();
Expand All @@ -96,7 +96,57 @@ export function testOPFS(baseDir: string, bundle: () => duckdb.DuckDBBundle): vo
expect(table.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
});

it('Export as CSV to OPFS + Load CSV that are already in OPFS', async () => {
it('Load Parquet file that are already', async () => {
const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res =>
res.arrayBuffer(),
);
const opfsRoot = await navigator.storage.getDirectory();
const fileHandle = await opfsRoot.getFileHandle('test.parquet', {create: true});
const writable = await fileHandle.createWritable();
await writable.write(parquetBuffer);
await writable.close();

await db.registerFileHandle('test.parquet', fileHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
await conn.send(`CREATE TABLE lineitem1 AS SELECT * FROM read_parquet('test.parquet')`);
await conn.send(`CHECKPOINT;`);
await conn.send(`CREATE TABLE lineitem2 AS SELECT * FROM read_parquet('test.parquet')`);
await conn.send(`CHECKPOINT;`);
await conn.send(`CREATE TABLE lineitem3 AS SELECT * FROM read_parquet('test.parquet')`);
await conn.send(`CHECKPOINT;`);

{
const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem1;`);
const batches1 = [];
for await (const batch of result1) {
batches1.push(batch);
}
const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1);
expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
}

{
const result2 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem2;`);
const batches2 = [];
for await (const batch of result2) {
batches2.push(batch);
}
const table2 = await new arrow.Table<{ cnt: arrow.Int }>(batches2);
expect(table2.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
}

{
const result3 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem3;`);
const batches3 = [];
for await (const batch of result3) {
batches3.push(batch);
}
const table3 = await new arrow.Table<{ cnt: arrow.Int }>(batches3);
expect(table3.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
}

});

it('Drop File + Export as CSV to OPFS + Load CSV', async () => {
const opfsRoot = await navigator.storage.getDirectory();
const testHandle = await opfsRoot.getFileHandle('test.csv', {create: true});
await db.registerFileHandle('test.csv', testHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
Expand All @@ -117,29 +167,64 @@ export function testOPFS(baseDir: string, bundle: () => duckdb.DuckDBBundle): vo
}
const table = await new arrow.Table<{ cnt: arrow.Int }>(batches);
expect(table.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);

await db.dropFile('test.csv');
});

it('Load Parquet file that are already in OPFS', async () => {
const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res =>
res.arrayBuffer(),
);

it('Drop Files + Export as CSV to OPFS + Load CSV', async () => {
const opfsRoot = await navigator.storage.getDirectory();
const fileHandle = await opfsRoot.getFileHandle('test.parquet', {create: true});
const writable = await fileHandle.createWritable();
await writable.write(parquetBuffer);
await writable.close();
const testHandle1 = await opfsRoot.getFileHandle('test1.csv', {create: true});
const testHandle2 = await opfsRoot.getFileHandle('test2.csv', {create: true});
const testHandle3 = await opfsRoot.getFileHandle('test3.csv', {create: true});
await db.registerFileHandle('test1.csv', testHandle1, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
await db.registerFileHandle('test2.csv', testHandle2, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
await db.registerFileHandle('test3.csv', testHandle3, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);

await db.registerFileHandle('test.parquet', fileHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
await conn.send(`CREATE TABLE lineitem AS SELECT * FROM read_parquet('test.parquet')`);
await conn.send(`CHECKPOINT;`);
await conn.send(`CREATE TABLE zzz AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`);
await conn.send(`COPY (SELECT * FROM zzz) TO 'test1.csv'`);
await conn.send(`COPY (SELECT * FROM zzz) TO 'test2.csv'`);
await conn.send(`COPY (SELECT * FROM zzz) TO 'test3.csv'`);
await conn.close();

const result = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem;`);
const batches = [];
for await (const batch of result) {
batches.push(batch);
await db.dropFiles();
await db.reset();

await db.open({});
conn = await db.connect();
await db.registerFileHandle('test1.csv', testHandle1, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
await db.registerFileHandle('test2.csv', testHandle2, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
await db.registerFileHandle('test3.csv', testHandle3, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);

{
const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test1.csv';`);
const batches1 = [];
for await (const batch of result1) {
batches1.push(batch);
}
const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1);
expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
}
const table = await new arrow.Table<{ cnt: arrow.Int }>(batches);
expect(table.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
{
const result2 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test2.csv';`);
const batches2 = [];
for await (const batch of result2) {
batches2.push(batch);
}
const table2 = await new arrow.Table<{ cnt: arrow.Int }>(batches2);
expect(table2.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
}
{
const result3 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test3.csv';`);
const batches3 = [];
for await (const batch of result3) {
batches3.push(batch);
}
const table3 = await new arrow.Table<{ cnt: arrow.Int }>(batches3);
expect(table3.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
}

await db.dropFiles();
});
});

Expand All @@ -151,6 +236,12 @@ export function testOPFS(baseDir: string, bundle: () => duckdb.DuckDBBundle): vo
});
await opfsRoot.removeEntry('test.csv').catch(() => {
});
await opfsRoot.removeEntry('test1.csv').catch(() => {
});
await opfsRoot.removeEntry('test2.csv').catch(() => {
});
await opfsRoot.removeEntry('test3.csv').catch(() => {
});
await opfsRoot.removeEntry('test.parquet').catch(() => {
});
}
Expand Down

0 comments on commit cc503c7

Please sign in to comment.