Skip to content

Commit

Permalink
ingest: support kind 6 reposts
Browse files Browse the repository at this point in the history
This also enables processing raw json via ndb import

Fixes: #46
Signed-off-by: William Casarin <[email protected]>
  • Loading branch information
jb55 committed Sep 17, 2024
1 parent f938941 commit cd9ba0e
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 47 deletions.
2 changes: 1 addition & 1 deletion ndb.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ int main(int argc, char *argv[])
} else {
map_file(argv[2], &data, &data_len);
ndb_process_events(ndb, (const char *)data, data_len);
ndb_process_client_events(ndb, (const char *)data, data_len);
//ndb_process_client_events(ndb, (const char *)data, data_len);
}
} else if (argc == 2 && !strcmp(argv[1], "print-search-keys")) {
ndb_begin_query(ndb, &txn);
Expand Down
98 changes: 57 additions & 41 deletions src/nostrdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -2035,6 +2035,37 @@ int ndb_process_profile_note(struct ndb_note *note,
return 1;
}

static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
char *json, unsigned len, unsigned client)
{
struct ndb_ingester_msg msg;
msg.type = NDB_INGEST_EVENT;

msg.event.json = json;
msg.event.len = len;
msg.event.client = client;

return threadpool_dispatch(&ingester->tp, &msg);
}


static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json,
int len, unsigned client)
{
// Since we need to return as soon as possible, and we're not
// making any assumptions about the lifetime of the string, we
// definitely need to copy the json here. In the future once we
// have our thread that manages a websocket connection, we can
// avoid the copy and just use the buffer we get from that
// thread.
char *json_copy = strdupn(json, len);
if (json_copy == NULL)
return 0;

return ndb_ingester_queue_event(ingester, json_copy, len, client);
}


static int ndb_ingester_process_note(secp256k1_context *ctx,
struct ndb_note *note,
size_t note_size,
Expand Down Expand Up @@ -2077,12 +2108,18 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
out->type = NDB_WRITER_PROFILE;
out->profile.note.note = note;
out->profile.note.note_len = note_size;
} else {
out->type = NDB_WRITER_NOTE;
out->note.note = note;
out->note.note_len = note_size;
return 1;
} else if (note->kind == 6) {
// process the repost if we have a repost event
ndb_debug("processing kind 6 repost\n");
ndb_ingest_event(ingester, ndb_note_content(note),
ndb_note_content_length(note), 0);
}

out->type = NDB_WRITER_NOTE;
out->note.note = note;
out->note.note_len = note_size;

return 1;
}

Expand Down Expand Up @@ -4102,19 +4139,6 @@ static int ndb_ingester_destroy(struct ndb_ingester *ingester)
return 1;
}

static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
char *json, unsigned len, unsigned client)
{
struct ndb_ingester_msg msg;
msg.type = NDB_INGEST_EVENT;

msg.event.json = json;
msg.event.len = len;
msg.event.client = client;

return threadpool_dispatch(&ingester->tp, &msg);
}

static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize)
{
int rc;
Expand Down Expand Up @@ -4383,17 +4407,7 @@ void ndb_destroy(struct ndb *ndb)
// The client-sent variation of ndb_process_event
int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
{
// Since we need to return as soon as possible, and we're not
// making any assumptions about the lifetime of the string, we
// definitely need to copy the json here. In the future once we
// have our thread that manages a websocket connection, we can
// avoid the copy and just use the buffer we get from that
// thread.
char *json_copy = strdupn(json, len);
if (json_copy == NULL)
return 0;

return ndb_ingester_queue_event(&ndb->ingester, json_copy, len, 1);
return ndb_ingest_event(&ndb->ingester, json, len, 1);
}

// Process anostr event from a relay,
Expand All @@ -4415,17 +4429,7 @@ int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
//
int ndb_process_event(struct ndb *ndb, const char *json, int json_len)
{
// Since we need to return as soon as possible, and we're not
// making any assumptions about the lifetime of the string, we
// definitely need to copy the json here. In the future once we
// have our thread that manages a websocket connection, we can
// avoid the copy and just use the buffer we get from that
// thread.
char *json_copy = strdupn(json, json_len);
if (json_copy == NULL)
return 0;

return ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len, 0);
return ndb_ingest_event(&ndb->ingester, json, json_len, 0);
}


Expand Down Expand Up @@ -5360,12 +5364,19 @@ int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce,
jsmntok_t *tok = NULL;
int tok_len, res;
struct ndb_json_parser parser;
struct ndb_event *ev = &fce->event;

ndb_json_parser_init(&parser, json, len, buf, bufsize);

if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
return res;

