Skip to content

Commit

Permalink
blocks: write note blocks on ingest
Browse files Browse the repository at this point in the history
When ingesting notes, parse text/longform contents and store them in nostrdb.
  • Loading branch information
jb55 committed Dec 30, 2023
1 parent 5fee42e commit b975cff
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 9 deletions.
166 changes: 157 additions & 9 deletions src/nostrdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "lmdb.h"
#include "util.h"
#include "cpu.h"
#include "block.h"
#include "threadpool.h"
#include "protected_queue.h"
#include "memchr.h"
Expand Down Expand Up @@ -127,6 +128,7 @@ enum ndb_writer_msgtype {
NDB_WRITER_PROFILE, // write a profile to the db
NDB_WRITER_DBMETA, // write ndb metadata
NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
NDB_WRITER_BLOCKS, // write parsed note blocks
};

// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
Expand Down Expand Up @@ -1141,6 +1143,12 @@ struct ndb_writer_last_fetch {
uint64_t fetched_at;
};

// write note blocks
struct ndb_writer_blocks {
struct ndb_blocks *blocks;
uint64_t note_key;
};

// The different types of messages that the writer thread can write to the
// database
struct ndb_writer_msg {
Expand All @@ -1150,6 +1158,7 @@ struct ndb_writer_msg {
struct ndb_writer_profile profile;
struct ndb_writer_ndb_meta ndb_meta;
struct ndb_writer_last_fetch last_fetch;
struct ndb_writer_blocks blocks;
};
};

Expand Down Expand Up @@ -2651,8 +2660,48 @@ int ndb_text_search(struct ndb_txn *txn, const char *query,
return 1;
}

static void ndb_write_blocks(struct ndb_txn *txn, uint64_t note_key,
struct ndb_blocks *blocks)
{
int rc;
MDB_val key, val;

key.mv_data = &note_key;
key.mv_size = sizeof(note_key);
val.mv_data = blocks;
val.mv_size = ndb_blocks_total_size(blocks);
assert((val.mv_size % 8) == 0);

if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_BLOCKS], &key, &val, 0))) {
ndb_debug("write version to note_blocks failed: %s\n",
mdb_strerror(rc));
return;
}
}

static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note,
uint64_t note_key, unsigned char *scratch,
size_t scratch_size)
{
size_t content_len;
const char *content;
struct ndb_blocks *blocks;

content_len = ndb_note_content_length(note);
content = ndb_note_content(note);

if (!ndb_parse_content(scratch, scratch_size, content, content_len, &blocks)) {
ndb_debug("failed to parse content '%.*s'\n", content_len, content);
return 0;
}

ndb_write_blocks(txn, note_key, blocks);
return 1;
}

static uint64_t ndb_write_note(struct ndb_txn *txn,
struct ndb_writer_note *note)
struct ndb_writer_note *note,
unsigned char *scratch, size_t scratch_size)
{
int rc;
uint64_t note_key;
Expand Down Expand Up @@ -2688,10 +2737,14 @@ static uint64_t ndb_write_note(struct ndb_txn *txn,
if (!ndb_write_note_kind_index(txn, note->note, note_key))
return 0;

// only do fulltext index on text and longform notes
// only parse content and do fulltext index on text and longform notes
if (note->note->kind == 1 || note->note->kind == 30023) {
if (!ndb_write_note_fulltext_index(txn, note->note, note_key))
return 0;

// write note blocks
ndb_write_new_blocks(txn, note->note, note_key, scratch,
scratch_size);
}

if (note->note->kind == 7) {
Expand Down Expand Up @@ -2728,6 +2781,9 @@ static void *ndb_writer_thread(void *data)
{
struct ndb_writer *writer = data;
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
// 8mb scratch buffer for parsing note content
size_t scratch_size = 8 * 1024 * 1024;
unsigned char *scratch = malloc(scratch_size);
int i, popped, done, any_note;
uint64_t note_nkey;
MDB_txn *mdb_txn = NULL;
Expand All @@ -2748,6 +2804,7 @@ static void *ndb_writer_thread(void *data)
case NDB_WRITER_PROFILE: any_note = 1; break;
case NDB_WRITER_DBMETA: any_note = 1; break;
case NDB_WRITER_PROFILE_LAST_FETCH: any_note = 1; break;
case NDB_WRITER_BLOCKS: any_note = 1; break;
case NDB_WRITER_QUIT: break;
}
}
Expand All @@ -2770,22 +2827,28 @@ static void *ndb_writer_thread(void *data)
continue;
case NDB_WRITER_PROFILE:
note_nkey =
ndb_write_note(&txn, &msg->note);
ndb_write_note(&txn, &msg->note,
scratch, scratch_size);
if (msg->profile.record.builder) {
// only write if parsing didn't fail
ndb_write_profile(&txn, &msg->profile,
note_nkey);
}
break;
case NDB_WRITER_NOTE:
ndb_write_note(&txn, &msg->note);
ndb_write_note(&txn, &msg->note, scratch,
scratch_size);
//printf("wrote note ");
//print_hex(msg->note.note->id, 32);
//printf("\n");
break;
case NDB_WRITER_DBMETA:
ndb_write_version(&txn, msg->ndb_meta.version);
break;
case NDB_WRITER_BLOCKS:
ndb_write_blocks(&txn, msg->blocks.note_key,
msg->blocks.blocks);
break;
case NDB_WRITER_PROFILE_LAST_FETCH:
ndb_writer_last_profile_fetch(&txn,
msg->last_fetch.pubkey,
Expand All @@ -2804,15 +2867,18 @@ static void *ndb_writer_thread(void *data)
// free notes
for (i = 0; i < popped; i++) {
msg = &msgs[i];
if (msg->type == NDB_WRITER_NOTE)
if (msg->type == NDB_WRITER_NOTE) {
free(msg->note.note);
else if (msg->type == NDB_WRITER_PROFILE) {
} else if (msg->type == NDB_WRITER_PROFILE) {
free(msg->profile.note.note);
ndb_profile_record_builder_free(&msg->profile.record);
} else if (msg->type == NDB_WRITER_BLOCKS) {
ndb_blocks_free(msg->blocks.blocks);
}
}
}

free(scratch);
ndb_debug("quitting writer thread\n");
return NULL;
}
Expand Down Expand Up @@ -3060,24 +3126,30 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare);

if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) {
fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
fprintf(stderr, "mdb_dbi_open profile_pk failed: %s\n", mdb_strerror(rc));
return 0;
}
mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare);

