Skip to content

Commit

Permalink
chore: Change interface of execProtocol return value to remove duplic…
Browse files Browse the repository at this point in the history
…ation of data buffer (#377)

* Change interface of execProtocol return value to remove duplication of data buffer

* Changeset

* remove console.log
  • Loading branch information
samwillis authored Oct 14, 2024
1 parent 0c7a627 commit cf50f47
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 63 deletions.
5 changes: 5 additions & 0 deletions .changeset/good-frogs-travel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/pglite': patch
---

Change interface of execProtocol return value to remove duplication of data buffer
63 changes: 33 additions & 30 deletions packages/pglite/src/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import type {
Transaction,
QueryOptions,
ExecProtocolOptions,
ExecProtocolResult,
} from './interface.js'

import { serialize as serializeProtocol } from '@electric-sql/pg-protocol'
import { BackendMessage } from '@electric-sql/pg-protocol/messages'

export abstract class BasePGlite
implements Pick<PGliteInterface, 'query' | 'sql' | 'exec' | 'transaction'>
Expand All @@ -43,7 +43,7 @@ export abstract class BasePGlite
abstract execProtocol(
message: Uint8Array,
{ syncToFs, onNotice }: ExecProtocolOptions,
): Promise<Array<[BackendMessage, Uint8Array]>>
): Promise<ExecProtocolResult>

