Skip to content

Commit

Permalink
loop change
Browse files Browse the repository at this point in the history
  • Loading branch information
pmp-p committed Nov 25, 2024
1 parent 0a9eb99 commit 97d1cf2
Show file tree
Hide file tree
Showing 3 changed files with 507 additions and 335 deletions.
5 changes: 1 addition & 4 deletions cibuild/linkimports.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ echo "============= link imports : begin ==============="
pushd ${WORKSPACE}
> patches/imports/pgcore

echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
cp -v /tmp/arrays.so /tmp/pglite/lib/postgresql/arrays.so
echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"

for extra_pg_so in $(find $PGROOT/lib/postgresql/|grep \.so$)
do
SOBASE=patches/imports.pgcore/$(basename $extra_pg_so .so)
Expand Down Expand Up @@ -69,6 +65,7 @@ _lowerstr
_main
_pg_getport
_pg_initdb
_pg_initdb_main
_pg_shutdown
_readstoplist
_searchstoplist
Expand Down
172 changes: 112 additions & 60 deletions patches/interactive_one.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,20 @@ static void io_init(bool in_auth, bool out_auth) {

SOCKET_FILE = NULL;
SOCKET_DATA = 0;
PDEBUG("# 141: io_init --------- CLIENT (ready) ---------");
PDEBUG("\n\n\n\n# 141: io_init --------- Ready for CLIENT ---------");


}

/*
static void wait_unlock() {
int busy = 0;
while (access(PGS_OLOCK, F_OK) == 0) {
if (!(busy++ % 1110222))
printf("# 150: FIXME: busy wait lock removed %d\n", busy);
}
}
*/

