Skip to content

Commit

Permalink
fix: pipelining prepared statements (#417)
Browse files Browse the repository at this point in the history
* fix socket pipelining read

* one rfq after Parse, none for bind/descr/exec and one after each sync

* match psycopg3 wire pipeline expectations

* follow ppg3

* follow ppg3

* style

* Chageset

---------

Co-authored-by: User <[email protected]>
Co-authored-by: Sam Willis <[email protected]>
  • Loading branch information
3 people authored Nov 25, 2024
1 parent 1784d04 commit ae36974
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 85 deletions.
5 changes: 5 additions & 0 deletions .changeset/swift-news-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/pglite': patch
---

Fix a bug with pipelining prepared statements.
12 changes: 8 additions & 4 deletions packages/pglite/tests/exec-protocol.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,23 @@ describe('exec protocol', () => {
it('should perform an extended query', async () => {
const r1 = await db.execProtocol(serialize.parse({ text: 'SELECT $1' }))
const messageNames1 = r1.messages.map((msg) => msg.name)
expect(messageNames1).toEqual(['notice', 'parseComplete'])
expect(messageNames1).toEqual(['notice', 'parseComplete', 'readyForQuery'])

const r2 = await db.execProtocol(serialize.bind({ values: ['1'] }))
const messageNames2 = r2.messages.map((msg) => msg.name)
expect(messageNames2).toEqual(['notice', 'bindComplete'])
expect(messageNames2).toEqual(['notice', 'bindComplete', 'readyForQuery'])

const r3 = await db.execProtocol(serialize.describe({ type: 'P' }))
const messageNames3 = r3.messages.map((msg) => msg.name)
expect(messageNames3).toEqual(['rowDescription'])
expect(messageNames3).toEqual(['rowDescription', 'readyForQuery'])

const r4 = await db.execProtocol(serialize.execute({}))
const messageNames4 = r4.messages.map((msg) => msg.name)
expect(messageNames4).toEqual(['dataRow', 'commandComplete'])
expect(messageNames4).toEqual([
'dataRow',
'commandComplete',
'readyForQuery',
])

const r5 = await db.execProtocol(serialize.sync())
const messageNames5 = r5.messages.map((msg) => msg.name)
Expand Down
180 changes: 115 additions & 65 deletions patches/interactive_one.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,20 @@ static void io_init(bool in_auth, bool out_auth) {

SOCKET_FILE = NULL;
SOCKET_DATA = 0;
PDEBUG("# 141: io_init --------- CLIENT (ready) ---------");
PDEBUG("\n\n\n\n# 141: io_init --------- Ready for CLIENT ---------");


}

static void wait_unlock() {
int busy = 0;
while (access(PGS_OLOCK, F_OK) == 0) {
if (!(busy++ % 1110222))
printf("# 150: FIXME: busy wait lock removed %d\n", busy);
}
}










EMSCRIPTEN_KEEPALIVE int
cma_wsize = 0;
Expand All @@ -174,6 +176,7 @@ interactive_read() {

volatile int sf_connected = 0;
volatile bool sockfiles = false;
extern char * cma_port;

EMSCRIPTEN_KEEPALIVE void
interactive_one() {
Expand All @@ -182,15 +185,13 @@ interactive_one() {
StringInfoData input_message;
StringInfoData *inBuf;
FILE *stream ;
FILE *c_lock;
FILE *fp;
int packetlen;
bool is_socket = false;
bool is_wire = true;

if (is_node && is_repl) {
if (!is_node && is_repl) {

wait_unlock();

if (!MyProcPort) {
io_init(false, false);
Expand All @@ -203,11 +204,10 @@ interactive_one() {


if (!SOCKET_FILE) {
SOCKET_FILE = fopen(PGS_OUT,"w") ;
SOCKET_FILE = fopen(PGS_OLOCK, "w") ;
MyProcPort->sock = fileno(SOCKET_FILE);
}


} // is_node && is_repl


Expand All @@ -216,24 +216,55 @@ interactive_one() {
MemoryContextResetAndDeleteChildren(MessageContext);

initStringInfo(&input_message);

inBuf = &input_message;

InvalidateCatalogSnapshotConditionally();

if (send_ready_for_query)
{

// puts("postgres.c 4538-4624 TODO");
if (IsAbortedTransactionBlockState())
{
puts("@@@@ TODO 231: idle in transaction (aborted)");
}
else if (IsTransactionOrTransactionBlock())
{
puts("@@@@ TODO 235: idle in transaction");
}
else
{
if (notifyInterruptPending)
ProcessNotifyInterrupt(false);
}
send_ready_for_query = false;
}


// postgres.c 4627
DoingCommandRead = true;


#define IO ((char *)(1))
// #define IO cma_port this would be a temp fix for -O0 but less efficient than a

// in web mode, client call the wire loop itself waiting synchronously for the results
// in repl mode, the wire loop polls a pseudo socket made from incoming and outgoing files.
/*
* in web mode, client call the wire loop itself waiting synchronously for the results
* in repl mode, the wire loop polls a pseudo socket made from incoming and outgoing files. aka "socketfiles"
* always use "socketfiles" when wasi
*
*/

if (is_node && is_repl) {

// ready to read marker
if (access(PGS_ILOCK, R_OK) != 0) {
#if 0 //!defined(__wasi__)
if (!is_embed || is_repl) {
// do not try to read when lock/buffer file still there
if (!access(PGS_ILOCK, R_OK)) {
#endif

packetlen = 0;

// TODO: lock file
fp = fopen(PGS_IN, "r");

// read as a socket.
Expand Down Expand Up @@ -323,23 +354,24 @@ PDEBUG("# 324 : TODO: set a pg_main started flag");
} else {
#if PGDEBUG
fprintf(stderr, "# 331: CLI[%d] incoming=%d [%d, ", sf_connected, packetlen, firstchar);
for (int i=1;i<packetlen;i++) {
for (int i=0;i<packetlen;i++) {
int b = getc(fp);
/* skip header (size uint32) */
if (i>5) {
if (i>4) {
fprintf(stderr, "%d, ", b);
}
}
fprintf(stderr, "]\n");
#endif
}
// when using lock files
//ftruncate(filenum(fp), 0);
// when using locks
// ftruncate(filenum(fp), 0);
}
/* FD CLEANUP */
fclose(fp);
unlink(PGS_IN);

// Check if auth bypass work with socketfiles
if (packetlen) {
if (!firstchar || (firstchar==112)) {
PDEBUG("# 351: handshake/auth skip");
Expand All @@ -348,7 +380,7 @@ PDEBUG("# 324 : TODO: set a pg_main started flag");

/* else it is wire msg */
#if PGDEBUG
printf("# 352 : node+repl is wire : %c\n", firstchar);
printf("# 353 : node+repl is_wire/is_socket -> true : %c\n", firstchar);
force_echo = true;
#endif
is_socket = true;
Expand All @@ -359,11 +391,10 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
} // wire msg

} // fp data read

#if 0 //!defined(__wasi__)
} // ok lck

} // is_node && is_repl

} // !is_embed || is_repl
#endif
if (cma_rsize) {
PDEBUG("wire message in cma buffer !");
is_wire = true;
Expand All @@ -376,11 +407,11 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
}

if (!SOCKET_FILE) {
SOCKET_FILE = fopen(PGS_OUT,"w") ;
SOCKET_FILE = fopen(PGS_OLOCK, "w") ;
MyProcPort->sock = fileno(SOCKET_FILE);
}
#if PGDEBUG
printf("# fd %s: %s fd=%d\n", PGS_OUT, IO, MyProcPort->sock);
printf("# 391: fd %s: %s fd=%d is_embed=%d\n", PGS_OLOCK, IO, MyProcPort->sock, is_embed);
#endif
goto incoming;

Expand All @@ -393,6 +424,8 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
if (!c)
return;

is_repl = true;

if (is_repl) {
whereToSendOutput = DestNone;
is_wire = false;
Expand All @@ -415,11 +448,11 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
}

if (!SOCKET_FILE) {
SOCKET_FILE = fopen(PGS_OUT,"w") ;
SOCKET_FILE = fopen(PGS_OLOCK, "w") ;
MyProcPort->sock = fileno(SOCKET_FILE);
}
#if PGDEBUG
printf("# fd %s: %s fd=%d\n", PGS_OUT, IO, MyProcPort->sock);
printf("# 430: fd %s: %s fd=%d is_embed=%d\n", PGS_OLOCK, IO, MyProcPort->sock, is_embed);
#endif

}
Expand All @@ -445,8 +478,8 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
IO[0] = 0;

incoming:
#if 0 //PGDEBUG
#warning "exception handler off"
#if defined(__wasi__) //PGDEBUG
PDEBUG("# 484: sjlj exception handler off");
#else
if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{
Expand Down Expand Up @@ -504,47 +537,66 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
}

PG_exception_stack = &local_sigjmp_buf;
#endif
#endif // wasi

if (!ignore_till_sync)
send_ready_for_query = true;


if (force_echo) {
printf("# 501: wire=%d socket=%d repl=%c: %s", is_wire, is_socket, firstchar, inBuf->data);
printf("# 549: wire=%d socket=%d 1stchar=%c Q: %s", is_wire, is_socket, firstchar, inBuf->data);
}


if (is_wire) {
/* wire on a socket or cma */
firstchar = SocketBackend(inBuf);

} else {
/* nowire */
if (c == EOF && inBuf->len == 0) {
firstchar = EOF;
if (is_wire) {
/* wire on a socket or cma */
firstchar = SocketBackend(inBuf);

} else {
appendStringInfoChar(inBuf, (char) '\0');
firstchar = 'Q';
}
/* nowire */
if (c == EOF && inBuf->len == 0) {
firstchar = EOF;

/* stdio node repl */
if (is_repl)
whereToSendOutput = DestDebug;
}
} else {
appendStringInfoChar(inBuf, (char) '\0');
firstchar = 'Q';
}

#include "pg_proto.c"
/* stdio node repl */
if (is_repl)
whereToSendOutput = DestDebug;
}
while (1) {
if (ignore_till_sync && firstchar != EOF) {
puts("@@@@@@@@@@@@@ 573 TODO: postgres.c 4684 : continue");
} else {
#include "pg_proto.c"

/* process notifications */
ProcessClientReadInterrupt(true);
/* process notifications */
ProcessClientReadInterrupt(true);
}
if (is_wire && pq_buffer_has_data()) {
firstchar = SocketBackend(inBuf);
#if PGDEBUG
printf("583: PIPELINING [%c]!\n", firstchar);
#endif
} else {
break;
}
}

if (is_wire) {
wire_flush:
if (!ClientAuthInProgress) {
PDEBUG("# 537: end packet - sending rfq");
if (send_ready_for_query) {
PDEBUG("# 594: end packet - sending rfq");
ReadyForQuery(DestRemote);
send_ready_for_query = false;
//done at postgres.c 4623 send_ready_for_query = false;
} else {
PDEBUG("# 598: end packet - with no rfq");
}
} else {
PDEBUG("# 542: end packet (ClientAuthInProgress - no rfq) ");
PDEBUG("# 601: end packet (ClientAuthInProgress - no rfq) ");
}

if (SOCKET_DATA>0) {
Expand All @@ -556,19 +608,18 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
cma_wsize = SOCKET_DATA;
}
if (SOCKET_FILE) {
int outb = SOCKET_DATA;
fclose(SOCKET_FILE);
SOCKET_FILE = NULL;
SOCKET_DATA = 0;
if (cma_wsize)
PDEBUG("# 557: cma and sockfile ???");
PDEBUG("# 618: cma and sockfile ???");
if (sockfiles) {
PDEBUG("# 559: setting sockfile lock, ready to read");
PDEBUG(PGS_OLOCK);
c_lock = fopen(PGS_OLOCK, "w");
fclose(c_lock);
#if PGDEBUG
printf("# 621: client:ready -> read(%d) " PGS_OLOCK "->" PGS_OUT"\n", outb);
#endif
rename(PGS_OLOCK, PGS_OUT);
}
// CHECK ME 320 / 540 . only initially or after error
// send_ready_for_query = true;
}

} else {
Expand All @@ -580,7 +631,6 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
cma_rsize = 0;
IO[0] = 0;


#undef IO
}

Expand Down
Loading

0 comments on commit ae36974

Please sign in to comment.