Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
samwillis committed Oct 7, 2024
1 parent 47e1ef5 commit 3d637d1
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 28 deletions.
2 changes: 1 addition & 1 deletion packages/pglite/examples/basic.html
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ <h1>PGlite Basic Example</h1>
console.log("Starting...");

// In-memory database:
const pg = new PGlite();
const pg = new PGlite({ debug: 2 });
// Or, stored in indexedDB:
// const pg = new PGlite('pgdata');

Expand Down
160 changes: 133 additions & 27 deletions packages/pglite/src/pglite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ import {
NotificationResponseMessage,
} from '@electric-sql/pg-protocol/messages'

const SOCKET_FILE = {
ILOCK: "/tmp/pglite/base/.s.PGSQL.5432.lock.in",
IN: "/tmp/pglite/base/.s.PGSQL.5432.in",
OLOCK: "/tmp/pglite/base/.s.PGSQL.5432.lock.out",
OUT: "/tmp/pglite/base/.s.PGSQL.5432.out",
}

export class PGlite
extends BasePGlite
implements PGliteInterface, AsyncDisposable
Expand Down Expand Up @@ -54,10 +61,13 @@ export class PGlite

#protocolParser = new ProtocolParser()

// These are the current ArrayBuffer that is being read or written to
#queryInBuffer?: ArrayBuffer
#queryOutChunks?: Uint8Array[]

// These are the current /dev/blob ArrayBuffer that is being read or written to
// during a query, such as COPY FROM or COPY TO.
#queryReadBuffer?: ArrayBuffer
#queryWriteChunks?: Uint8Array[]
#devBlobReadBuffer?: ArrayBuffer
#devBlobWriteChunks?: Uint8Array[]

#notifyListeners = new Map<string, Set<(payload: string) => void>>()
#globalNotifyListeners = new Set<(channel: string, payload: string) => void>()
Expand Down Expand Up @@ -243,7 +253,7 @@ export class PGlite
length: number,
position: number,
) => {
const buf = this.#queryReadBuffer
const buf = this.#devBlobReadBuffer
if (!buf) {
throw new Error(
'No /dev/blob File or Blob provided to read from',
Expand All @@ -264,12 +274,14 @@ export class PGlite
length: number,
_position: number,
) => {
this.#queryWriteChunks ??= []
this.#queryWriteChunks.push(buffer.slice(offset, offset + length))
this.#devBlobWriteChunks ??= []
this.#devBlobWriteChunks.push(
buffer.slice(offset, offset + length),
)
return length
},
llseek: (stream: any, offset: number, whence: number) => {
const buf = this.#queryReadBuffer
const buf = this.#devBlobReadBuffer
if (!buf) {
throw new Error('No /dev/blob File or Blob provided to llseek')
}
Expand All @@ -288,6 +300,107 @@ export class PGlite
mod.FS.registerDevice(devId, devOpt)
mod.FS.mkdev('/dev/blob', devId)
},
(mod: any) => {
console.log('registering SOCKET_FILE.IN device')
// Register SOCKET_FILE.IN device
const devId = mod.FS.makedev(63, 0)
const devOpt = {
open: (_stream: any) => {},
close: (_stream: any) => {},
read: (
_stream: any,
buffer: Uint8Array,
offset: number,
length: number,
position: number,
) => {
const buf = this.#queryInBuffer
if (!buf) {
throw new Error(`No ${SOCKET_FILE.IN} Buffer provided to read from`)
}
const contents = new Uint8Array(buf)
if (position >= contents.length) return 0
const size = Math.min(contents.length - position, length)
for (let i = 0; i < size; i++) {
buffer[offset + i] = contents[position + i]
}
return size
},
write: (
_stream: any,
_buffer: Uint8Array,
_offset: number,
_length: number,
_position: number,
) => {
throw new Error('Not implemented')
},
llseek: (stream: any, offset: number, whence: number) => {
const buf = this.#queryInBuffer
if (!buf) {
throw new Error(`No ${SOCKET_FILE.IN} Buffer provided to llseek`)
}
let position = offset
if (whence === 1) {
position += stream.position
} else if (whence === 2) {
position = new Uint8Array(buf).length
}
if (position < 0) {
throw new mod.FS.ErrnoError(28)
}
return position
},
}
console.log('registering SOCKET_FILE.IN device 2')
mod.FS.registerDevice(devId, devOpt)
console.log('registering SOCKET_FILE.IN device 3')
mod.FS.mkdir('/tmp/pglite/base/')
mod.FS.mkdev(SOCKET_FILE.IN, devId)
console.log('registering SOCKET_FILE.IN device 4')
// mod.FS.mkdev(SOCKET_FILE.OLOCK, devId)
console.log('registered SOCKET_FILE.IN device')
},
(mod: any) => {
console.log('registering SOCKET_FILE.OUT device')
// Register SOCKET_FILE.OUT device
const devId = mod.FS.makedev(62, 0)
const devOpt = {
open: (_stream: any) => {},
close: (_stream: any) => {},
read: (
_stream: any,
_buffer: Uint8Array,
_offset: number,
_length: number,
_position: number,
) => {
throw new Error('Not implemented')
},
write: (
_stream: any,
buffer: Uint8Array,
offset: number,
length: number,
_position: number,
) => {
this.#queryOutChunks ??= []
this.#queryOutChunks.push(buffer.slice(offset, offset + length))
return length
},
llseek: (_stream: any, _offset: number, _whence: number) => {
throw new Error('Not implemented')
},
}
console.log('registering SOCKET_FILE.OUT device 2')
mod.FS.registerDevice(devId, devOpt)
console.log('registering SOCKET_FILE.OUT device 3')
mod.FS.mkdir('/tmp/pglite/base/')
mod.FS.mkdev(SOCKET_FILE.OUT, devId)
console.log('registering SOCKET_FILE.OUT device 4')
// mod.FS.mkdev(SOCKET_FILE.OLOCK, devId)
console.log('registered SOCKET_FILE.OUT device')
},
],
}