EMSCRIPTEN_KEEPALIVE int
cma_wsize = 0;
Expand All @@ -174,6 +176,7 @@ interactive_read() {

volatile int sf_connected = 0;
volatile bool sockfiles = false;
extern char * cma_port;

EMSCRIPTEN_KEEPALIVE void
interactive_one() {
Expand All @@ -182,15 +185,14 @@ interactive_one() {
StringInfoData input_message;
StringInfoData *inBuf;
FILE *stream ;
FILE *c_lock;
FILE *fp;
int packetlen;
bool is_socket = false;
bool is_wire = true;

if (is_node && is_repl) {
// if (!is_embed && is_repl) {

wait_unlock();
//wait_unlock();

if (!MyProcPort) {
io_init(false, false);
Expand All @@ -203,37 +205,68 @@ interactive_one() {


if (!SOCKET_FILE) {
SOCKET_FILE = fopen(PGS_OUT,"w") ;
SOCKET_FILE = fopen(PGS_OLOCK, "w") ;
MyProcPort->sock = fileno(SOCKET_FILE);
}


} // is_node && is_repl
// } // is_node && is_repl


doing_extended_query_message = false;
MemoryContextSwitchTo(MessageContext);
MemoryContextResetAndDeleteChildren(MessageContext);

initStringInfo(&input_message);

inBuf = &input_message;

InvalidateCatalogSnapshotConditionally();

if (send_ready_for_query)
{

// puts("postgres.c 4538-4624 TODO");
if (IsAbortedTransactionBlockState())
{
puts("@@@@ TODO 231: idle in transaction (aborted)");
}
else if (IsTransactionOrTransactionBlock())
{
puts("@@@@ TODO 235: idle in transaction");
}
else
{
if (notifyInterruptPending)
ProcessNotifyInterrupt(false);
}
send_ready_for_query = false;
}


// postgres.c 4627
DoingCommandRead = true;


#define IO ((char *)(1))
// #define IO ((char *)(1))
#define IO cma_port

// in web mode, client call the wire loop itself waiting synchronously for the results
// in repl mode, the wire loop polls a pseudo socket made from incoming and outgoing files.
/*
* in web mode, client call the wire loop itself waiting synchronously for the results
* in repl mode, the wire loop polls a pseudo socket made from incoming and outgoing files. aka "socketfiles"
* always use "socketfiles" when wasi
*
*/

if (is_node && is_repl) {

// ready to read marker
if (access(PGS_ILOCK, R_OK) != 0) {
#if 0 //!defined(__wasi__)
if (!is_embed || is_repl) {
// do not try to read when lock/buffer file still there
if (!access(PGS_ILOCK, R_OK)) {
#endif

packetlen = 0;

// TODO: lock file
fp = fopen(PGS_IN, "r");

// read as a socket.
Expand Down Expand Up @@ -321,25 +354,26 @@ PDEBUG("# 324 : TODO: set a pg_main started flag");
send_ready_for_query = true;
} // auth
} else {
#if PGDEBUG
#if 0 // PGDEBUG
fprintf(stderr, "# 331: CLI[%d] incoming=%d [%d, ", sf_connected, packetlen, firstchar);
for (int i=1;i<packetlen;i++) {
for (int i=0;i<packetlen;i++) {
int b = getc(fp);
/* skip header (size uint32) */
if (i>5) {
if (i>4) {
fprintf(stderr, "%d, ", b);
}
}
fprintf(stderr, "]\n");
#endif
}
// when using lock files
//ftruncate(filenum(fp), 0);
// when using locks
// ftruncate(filenum(fp), 0);
}
/* FD CLEANUP */
fclose(fp);
unlink(PGS_IN);

// Check if auth bypass work with socketfiles
if (packetlen) {
if (!firstchar || (firstchar==112)) {
PDEBUG("# 351: handshake/auth skip");
Expand All @@ -348,7 +382,7 @@ PDEBUG("# 324 : TODO: set a pg_main started flag");

/* else it is wire msg */
#if PGDEBUG
printf("# 352 : node+repl is wire : %c\n", firstchar);
printf("# 353 : node+repl is_wire/is_socket -> true : %c\n", firstchar);
force_echo = true;
#endif
is_socket = true;
Expand All @@ -359,11 +393,10 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
} // wire msg

} // fp data read

#if 0 //!defined(__wasi__)
} // ok lck

} // is_node && is_repl

} // !is_embed || is_repl
#endif
if (cma_rsize) {
PDEBUG("wire message in cma buffer !");
is_wire = true;
Expand All @@ -376,11 +409,11 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
}

if (!SOCKET_FILE) {
SOCKET_FILE = fopen(PGS_OUT,"w") ;
SOCKET_FILE = fopen(PGS_OLOCK, "w") ;
MyProcPort->sock = fileno(SOCKET_FILE);
}
#if PGDEBUG
printf("# fd %s: %s fd=%d\n", PGS_OUT, IO, MyProcPort->sock);
printf("# 391: fd %s: %s fd=%d is_embed=%d\n", PGS_OLOCK, IO, MyProcPort->sock, is_embed);
#endif
goto incoming;

Expand All @@ -393,6 +426,8 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
if (!c)
return;

is_repl = true;

if (is_repl) {
whereToSendOutput = DestNone;
is_wire = false;
Expand All @@ -415,11 +450,11 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
}

if (!SOCKET_FILE) {
SOCKET_FILE = fopen(PGS_OUT,"w") ;
SOCKET_FILE = fopen(PGS_OLOCK, "w") ;
MyProcPort->sock = fileno(SOCKET_FILE);
}
#if PGDEBUG
printf("# fd %s: %s fd=%d\n", PGS_OUT, IO, MyProcPort->sock);
printf("# 430: fd %s: %s fd=%d is_embed=%d\n", PGS_OLOCK, IO, MyProcPort->sock, is_embed);
#endif

}
Expand All @@ -445,8 +480,8 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
IO[0] = 0;

incoming:
#if 0 //PGDEBUG
#warning "exception handler off"
#if defined(__wasi__) //PGDEBUG
PDEBUG("# 484: sjlj exception handler off");
#else
if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{
Expand Down Expand Up @@ -504,47 +539,66 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
}

PG_exception_stack = &local_sigjmp_buf;
#endif
#endif // wasi

if (!ignore_till_sync)
send_ready_for_query = true;


if (force_echo) {
printf("# 501: wire=%d socket=%d repl=%c: %s", is_wire, is_socket, firstchar, inBuf->data);
printf("# 549: wire=%d socket=%d 1stchar=%c Q: %s", is_wire, is_socket, firstchar, inBuf->data);
}


if (is_wire) {
/* wire on a socket or cma */
firstchar = SocketBackend(inBuf);

} else {
/* nowire */
if (c == EOF && inBuf->len == 0) {
firstchar = EOF;
if (is_wire) {
/* wire on a socket or cma */
firstchar = SocketBackend(inBuf);

} else {
appendStringInfoChar(inBuf, (char) '\0');
firstchar = 'Q';
}
/* nowire */
if (c == EOF && inBuf->len == 0) {
firstchar = EOF;

/* stdio node repl */
if (is_repl)
whereToSendOutput = DestDebug;
}
} else {
appendStringInfoChar(inBuf, (char) '\0');
firstchar = 'Q';
}

#include "pg_proto.c"
/* stdio node repl */
if (is_repl)
whereToSendOutput = DestDebug;
}
while (1) {
if (ignore_till_sync && firstchar != EOF) {
puts("@@@@@@@@@@@@@ 573 TODO: postgres.c 4684 : continue");
} else {
#include "pg_proto.c"

/* process notifications */
ProcessClientReadInterrupt(true);
/* process notifications */
ProcessClientReadInterrupt(true);
}
if (is_wire && pq_buffer_has_data()) {
firstchar = SocketBackend(inBuf);
#if PGDEBUG
printf("583: PIPELINING [%c]!\n", firstchar);
#endif
} else {
break;
}
}

if (is_wire) {
wire_flush:
if (!ClientAuthInProgress) {
PDEBUG("# 537: end packet - sending rfq");
if (send_ready_for_query) {
PDEBUG("# 594: end packet - sending rfq");
ReadyForQuery(DestRemote);
send_ready_for_query = false;
//done at postgres.c 4623 send_ready_for_query = false;
} else {
PDEBUG("# 598: end packet - with no rfq");
}
} else {
PDEBUG("# 542: end packet (ClientAuthInProgress - no rfq) ");
PDEBUG("# 601: end packet (ClientAuthInProgress - no rfq) ");
}

if (SOCKET_DATA>0) {
Expand All @@ -556,19 +610,18 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
cma_wsize = SOCKET_DATA;
}
if (SOCKET_FILE) {
int outb = SOCKET_DATA;
fclose(SOCKET_FILE);
SOCKET_FILE = NULL;
SOCKET_DATA = 0;
if (cma_wsize)
PDEBUG("# 557: cma and sockfile ???");
PDEBUG("# 618: cma and sockfile ???");
if (sockfiles) {
PDEBUG("# 559: setting sockfile lock, ready to read");
PDEBUG(PGS_OLOCK);
c_lock = fopen(PGS_OLOCK, "w");
fclose(c_lock);
#if PGDEBUG
printf("# 621: client:ready -> read(%d) " PGS_OLOCK "->" PGS_OUT"\n", outb);
#endif
rename(PGS_OLOCK, PGS_OUT);
}
// CHECK ME 320 / 540 . only initially or after error
// send_ready_for_query = true;
}

} else {
Expand All @@ -580,7 +633,6 @@ printf("# 352 : node+repl is wire : %c\n", firstchar);
cma_rsize = 0;
IO[0] = 0;


#undef IO
}

Expand Down
Loading

0 comments on commit 97d1cf2

Please sign in to comment.