/**
* Execute a postgres wire protocol message directly without wrapping the response.
Expand Down Expand Up @@ -121,7 +121,7 @@ export abstract class BasePGlite
async #execProtocolNoSync(
message: Uint8Array,
options: ExecProtocolOptions = {},
): Promise<Array<[BackendMessage, Uint8Array]>> {
): Promise<ExecProtocolResult> {
return await this.execProtocol(message, { ...options, syncToFs: false })
}

Expand Down Expand Up @@ -207,7 +207,7 @@ export abstract class BasePGlite
let results

try {
const parseResults = await this.#execProtocolNoSync(
const { messages: parseResults } = await this.#execProtocolNoSync(
serializeProtocol.parse({ text: query, types: options?.paramTypes }),
options,
)
Expand All @@ -218,7 +218,7 @@ export abstract class BasePGlite
serializeProtocol.describe({ type: 'S' }),
options,
)
).map(([msg]) => msg),
).messages,
)

const values = params.map((param, i) => {
Expand All @@ -236,20 +236,26 @@ export abstract class BasePGlite

results = [
...parseResults,
...(await this.#execProtocolNoSync(
serializeProtocol.bind({
values,
}),
options,
)),
...(await this.#execProtocolNoSync(
serializeProtocol.describe({ type: 'P' }),
options,
)),
...(await this.#execProtocolNoSync(
serializeProtocol.execute({}),
options,
)),
...(
await this.#execProtocolNoSync(
serializeProtocol.bind({
values,
}),
options,
)
).messages,
...(
await this.#execProtocolNoSync(
serializeProtocol.describe({ type: 'P' }),
options,
)
).messages,
...(
await this.#execProtocolNoSync(
serializeProtocol.execute({}),
options,
)
).messages,
]
} finally {
await this.#execProtocolNoSync(serializeProtocol.sync(), options)
Expand All @@ -260,12 +266,7 @@ export abstract class BasePGlite
await this.syncToFs()
}
const blob = await this._getWrittenBlob()
return parseResults(
results.map(([msg]) => msg),
this.parsers,
options,
blob,
)[0] as Results<T>
return parseResults(results, this.parsers, options, blob)[0] as Results<T>
})
}

Expand All @@ -286,10 +287,12 @@ export abstract class BasePGlite
await this._handleBlob(options?.blob)
let results
try {
results = await this.#execProtocolNoSync(
serializeProtocol.query(query),
options,
)
results = (
await this.#execProtocolNoSync(
serializeProtocol.query(query),
options,
)
).messages
} finally {
await this.#execProtocolNoSync(serializeProtocol.sync(), options)
}
Expand All @@ -299,7 +302,7 @@ export abstract class BasePGlite
}
const blob = await this._getWrittenBlob()
return parseResults(
results.map(([msg]) => msg),
results,
this.parsers,
options,
blob,
Expand Down
7 changes: 6 additions & 1 deletion packages/pglite/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ export type Extensions = {
[namespace: string]: Extension | URL
}

export interface ExecProtocolResult {
messages: BackendMessage[]
data: Uint8Array
}

export interface DumpDataDirResult {
tarball: Uint8Array
extension: '.tar' | '.tgz'
Expand Down Expand Up @@ -99,7 +104,7 @@ export type PGliteInterface = {
execProtocol(
message: Uint8Array,
options?: ExecProtocolOptions,
): Promise<Array<[BackendMessage, Uint8Array]>>
): Promise<ExecProtocolResult>
listen(
channel: string,
callback: (payload: string) => void,
Expand Down
9 changes: 5 additions & 4 deletions packages/pglite/src/pglite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
ExecProtocolOptions,
PGliteInterfaceExtensions,
Extensions,
ExecProtocolResult,
} from './interface.js'
import { BasePGlite } from './base.js'
import { loadExtensionBundle, loadExtensions } from './extensionUtils.js'
Expand Down Expand Up @@ -588,9 +589,9 @@ export class PGlite
throwOnError = true,
onNotice,
}: ExecProtocolOptions = {},
): Promise<Array<[BackendMessage, Uint8Array]>> {
): Promise<ExecProtocolResult> {
const data = await this.execProtocolRaw(message, { syncToFs })
const results: Array<[BackendMessage, Uint8Array]> = []
const results: BackendMessage[] = []

this.#protocolParser.parse(data, (msg) => {
if (msg instanceof DatabaseError) {
Expand Down Expand Up @@ -632,10 +633,10 @@ export class PGlite
queueMicrotask(() => cb(msg.channel, msg.payload))
})
}
results.push([msg, data])
results.push(msg)
})

return results
return { messages: results, data }
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/pglite/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ export async function formatQuery(
await pg.execProtocol(serializeProtocol.describe({ type: 'S' }), {
syncToFs: false,
})
).map(([msg]) => msg),
).messages,
)
} finally {
await pg.execProtocol(serializeProtocol.sync(), { syncToFs: false })
Expand Down
32 changes: 12 additions & 20 deletions packages/pglite/src/worker/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type {
DebugLevel,
ExecProtocolResult,
Extensions,
PGliteInterface,
PGliteInterfaceExtensions,
Expand All @@ -8,7 +9,6 @@ import type {
import type { PGlite } from '../pglite.js'
import { BasePGlite } from '../base.js'
import { uuid } from '../utils.js'
import type { BackendMessage } from '@electric-sql/pg-protocol/messages'

export type PGliteWorkerOptions = PGliteOptions & {
meta?: any
Expand Down Expand Up @@ -337,12 +337,8 @@ export class PGliteWorker
* @param message The postgres wire protocol message to execute
* @returns The result of the query
*/
async execProtocol(
message: Uint8Array,
): Promise<Array<[BackendMessage, Uint8Array]>> {
return (await this.#rpc('execProtocol', message)) as Array<
[BackendMessage, Uint8Array]
>
async execProtocol(message: Uint8Array): Promise<ExecProtocolResult> {
return await this.#rpc('execProtocol', message)
}

/**
Expand Down Expand Up @@ -623,19 +619,15 @@ function makeWorkerApi(tabId: string, db: PGlite) {
await db.close()
},
async execProtocol(message: Uint8Array) {
const result = await db.execProtocol(message)
return result.map(([message, data]) => {
if (data.byteLength !== data.buffer.byteLength) {
// The data is a slice of a larger buffer, this is potentially the whole
// memory of the WASM module. We copy it to a new Uint8Array and return that.
const buffer = new ArrayBuffer(data.byteLength)
const dataCopy = new Uint8Array(buffer)
dataCopy.set(data)
return [message, dataCopy]
} else {
return [message, data]
}
})
const { messages, data } = await db.execProtocol(message)
if (data.byteLength !== data.buffer.byteLength) {
const buffer = new ArrayBuffer(data.byteLength)
const dataCopy = new Uint8Array(buffer)
dataCopy.set(data)
return { messages, data: dataCopy }
} else {
return { messages, data }
}
},
async execProtocolRaw(message: Uint8Array) {
const result = await db.execProtocolRaw(message)
Expand Down
14 changes: 7 additions & 7 deletions packages/pglite/tests/exec-protocol.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ describe('exec protocol', () => {

it('should perform a simple query', async () => {
const result = await db.execProtocol(serialize.query('SELECT 1'))
const messageNames = result.map((msg) => msg[0].name)
const messageNames = result.messages.map((msg) => msg.name)
expect(messageNames).toEqual([
'rowDescription',
'dataRow',
Expand All @@ -26,31 +26,31 @@ describe('exec protocol', () => {

it('should perform an extended query', async () => {
const r1 = await db.execProtocol(serialize.parse({ text: 'SELECT $1' }))
const messageNames1 = r1.map((msg) => msg[0].name)
const messageNames1 = r1.messages.map((msg) => msg.name)
expect(messageNames1).toEqual(['notice', 'parseComplete'])

const r2 = await db.execProtocol(serialize.bind({ values: ['1'] }))
const messageNames2 = r2.map((msg) => msg[0].name)
const messageNames2 = r2.messages.map((msg) => msg.name)
expect(messageNames2).toEqual(['notice', 'bindComplete'])

const r3 = await db.execProtocol(serialize.describe({ type: 'P' }))
const messageNames3 = r3.map((msg) => msg[0].name)
const messageNames3 = r3.messages.map((msg) => msg.name)
expect(messageNames3).toEqual(['rowDescription'])

const r4 = await db.execProtocol(serialize.execute({}))
const messageNames4 = r4.map((msg) => msg[0].name)
const messageNames4 = r4.messages.map((msg) => msg.name)
expect(messageNames4).toEqual(['dataRow', 'commandComplete'])

const r5 = await db.execProtocol(serialize.sync())
const messageNames5 = r5.map((msg) => msg[0].name)
const messageNames5 = r5.messages.map((msg) => msg.name)
expect(messageNames5).toEqual(['readyForQuery'])
})

it('should handle error', async () => {
const result = await db.execProtocol(serialize.query('invalid sql'), {
throwOnError: false,
})
const messageNames = result.map((msg) => msg[0].name)
const messageNames = result.messages.map((msg) => msg.name)
expect(messageNames).toEqual(['error', 'readyForQuery'])
})

Expand Down

0 comments on commit cf50f47

Please sign in to comment.