Expand Down Expand Up @@ -492,26 +605,26 @@ export class PGlite
* @param file The file to handle
*/
async _handleBlob(blob?: File | Blob) {
this.#queryReadBuffer = blob ? await blob.arrayBuffer() : undefined
this.#devBlobReadBuffer = blob ? await blob.arrayBuffer() : undefined
}

/**
* Cleanup the current file
*/
async _cleanupBlob() {
this.#queryReadBuffer = undefined
this.#devBlobReadBuffer = undefined
}

/**
* Get the written blob from the current query
* @returns The written blob
*/
async _getWrittenBlob(): Promise<Blob | undefined> {
if (!this.#queryWriteChunks) {
if (!this.#devBlobWriteChunks) {
return undefined
}
const blob = new Blob(this.#queryWriteChunks)
this.#queryWriteChunks = undefined
const blob = new Blob(this.#devBlobWriteChunks)
this.#devBlobWriteChunks = undefined
return blob
}

Expand Down Expand Up @@ -547,29 +660,22 @@ export class PGlite
message: Uint8Array,
{ syncToFs = true }: ExecProtocolOptions = {},
) {
const msg_len = message.length
const mod = this.mod!

// >0 set buffer content type to wire protocol
// set buffer size so answer will be at size+0x2 pointer addr
mod._interactive_write(msg_len)

// copy whole buffer at addr 0x1
mod.HEAPU8.set(message, 1)
// Make query available at /dev/query-in
this.#queryInBuffer = message

// execute the message
mod._interactive_one()
this.#queryOutChunks = []
this.mod!._interactive_one()

// Read responses from the buffer
const msg_start = msg_len + 2
const msg_end = msg_start + mod._interactive_read()
const data = mod.HEAPU8.subarray(msg_start, msg_end)
// Read responses from SOCKET_FILE.OUT
const data = await new Blob(this.#queryOutChunks).arrayBuffer()
this.#queryOutChunks = undefined

if (syncToFs) {
await this.syncToFs()
}

return data
return new Uint8Array(data)
}

/**
Expand Down

0 comments on commit 3d637d1

Please sign in to comment.