diff --git a/src/nostrdb.c b/src/nostrdb.c index 704f114..7b52340 100644 --- a/src/nostrdb.c +++ b/src/nostrdb.c @@ -57,7 +57,7 @@ static const int DEFAULT_QUEUE_SIZE = 32768; #define NDB_PARSED_TAGS (1 << 6) #define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS) -typedef int (*ndb_migrate_fn)(struct ndb *); +typedef int (*ndb_migrate_fn)(struct ndb_txn *); typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len, int word_index); @@ -135,6 +135,7 @@ enum ndb_writer_msgtype { NDB_WRITER_DBMETA, // write ndb metadata NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched NDB_WRITER_BLOCKS, // write parsed note blocks + NDB_WRITER_MIGRATE, // migrate the database }; // keys used for storing data in the NDB metadata database (NDB_DB_NDB_META) @@ -1589,47 +1590,33 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, // // This was before we had note_profile_pubkey{,_kind} indices. Let's create them. -static int ndb_migrate_profile_indices(struct ndb *ndb) +static int ndb_migrate_profile_indices(struct ndb_txn *txn) { - struct ndb_txn txn; int count; - if (!ndb_begin_rw_query(ndb, &txn)) { - fprintf(stderr, "ndb_migrate_profile_indices: ndb_begin_rw_query failed\n"); - return 0; - } - enum ndb_dbs indices[] = {NDB_DB_NOTE_PUBKEY, NDB_DB_NOTE_PUBKEY_KIND}; - if ((count = ndb_rebuild_note_indices(&txn, indices, 2)) != -1) { + if ((count = ndb_rebuild_note_indices(txn, indices, 2)) != -1) { fprintf(stderr, "migrated %d notes to have pubkey and pubkey_kind indices\n", count); - ndb_end_query(&txn); return 1; } else { fprintf(stderr, "error migrating notes to have pubkey and pubkey_kind indices, aborting.\n"); - mdb_txn_abort(txn.mdb_txn); return 0; } } -static int ndb_migrate_user_search_indices(struct ndb *ndb) +static int ndb_migrate_user_search_indices(struct ndb_txn *txn) { int rc; MDB_cursor *cur; MDB_val k, v; void *profile_root; NdbProfileRecord_table_t record; - struct ndb_txn txn; struct ndb_note *note; uint64_t note_key, profile_key; size_t len; int count; - if (!ndb_begin_rw_query(ndb, &txn)) { - fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n"); - return 0; - } - - if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) { + if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); return 0; } @@ -1642,18 +1629,16 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb) profile_key = *((uint64_t*)k.mv_data); record = NdbProfileRecord_as_root(profile_root); note_key = NdbProfileRecord_note_key(record); - note = ndb_get_note_by_key(&txn, note_key, &len); + note = ndb_get_note_by_key(txn, note_key, &len); if (note == NULL) { - fprintf(stderr, "ndb_migrate_user_search_indices: note lookup failed\n"); - return 0; + continue; } - if (!ndb_write_profile_search_indices(&txn, note, profile_key, + if (!ndb_write_profile_search_indices(txn, note, profile_key, profile_root)) { - fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n"); - return 0; + continue; } count++; @@ -1663,51 +1648,33 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb) mdb_cursor_close(cur); - ndb_end_query(&txn); - return 1; } -static int ndb_migrate_lower_user_search_indices(struct ndb *ndb) +static int ndb_migrate_lower_user_search_indices(struct ndb_txn *txn) { - MDB_txn *txn; - - if (mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn)) { - fprintf(stderr, "ndb_migrate_lower_user_search_indices: ndb_txn_begin failed\n"); - return 0; - } - // just drop the search db so we can rebuild it - if (mdb_drop(txn, ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH], 0)) { + if (mdb_drop(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 0)) { fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n"); return 0; } - mdb_txn_commit(txn); - - return ndb_migrate_user_search_indices(ndb); + return ndb_migrate_user_search_indices(txn); } int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile); -int ndb_db_version(struct ndb *ndb) +int ndb_db_version(struct ndb_txn *txn) { - int rc; uint64_t version, version_key; MDB_val k, v; - MDB_txn *txn; version_key = NDB_META_KEY_VERSION; k.mv_data = &version_key; k.mv_size = sizeof(version_key); - if ((rc = mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn))) { - fprintf(stderr, "ndb_db_version: mdb_txn_begin failed, error %d\n", rc); - return -1; - } - - if (mdb_get(txn, ndb->lmdb.dbs[NDB_DB_NDB_META], &k, &v)) { + if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &k, &v)) { version = -1; } else { if (v.mv_size != 8) { @@ -1717,7 +1684,6 @@ int ndb_db_version(struct ndb *ndb) version = *((uint64_t*)v.mv_data); } - mdb_txn_abort(txn); return version; } @@ -1857,30 +1823,29 @@ static inline int ndb_writer_queue_msg(struct ndb_writer *writer, return prot_queue_push(&writer->inbox, msg); } -static int ndb_migrate_utf8_profile_names(struct ndb *ndb) +static uint64_t ndb_write_note_and_profile(struct ndb_txn *txn, struct ndb_writer_profile *profile, unsigned char *scratch, size_t scratch_size, uint32_t ndb_flags); +static int ndb_migrate_utf8_profile_names(struct ndb_txn *txn) { int rc; MDB_cursor *cur; MDB_val k, v; void *profile_root; NdbProfileRecord_table_t record; - struct ndb_txn txn; struct ndb_note *note, *copied_note; uint64_t note_key; size_t len; - int count, failed; - struct ndb_writer_msg out; + int count, failed, ret; + struct ndb_writer_profile profile; - if (!ndb_begin_rw_query(ndb, &txn)) { - fprintf(stderr, "ndb_migrate_utf8_profile_names: ndb_begin_rw_query failed\n"); - return 0; - } - - if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) { + if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc); return 0; } + size_t scratch_size = 8 * 1024 * 1024; + unsigned char *scratch = malloc(scratch_size); + + ret = 1; count = 0; failed = 0; @@ -1889,14 +1854,14 @@ static int ndb_migrate_utf8_profile_names(struct ndb *ndb) profile_root = v.mv_data; record = NdbProfileRecord_as_root(profile_root); note_key = NdbProfileRecord_note_key(record); - note = ndb_get_note_by_key(&txn, note_key, &len); + note = ndb_get_note_by_key(txn, note_key, &len); if (note == NULL) { - fprintf(stderr, "ndb_migrate_utf8_profile_names: note lookup failed\n"); - return 0; + failed++; + continue; } - struct ndb_profile_record_builder *b = &out.profile.record; + struct ndb_profile_record_builder *b = &profile.record; // reprocess profile if (!ndb_process_profile_note(note, b)) { @@ -1908,13 +1873,14 @@ static int ndb_migrate_utf8_profile_names(struct ndb *ndb) copied_note = malloc(len); memcpy(copied_note, note, len); - out.type = NDB_WRITER_PROFILE; - out.profile.note.note = copied_note; - out.profile.note.note_len = len; + profile.note.note = copied_note; + profile.note.note_len = len; - ndb_writer_queue_msg(&ndb->writer, &out); - - count++; + // we don't pass in flags when migrating... a bit sketchy but + // whatever. noone is using this to customize nostrdb atm + if (ndb_write_note_and_profile(txn, &profile, scratch, scratch_size, 0)) { + count++; + } } fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count); @@ -1923,11 +1889,10 @@ static int ndb_migrate_utf8_profile_names(struct ndb *ndb) fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed); } + free(scratch); mdb_cursor_close(cur); - ndb_end_query(&txn); - - return 1; + return ret; } static struct ndb_migration MIGRATIONS[] = { @@ -4227,29 +4192,6 @@ static uint64_t ndb_write_note(struct ndb_txn *txn, return note_key; } -// only to be called from the writer thread -static void ndb_write_version(struct ndb_txn *txn, uint64_t version) -{ - int rc; - MDB_val key, val; - uint64_t version_key; - - version_key = NDB_META_KEY_VERSION; - - key.mv_data = &version_key; - key.mv_size = sizeof(version_key); - val.mv_data = &version; - val.mv_size = sizeof(version); - - if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { - ndb_debug("write version to ndb_meta failed: %s\n", - mdb_strerror(rc)); - return; - } - - //fprintf(stderr, "writing version %" PRIu64 "\n", version); -} - static void ndb_monitor_lock(struct ndb_monitor *mon) { pthread_mutex_lock(&mon->mutex); } @@ -4311,6 +4253,93 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor, ndb_monitor_unlock(monitor); } +uint64_t ndb_write_note_and_profile( + struct ndb_txn *txn, + struct ndb_writer_profile *profile, + unsigned char *scratch, + size_t scratch_size, + uint32_t ndb_flags) +{ + uint64_t note_nkey; + + note_nkey = ndb_write_note(txn, &profile->note, scratch, scratch_size, ndb_flags); + + if (profile->record.builder) { + // only write if parsing didn't fail + ndb_write_profile(txn, profile, note_nkey); + } + + return note_nkey; +} + +// only to be called from the writer thread +static int ndb_write_version(struct ndb_txn *txn, uint64_t version) +{ + int rc; + MDB_val key, val; + uint64_t version_key; + + version_key = NDB_META_KEY_VERSION; + + key.mv_data = &version_key; + key.mv_size = sizeof(version_key); + val.mv_data = &version; + val.mv_size = sizeof(version); + + if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { + ndb_debug("write version to ndb_meta failed: %s\n", + mdb_strerror(rc)); + return 0; + } + + //fprintf(stderr, "writing version %" PRIu64 "\n", version); + return 1; +} + + +static int ndb_run_migrations(struct ndb_txn *txn) +{ + int64_t version, latest_version, i; + + latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); + + if ((version = ndb_db_version(txn)) == -1) { + ndb_debug("run_migrations: no version found, assuming new db\n"); + version = latest_version; + + // no version found. fresh db? + if (!ndb_write_version(txn, version)) { + fprintf(stderr, "run_migrations: failed writing db version"); + return 0; + } + + return 1; + } else { + ndb_debug("ndb: version %" PRIu64 " found\n", version); + } + + if (version < latest_version) + fprintf(stderr, "nostrdb: migrating v%d -> v%d\n", + (int)version, (int)latest_version); + + for (i = version; i < latest_version; i++) { + if (!MIGRATIONS[i].fn(txn)) { + fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); + return 0; + } + + if (!ndb_write_version(txn, i+1)) { + fprintf(stderr, "run_migrations: failed writing db version"); + return 0; + } + + version = i+1; + } + + return 1; +} + + static void *ndb_writer_thread(void *data) { ndb_debug("started writer thread\n"); @@ -4346,6 +4375,7 @@ static void *ndb_writer_thread(void *data) case NDB_WRITER_DBMETA: needs_commit = 1; break; case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break; case NDB_WRITER_BLOCKS: needs_commit = 1; break; + case NDB_WRITER_MIGRATE: needs_commit = 1; break; case NDB_WRITER_QUIT: break; } } @@ -4369,24 +4399,22 @@ static void *ndb_writer_thread(void *data) continue; case NDB_WRITER_PROFILE: note_nkey = - ndb_write_note(&txn, &msg->note, - scratch, scratch_size, - writer->ndb_flags); + ndb_write_note_and_profile( + &txn, + &msg->profile, + scratch, + scratch_size, + writer->ndb_flags); + if (note_nkey > 0) { written_notes[num_notes++] = (struct written_note){ .note_id = note_nkey, - .note = &msg->note, + .note = &msg->profile.note, }; } else { ndb_debug("failed to write note\n"); } - if (msg->profile.record.builder) { - // only write if parsing didn't fail - ndb_write_profile(&txn, &msg->profile, - note_nkey); - } - break; case NDB_WRITER_NOTE: note_nkey = ndb_write_note(&txn, &msg->note, scratch, @@ -4407,6 +4435,12 @@ static void *ndb_writer_thread(void *data) ndb_write_blocks(&txn, msg->blocks.note_key, msg->blocks.blocks); break; + case NDB_WRITER_MIGRATE: + if (!ndb_run_migrations(&txn)) { + mdb_txn_abort(txn.mdb_txn); + goto bail; + } + break; case NDB_WRITER_PROFILE_LAST_FETCH: ndb_writer_last_profile_fetch(&txn, msg->last_fetch.pubkey, @@ -4443,6 +4477,7 @@ static void *ndb_writer_thread(void *data) } } +bail: free(scratch); ndb_debug("quitting writer thread\n"); return NULL; @@ -4753,50 +4788,6 @@ static int ndb_queue_write_version(struct ndb *ndb, uint64_t version) return ndb_writer_queue_msg(&ndb->writer, &msg); } -static int ndb_run_migrations(struct ndb *ndb) -{ - int64_t version, latest_version, i; - - latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); - - if ((version = ndb_db_version(ndb)) == -1) { - ndb_debug("run_migrations: no version found, assuming new db\n"); - version = latest_version; - - // no version found. fresh db? - if (!ndb_queue_write_version(ndb, version)) { - fprintf(stderr, "run_migrations: failed writing db version"); - return 0; - } - - return 1; - } else { - ndb_debug("ndb: version %" PRIu64 " found\n", version); - } - - if (version < latest_version) - fprintf(stderr, "nostrdb: migrating v%d -> v%d\n", - (int)version, (int)latest_version); - - for (i = version; i < latest_version; i++) { - if (!MIGRATIONS[i].fn(ndb)) { - fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); - return 0; - } - - if (!ndb_queue_write_version(ndb, i+1)) { - fprintf(stderr, "run_migrations: failed writing db version"); - return 0; - } - - version = i+1; - } - - ndb->version = version; - - return 1; -} - static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb, void *sub_cb_ctx) { @@ -4861,10 +4852,9 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c return 0; } - if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) && - !ndb_run_migrations(ndb)) { - fprintf(stderr, "failed to run migrations\n"); - return 0; + if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE)) { + struct ndb_writer_msg msg = { .type = NDB_WRITER_MIGRATE }; + ndb_writer_queue_msg(&ndb->writer, &msg); } // Initialize LMDB environment and spin up threads diff --git a/src/nostrdb.h b/src/nostrdb.h index d4722e7..94895e2 100644 --- a/src/nostrdb.h +++ b/src/nostrdb.h @@ -458,7 +458,7 @@ int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[3 // NDB int ndb_init(struct ndb **ndb, const char *dbdir, const struct ndb_config *); -int ndb_db_version(struct ndb *ndb); +int ndb_db_version(struct ndb_txn *txn); int ndb_process_event(struct ndb *, const char *json, int len); int ndb_process_events(struct ndb *, const char *ldjson, size_t len); #ifndef _WIN32 diff --git a/test.c b/test.c index cf5154e..bd7bd72 100644 --- a/test.c +++ b/test.c @@ -470,12 +470,16 @@ static void test_migrate() { static const char *v0_dir = "testdata/db/v0"; struct ndb *ndb; struct ndb_config config; + struct ndb_txn txn; + ndb_default_config(&config); ndb_config_set_flags(&config, NDB_FLAG_NOMIGRATE); fprintf(stderr, "testing migrate on v0\n"); assert(ndb_init(&ndb, v0_dir, &config)); - assert(ndb_db_version(ndb) == 0); + assert(ndb_begin_query(ndb, &txn)); + assert(ndb_db_version(&txn) == 0); + assert(ndb_end_query(&txn)); ndb_destroy(ndb); ndb_config_set_flags(&config, 0); @@ -483,7 +487,10 @@ static void test_migrate() { assert(ndb_init(&ndb, v0_dir, &config)); ndb_destroy(ndb); assert(ndb_init(&ndb, v0_dir, &config)); - assert(ndb_db_version(ndb) == 3); + + assert(ndb_begin_query(ndb, &txn)); + assert(ndb_db_version(&txn) == 3); + assert(ndb_end_query(&txn)); test_profile_search(ndb); ndb_destroy(ndb);