if ((rc = mdb_dbi_open(txn, "note_kind", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_KIND]))) {
fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
fprintf(stderr, "mdb_dbi_open note_kind failed: %s\n", mdb_strerror(rc));
return 0;
}
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_tsid_compare);

if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT,
&lmdb->dbs[NDB_DB_NOTE_TEXT]))) {
fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
fprintf(stderr, "mdb_dbi_open note_text failed: %s\n", mdb_strerror(rc));
return 0;
}
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare);

if ((rc = mdb_dbi_open(txn, "note_blocks", MDB_CREATE | MDB_INTEGERKEY,
&lmdb->dbs[NDB_DB_NOTE_BLOCKS]))) {
fprintf(stderr, "mdb_dbi_open note_blocks failed: %s\n", mdb_strerror(rc));
return 0;
}

// Commit the transaction
if ((rc = mdb_txn_commit(txn))) {
fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc);
Expand Down Expand Up @@ -4591,9 +4663,85 @@ const char *ndb_db_name(enum ndb_dbs db)
return "note_kind_index";
case NDB_DB_NOTE_TEXT:
return "note_fulltext";
case NDB_DB_NOTE_BLOCKS:
return "note_blocks";
case NDB_DBS:
return "count";
}

return "unknown";
}

static struct ndb_blocks *ndb_note_to_blocks(struct ndb_note *note)
{
const char *content;
size_t content_len;
struct ndb_blocks *blocks;

content = ndb_note_content(note);
content_len = ndb_note_content_length(note);

// something weird is going on
if (content_len >= INT32_MAX)
return NULL;

unsigned char *buffer = malloc(content_len);
if (!buffer)
return NULL;

if (!ndb_parse_content(buffer, content_len, content, content_len, &blocks)) {
free(buffer);
return NULL;
}

blocks = realloc(blocks, ndb_blocks_total_size(blocks));
if (blocks == NULL)
return NULL;

blocks->flags |= NDB_BLOCK_FLAG_OWNED;

return blocks;
}

struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key)
{
struct ndb_blocks *blocks, *blocks_to_writer;
size_t blocks_size;
struct ndb_note *note;
size_t note_len;

if ((blocks = ndb_lookup_by_key(txn, note_key, NDB_DB_NOTE_BLOCKS, &note_len))) {
return blocks;
}

// If we don't have note blocks, let's lazily generate them. This is
// migration-friendly instead of doing them all at once
if (!(note = ndb_get_note_by_key(txn, note_key, &note_len))) {
// no note found, can't return note blocks
return NULL;
}

if (!(blocks = ndb_note_to_blocks(note)))
return NULL;

// send a copy to the writer
blocks_size = ndb_blocks_total_size(blocks);
blocks_to_writer = malloc(blocks_size);
memcpy(blocks_to_writer, blocks, blocks_size);
assert(blocks->flags & NDB_BLOCK_FLAG_OWNED);

// we generated new blocks, let's store them in the DB
struct ndb_writer_blocks write_blocks = {
.blocks = blocks_to_writer,
.note_key = note_key
};

assert(write_blocks.blocks != blocks);

struct ndb_writer_msg msg = { .type = NDB_WRITER_BLOCKS };
msg.blocks = write_blocks;

ndb_writer_queue_msg(&ndb->writer, &msg);

return blocks;
}
3 changes: 3 additions & 0 deletions src/nostrdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ enum ndb_dbs {
NDB_DB_PROFILE_LAST_FETCH,
NDB_DB_NOTE_KIND, // note kind index
NDB_DB_NOTE_TEXT, // note fulltext index
NDB_DB_NOTE_BLOCKS, // parsed note blocks for rendering
NDB_DBS,
};

Expand Down Expand Up @@ -474,6 +475,8 @@ size_t ndb_blocks_total_size(struct ndb_blocks *blocks);
/// Free blocks if they are owned, safe to call on unowned blocks as well.
void ndb_blocks_free(struct ndb_blocks *blocks);

// BLOCK DB
struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key);

// BLOCK ITERATORS
struct ndb_block_iterator *ndb_blocks_iterate_start(const char *, struct ndb_blocks *);
Expand Down

0 comments on commit b975cff

Please sign in to comment.