diff --git a/src/nostrdb.c b/src/nostrdb.c index f86a14d..bc4502d 100644 --- a/src/nostrdb.c +++ b/src/nostrdb.c @@ -3980,6 +3980,7 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor, static void *ndb_writer_thread(void *data) { + ndb_debug("started writer thread\n"); struct ndb_writer *writer = data; struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; struct written_note written_notes[THREAD_QUEUE_BATCH]; @@ -3999,6 +4000,7 @@ static void *ndb_writer_thread(void *data) while (!done) { txn.mdb_txn = NULL; num_notes = 0; + ndb_debug("writer waiting for items\n"); popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); ndb_debug("writer popped %d items\n", popped); @@ -4029,6 +4031,7 @@ static void *ndb_writer_thread(void *data) switch (msg->type) { case NDB_WRITER_QUIT: // quits are handled before this + ndb_debug("writer thread got quit message\n"); done = 1; continue; case NDB_WRITER_PROFILE: @@ -4242,14 +4245,18 @@ static int ndb_writer_destroy(struct ndb_writer *writer) // kill thread msg.type = NDB_WRITER_QUIT; + ndb_debug("writer: pushing quit message\n"); if (!prot_queue_push(&writer->inbox, &msg)) { // queue is too full to push quit message. just kill it. + ndb_debug("writer: terminating thread\n"); THREAD_TERMINATE(writer->thread_id); } else { + ndb_debug("writer: joining thread\n"); THREAD_FINISH(writer->thread_id); } // cleanup + ndb_debug("writer: cleaning up protected queue\n"); prot_queue_destroy(&writer->inbox); free(writer->queue_buf); @@ -4515,12 +4522,17 @@ void ndb_destroy(struct ndb *ndb) return; // ingester depends on writer and must be destroyed first + ndb_debug("destroying ingester\n"); ndb_ingester_destroy(&ndb->ingester); + ndb_debug("destroying writer\n"); ndb_writer_destroy(&ndb->writer); + ndb_debug("destroying monitor\n"); ndb_monitor_destroy(&ndb->monitor); + ndb_debug("closing env\n"); mdb_env_close(ndb->lmdb.env); + ndb_debug("ndb destroyed\n"); free(ndb); } diff --git a/src/thread.h b/src/thread.h index e798fd2..c3f976a 100644 --- a/src/thread.h +++ b/src/thread.h @@ -5,27 +5,47 @@ #include #define ErrCode() GetLastError() - #define pthread_t HANDLE - #define pthread_mutex_t HANDLE - #define pthread_cond_t HANDLE - #define pthread_cond_destroy(x) - #define pthread_mutex_unlock(x) ReleaseMutex(*x) - #define pthread_mutex_destroy(x) \ - (CloseHandle(*x) ? 0 : ErrCode()) - #define pthread_mutex_lock(x) WaitForSingleObject(*x, INFINITE) - #define pthread_mutex_init(mutex, attr) \ - ((*mutex = CreateMutex(NULL, FALSE, NULL)) ? 0 : ErrCode()) - #define pthread_cond_init(x, attr) (InitializeConditionVariable(x), 0) - #define pthread_cond_signal(x) SetEvent(*x) - #define pthread_cond_wait(cond,mutex) do{SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE);}while(0) - #define THREAD_CREATE(thr,start,arg) \ - (((thr) = CreateThread(NULL, 0, start, arg, 0, NULL)) ? 0 : ErrCode()) - #define THREAD_FINISH(thr) \ - (WaitForSingleObject(thr, INFINITE) ? ErrCode() : 0) - #define THREAD_TERMINATE(thr) \ - (TerminateThread(thr, 0) ? ErrCode() : 0) - #define LOCK_MUTEX(mutex) WaitForSingleObject(mutex, INFINITE) - #define UNLOCK_MUTEX(mutex) ReleaseMutex(mutex) +// Define POSIX-like thread types +typedef HANDLE pthread_t; +typedef CRITICAL_SECTION pthread_mutex_t; +typedef CONDITION_VARIABLE pthread_cond_t; + +#define ErrCode() GetLastError() + +// Mutex functions +#define pthread_mutex_init(mutex, attr) \ + (InitializeCriticalSection(mutex), 0) + +#define pthread_mutex_destroy(mutex) \ + (DeleteCriticalSection(mutex), 0) + +#define pthread_mutex_lock(mutex) \ + (EnterCriticalSection(mutex), 0) + +#define pthread_mutex_unlock(mutex) \ + (LeaveCriticalSection(mutex), 0) + +// Condition variable functions +#define pthread_cond_init(cond, attr) \ + (InitializeConditionVariable(cond), 0) + +#define pthread_cond_destroy(cond) + +#define pthread_cond_signal(cond) \ + (WakeConditionVariable(cond), 0) + +#define pthread_cond_wait(cond, mutex) \ + (SleepConditionVariableCS(cond, mutex, INFINITE) ? 0 : ErrCode()) + +// Thread functions +#define THREAD_CREATE(thr, start, arg) \ + (((thr = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)start, arg, 0, NULL)) != NULL) ? 0 : ErrCode()) + +#define THREAD_FINISH(thr) \ + (WaitForSingleObject(thr, INFINITE), CloseHandle(thr), 0) + +#define THREAD_TERMINATE(thr) \ + (TerminateThread(thr, 0) ? ErrCode() : 0) #else // _WIN32 #include