if (parser.toks[0].type == JSMN_OBJECT) {
ndb_debug("got raw json in client_event_from_json\n");
fce->evtype = NDB_FCE_EVENT;
return ndb_parse_json_note(&parser, &ev->note);
}

if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY)
return 0;

Expand All @@ -5377,7 +5388,6 @@ int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce,

if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
fce->evtype = NDB_FCE_EVENT;
struct ndb_event *ev = &fce->event;
return ndb_parse_json_note(&parser, &ev->note);
}

Expand All @@ -5392,6 +5402,7 @@ int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
jsmntok_t *tok = NULL;
int tok_len, res;
struct ndb_json_parser parser;
struct ndb_event *ev = &tce->event;

tce->subid_len = 0;
tce->subid = "";
Expand All @@ -5401,6 +5412,12 @@ int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
return res;

if (parser.toks[0].type == JSMN_OBJECT) {
ndb_debug("got raw json in ws_event_from_json\n");
tce->evtype = NDB_TCE_EVENT;
return ndb_parse_json_note(&parser, &ev->note);
}

if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY)
return 0;

Expand All @@ -5412,7 +5429,6 @@ int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,

if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
tce->evtype = NDB_TCE_EVENT;
struct ndb_event *ev = &tce->event;

tok = &parser.toks[parser.i++];
if (tok->type != JSMN_STRING)
Expand Down
8 changes: 3 additions & 5 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ static void test_fetched_at()
// this should be set to t1
fetched_at = ndb_read_last_profile_fetch(&txn, pubkey);

if (fetched_at != t1) {
printf("fetched_at != t1? %" PRIu64 " != %" PRIu64 "\n", fetched_at, t1);
}
assert(fetched_at == t1);

t2 = time(NULL);
Expand Down Expand Up @@ -1422,14 +1425,9 @@ static void test_query()
0xe5, 0x39, 0x21, 0x03, 0x70, 0x0c, 0x10, 0xfc
};


const char *ev = "[\"EVENT\",\"s\",{\"id\": \"0336948bdfbf5f939802eba03aa78735c82825211eece987a6d2e20e3cfff930\",\"pubkey\": \"aeadd3bf2fd92e509e137c9e8bdf20e99f286b90be7692434e03c015e1d3bbfe\",\"created_at\": 1704401597,\"kind\": 1,\"tags\": [],\"content\": \"hello\",\"sig\": \"232395427153b693e0426b93d89a8319324d8657e67d23953f014a22159d2127b4da20b95644b3e34debd5e20be0401c283e7308ccb63c1c1e0f81cac7502f09\"}]";

const char *ev2 = "[\"EVENT\",\"s\",{\"id\": \"0a350c5851af6f6ce368bab4e2d4fe442a1318642c7fe58de5392103700c10fc\",\"pubkey\": \"dfa3fc062f7430dab3d947417fd3c6fb38a7e60f82ffe3387e2679d4c6919b1d\",\"created_at\": 1704404822,\"kind\": 1,\"tags\": [],\"content\": \"hello2\",\"sig\": \"48a0bb9560b89ee2c6b88edcf1cbeeff04f5e1b10d26da8564cac851065f30fa6961ee51f450cefe5e8f4895e301e8ffb2be06a2ff44259684fbd4ea1c885696\"}]";


const char *ev3 = "[\"EVENT\",\"s\",{\"id\": \"20d2b66e1a3ac4a2afe22866ad742091b6267e6e614303de062adb33e12c9931\",\"pubkey\": \"7987bfb2632d561088fc8e3c30a95836f822e4f53633228ec92ae2f5cd6690aa\",\"created_at\": 1704408561,\"kind\": 2,\"tags\": [],\"content\": \"what\",\"sig\": \"cc8533bf177ac87771a5218a04bed24f7a1706f0b2d92700045cdeb38accc5507c6c8de09525e43190df3652012b554d4efe7b82ab268a87ff6f23da44e16a8f\"}]";

const char *ev4 = "[\"EVENT\",\"s\",{\"id\": \"8a2057c13c1c57b536eab78e6c55428732d33b6b5b234c1f5eab2b5918c37fa1\",\"pubkey\": \"303b5851504da5caa14142e9e2e1b1b60783c48d6f137c205019d46d09244c26\",\"created_at\": 1704408730,\"kind\": 2,\"tags\": [],\"content\": \"hmm\",\"sig\": \"e7cd3029042d41964192411929cade59592840af766da6420077ccc57a61405312db6ca879150db01f53c3b81c477cec5d6bd49f9dc10937267cacf7e5c784b3\"}]";

assert(ndb_init(&ndb, test_dir, &config));
Expand Down

0 comments on commit cd9ba0e

Please sign in to comment.