Skip to content

Commit

Permalink
always leave tombstones in funk
Browse files Browse the repository at this point in the history
  • Loading branch information
asiegel-jt authored and ibhatt-jumptrading committed Dec 18, 2024
1 parent 5442608 commit 7843108
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 399 deletions.
4 changes: 2 additions & 2 deletions src/app/ledger/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ runtime_replay( fd_ledger_args_t * ledger_args ) {
if( FD_UNLIKELY( err ) ) {
FD_LOG_ERR(( "Failed to import block %lu", start_slot ));
}

fd_blockstore_start_write( blockstore );

/* Remove the previous block from the blockstore */
Expand Down Expand Up @@ -1217,7 +1217,7 @@ prune( fd_ledger_args_t * args ) {
fd_funk_val_sz( original_rec ) ) == 0 ));
} else {
fd_funk_rec_t * mod_rec = fd_funk_rec_modify( pruned_funk, rec );
int res = fd_funk_rec_remove( pruned_funk, mod_rec, 1 );
int res = fd_funk_rec_remove( pruned_funk, mod_rec );
FD_TEST(( res == 0 ));
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/flamenco/runtime/fd_hashes.c
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ fd_update_hash_bank_tpool( fd_exec_slot_ctx_t * slot_ctx,
continue;
}

fd_funk_rec_remove(funk, fd_funk_rec_modify(funk, task_info->rec), 1);
fd_funk_rec_remove(funk, fd_funk_rec_modify(funk, task_info->rec));
}

// Sanity-check LT Hash
Expand Down
2 changes: 1 addition & 1 deletion src/flamenco/runtime/fd_runtime.c
Original file line number Diff line number Diff line change
Expand Up @@ -3694,7 +3694,7 @@ fd_runtime_cleanup_incinerator( fd_exec_slot_ctx_t * slot_ctx ) {
fd_funk_t * funk = slot_ctx->acc_mgr->funk;
fd_funk_rec_t const * rec = fd_funk_rec_query( funk, slot_ctx->funk_txn, &id );
if( rec )
fd_funk_rec_remove( funk, fd_funk_rec_modify( funk, rec ), 1 );
fd_funk_rec_remove( funk, fd_funk_rec_modify( funk, rec ));
}

void
Expand Down
10 changes: 5 additions & 5 deletions src/flamenco/runtime/program/fd_bpf_program_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fd_acc_mgr_cache_key( fd_pubkey_t const * pubkey ) {
return id;
}

/* Similar to the below function, but gets the executable program content for the v4 loader.
/* Similar to the below function, but gets the executable program content for the v4 loader.
Unlike the v3 loader, the programdata is stored in a single program account. The program must
NOT be retracted to be added to the cache. */
static int
Expand All @@ -62,7 +62,7 @@ fd_bpf_get_executable_program_content_for_v4_loader( fd_borrowed_account_t * pro
ulong * program_data_len ) {
int err;
fd_loader_v4_state_t state = {0};

/* Get the current loader v4 state. This implicitly also checks the dlen. */
err = fd_loader_v4_get_state( program_acc, &state );
if( FD_UNLIKELY( err ) ) {
Expand All @@ -73,7 +73,7 @@ fd_bpf_get_executable_program_content_for_v4_loader( fd_borrowed_account_t * pro
if( FD_UNLIKELY( fd_loader_v4_status_is_retracted( &state ) ) ) {
return -1;
}

*program_data = program_acc->const_data + LOADER_V4_PROGRAM_DATA_OFFSET;
*program_data_len = program_acc->const_meta->dlen - LOADER_V4_PROGRAM_DATA_OFFSET;
return 0;
Expand Down Expand Up @@ -224,7 +224,7 @@ fd_bpf_create_bpf_program_cache_entry( fd_exec_slot_ctx_t * slot_ctx,
if( FD_UNLIKELY( 0!=fd_sbpf_program_load( prog, program_data, program_data_len, syscalls, false ) ) ) {
/* Remove pending funk record */
FD_LOG_DEBUG(( "fd_sbpf_program_load() failed: %s", fd_sbpf_strerror() ));
fd_funk_rec_remove( funk, rec, 0 );
fd_funk_rec_remove( funk, rec );
return -1;
}

Expand Down Expand Up @@ -267,7 +267,7 @@ fd_bpf_create_bpf_program_cache_entry( fd_exec_slot_ctx_t * slot_ctx,
if( FD_UNLIKELY( res ) ) {
/* Remove pending funk record */
FD_LOG_DEBUG(( "fd_vm_validate() failed" ));
fd_funk_rec_remove( funk, rec, 0 );
fd_funk_rec_remove( funk, rec );
return -1;
}

Expand Down
5 changes: 4 additions & 1 deletion src/funk/fd_funk_archive.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ fd_funk_archive( fd_funk_t * funk,
ARCH_WRITE( rec->pair.key, sizeof(rec->pair.key) );
ARCH_WRITE( &rec->part, sizeof(rec->part) );
ARCH_WRITE( &rec->val_sz, sizeof(rec->val_sz) );
ARCH_WRITE( &rec->flags, sizeof(rec->flags) );
if( rec->val_sz ) {
ARCH_WRITE( fd_wksp_laddr_fast( wksp, rec->val_gaddr ), rec->val_sz );
}
Expand Down Expand Up @@ -128,6 +129,7 @@ fd_funk_unarchive( fd_funk_t * funk,
fd_memset( &pair, 0, sizeof(pair) );
uint part;
uint val_sz;
ulong flags;

for(;;) {
ARCH_READ( &type, sizeof(type) );
Expand All @@ -138,6 +140,7 @@ fd_funk_unarchive( fd_funk_t * funk,
ARCH_READ( pair.key, sizeof(pair.key) );
ARCH_READ( &part, sizeof(part) );
ARCH_READ( &val_sz, sizeof(val_sz) );
ARCH_READ( &flags, sizeof(flags) );

if( FD_UNLIKELY( fd_funk_rec_map_is_full( rec_map ) ) ) {
FD_LOG_WARNING(( "archive %s has too many records to fit in given funk", filename ));
Expand All @@ -155,7 +158,7 @@ fd_funk_unarchive( fd_funk_t * funk,
rec->next_idx = FD_FUNK_REC_IDX_NULL;
rec->txn_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
rec->tag = 0U;
rec->flags = 0UL;
rec->flags = flags;

int first_born = fd_funk_rec_idx_is_null( rec_prev_idx );
if( first_born ) funk->rec_head_idx = rec_idx;
Expand Down
177 changes: 24 additions & 153 deletions src/funk/fd_funk_rec.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,14 @@ fd_funk_rec_insert( fd_funk_t * funk,
fd_funk_rec_t * rec = fd_funk_rec_map_query( rec_map, pair, NULL );

if( FD_UNLIKELY( rec ) ) { /* Already a record present */
if( FD_UNLIKELY( rec->flags & FD_FUNK_REC_FLAG_ERASE ) ) FD_LOG_CRIT(( "memory corruption detected (bad flags)" ));

/* However, if the record is marked for erasure, reset the flag and
return the record. */
if( rec->flags & FD_FUNK_REC_FLAG_ERASE ) {
rec->flags &= ~FD_FUNK_REC_FLAG_ERASE;
return rec;
}

fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_KEY );
return NULL;
}
Expand Down Expand Up @@ -356,20 +363,10 @@ fd_funk_rec_insert( fd_funk_t * funk,

if( FD_UNLIKELY( rec ) ) { /* Already a record present */

/* If this record has erase set, it is supposed to erase its
closest ancestor record on publish. At the time it was marked
erase, any updates it had to the ancestor record value were
flushed. Thus clearing the erase flag will reset the record to
the way it was when it was first inserted into this
transaction.
Otherwise, the user is trying insert a record update on top of
/* The user is trying insert a record update on top of
a pre-existing of record update. We fail with ERR_KEY to
prevent accidentally discarding any previous updates
unintentionally.
In both cases, it is straightforward to tweak these to have
alternative behaviors as might be convenient for users. */
unintentionally. */

if( FD_UNLIKELY( rec->flags & FD_FUNK_REC_FLAG_ERASE ) ) {
rec->flags &= ~FD_FUNK_REC_FLAG_ERASE;
Expand Down Expand Up @@ -417,8 +414,7 @@ fd_funk_rec_insert( fd_funk_t * funk,

int
fd_funk_rec_remove( fd_funk_t * funk,
fd_funk_rec_t * rec,
int erase ) {
fd_funk_rec_t * rec ) {

if( FD_UNLIKELY( !funk ) ) return FD_FUNK_ERR_INVAL;
fd_funk_check_write( funk );
Expand All @@ -436,161 +432,34 @@ fd_funk_rec_remove( fd_funk_t * funk,

if( FD_UNLIKELY( rec!=fd_funk_rec_map_query_const( rec_map, fd_funk_rec_pair( rec ), NULL ) ) ) return FD_FUNK_ERR_KEY;

/* At this point, rec appears to be a live record. Determine which
list contains the record and if we are allowed to remove it. */

ulong * _rec_head_idx;
ulong * _rec_tail_idx;

ulong txn_idx = fd_funk_txn_idx( rec->txn_cidx );

if( FD_UNLIKELY( fd_funk_txn_idx_is_null( txn_idx ) ) ) { /* Removing from last published, opt for lots recs, rand remove */

if( FD_UNLIKELY( fd_funk_last_publish_is_frozen( funk ) ) ) return FD_FUNK_ERR_FROZEN;

if( FD_UNLIKELY( rec->flags & FD_FUNK_REC_FLAG_ERASE ) ) FD_LOG_CRIT(( "memory corruption detected (bad flags)" ));

if( FD_UNLIKELY( !erase ) ) return FD_FUNK_ERR_XID;

/* At this point, we are in last published transaction, it is
unfrozen, the record erase flag is clear and the user explicitly
asked to erase. Remove the record. */

_rec_head_idx = &funk->rec_head_idx;
_rec_tail_idx = &funk->rec_tail_idx;

} else { /* Removing from in-prep transaction */
} else {

fd_funk_txn_t * txn_map = fd_funk_txn_map( funk, wksp );
ulong txn_max = funk->txn_max;

if( FD_UNLIKELY( txn_idx>=txn_max ) ) FD_LOG_CRIT(( "memory corruption detected (bad idx)" ));

if( FD_UNLIKELY( fd_funk_txn_is_frozen( &txn_map[ txn_idx ] ) ) ) return FD_FUNK_ERR_FROZEN;

if( FD_UNLIKELY( erase ) ) {

/* If this was already marked for erase, we are done (we already
flushed the value when it was first marked for erase) */

if( FD_UNLIKELY( rec->flags & FD_FUNK_REC_FLAG_ERASE ) ) return FD_FUNK_SUCCESS;

/* Query our ancestors to see if we need to keep this record
around or if we can just remove it immediately. Though this is
potentially an O(ancestor_cnt) cost, it prevents the
possibility unnecessarily consuming a practically unbounded
number of records by flickering insert / remove-with-erase in
an in-preparation transaction with lots unique keys. */

ulong tag = funk->cycle_tag++;

ulong cur_idx = txn_idx;
for(;;) {

/* At this point, transaction cur_idx is an in-prep transaction.
Tag it for cycle detection and see if transaction cur_idx's
parent has a record for this and react accordingly. */

txn_map[ cur_idx ].tag = tag;

ulong parent_idx = fd_funk_txn_idx( txn_map[ cur_idx ].parent_cidx );
if( FD_LIKELY( fd_funk_txn_idx_is_null( parent_idx ) ) ) { /* Parent txn is last published, opt for shallow */

fd_funk_rec_t const * erase_rec = fd_funk_rec_query( funk, NULL, fd_funk_rec_key( rec ) );
if( FD_UNLIKELY( !erase_rec ) ) break; /* No ancestor has this record, can free immediately, opt no flicker */

/* Record is available in last published ... this remove
should erase the published record when if and when this txn
is published. We should never see a published record as
flagged for erasure. */

if( FD_UNLIKELY( erase_rec->flags & FD_FUNK_REC_FLAG_ERASE ) ) FD_LOG_CRIT(( "memory corruption detected (bad flags)" ));

fd_funk_part_set_intern( fd_funk_get_partvec( funk, wksp ), rec_map, rec, FD_FUNK_PART_NULL );
fd_funk_val_flush( rec, fd_funk_alloc( funk, wksp ), wksp ); /* TODO: consider testing wksp_gaddr has wksp_tag? */

rec->flags |= FD_FUNK_REC_FLAG_ERASE;

return FD_FUNK_SUCCESS;

}

if( FD_UNLIKELY( parent_idx>=txn_max ) ) FD_LOG_CRIT(( "memory corruption detected (bad idx)" ));
if( FD_UNLIKELY( txn_map[ parent_idx ].tag==tag ) ) FD_LOG_CRIT(( "memory corruption detected (cycle)" ));

fd_funk_rec_t const * erase_rec = fd_funk_rec_query( funk, &txn_map[ parent_idx ], fd_funk_rec_key( rec ) );
if( FD_LIKELY( erase_rec ) ) { /* Opt for shallow */

/* Record is available in an in-prep ancestor ... this remove
erases that record on publish of this txn (which also
implies an earlier publish of that ancestor).
If that ancestor record itself was already marked as
erasing a record, we can just free this record.
(Note, there are some exotic circumstances that can
generate such naturally but they are pretty gross. For
example distant ancestor has this record, unpublished
ancestor has marked it for erase, user reused the record's
key in this txn, and has some proposed updates to the
record's val that the user then decides to discard. It is
arguable that cases should be disallowed. More generally,
it is a good practice to only erase on records that don't
have any proposed updates in them to avoid cases like
this.) */

if( erase_rec->flags & FD_FUNK_REC_FLAG_ERASE ) break;

/* Otherwise, we mark this record as erasing that record and
discard any changes we might have made already in this
record. */

fd_funk_val_flush( rec, fd_funk_alloc( funk, wksp ), wksp ); /* TODO: consider testing wksp_gaddr has wksp_tag? */
fd_funk_part_set_intern( fd_funk_get_partvec( funk, wksp ), rec_map, rec, FD_FUNK_PART_NULL );

rec->flags |= FD_FUNK_REC_FLAG_ERASE;

return FD_FUNK_SUCCESS;
}

cur_idx = parent_idx;
}

}

/* At this point, we are in an in-prep transaction, it is unfrozen,
and we are to discard changes to this record done by this
transaction. Note that if rec_erase is set, this will discard
the erase and any value changes that might have been made
previously. */

_rec_head_idx = &txn_map[ txn_idx ].rec_head_idx;
_rec_tail_idx = &txn_map[ txn_idx ].rec_tail_idx;
}

/* Flush the value, remove the record from its list, and unmap the
record */

fd_funk_val_flush( rec, fd_funk_alloc( funk, wksp ), wksp ); /* TODO: consider testing wksp_gaddr has wksp_tag? */
fd_funk_part_set_intern( fd_funk_get_partvec( funk, wksp ), rec_map, rec, FD_FUNK_PART_NULL );

ulong prev_idx = rec->prev_idx;
ulong next_idx = rec->next_idx;
/* If this was already marked for erase, we are done (we already
flushed the value when it was first marked for erase) */

int prev_null = fd_funk_rec_idx_is_null( prev_idx );
int next_null = fd_funk_rec_idx_is_null( next_idx );

if( !( ((prev_null) | (prev_idx<rec_max)) & ((next_null) | (next_idx<rec_max)) ) )
FD_LOG_CRIT(( "memory corruption detected (bad idx)" ));
if( FD_UNLIKELY( rec->flags & FD_FUNK_REC_FLAG_ERASE ) ) return FD_FUNK_SUCCESS;

/* TODO: Consider branchless impl */
if( prev_null ) *_rec_head_idx = next_idx;
else rec_map[ prev_idx ].next_idx = next_idx;
/* Flush the value and leave a tombstone behind. In theory, this can
lead to an unbounded number of records, but for application
reasons, we need to remember what was deleted. */

if( next_null ) *_rec_tail_idx = prev_idx;
else rec_map[ next_idx ].prev_idx = prev_idx;

fd_funk_rec_map_remove( rec_map, fd_funk_rec_pair( rec ) );
fd_funk_val_flush( rec, fd_funk_alloc( funk, wksp ), wksp );
fd_funk_part_set_intern( fd_funk_get_partvec( funk, wksp ), rec_map, rec, FD_FUNK_PART_NULL );
rec->flags |= FD_FUNK_REC_FLAG_ERASE;

return FD_FUNK_SUCCESS;
}
Expand All @@ -613,6 +482,9 @@ fd_funk_rec_write_prepare( fd_funk_t * funk,
else
rec_con = irec;

/* We are able to handle tombstones in this case because we treat an erased
record as not exisitng. */

if ( FD_UNLIKELY( rec_con && !(rec_con->flags & FD_FUNK_REC_FLAG_ERASE) ) ) {
/* We have an incarnation of the record */
if ( txn == fd_funk_rec_txn( rec_con, fd_funk_txn_map( funk, wksp ) ) ) {
Expand Down Expand Up @@ -691,7 +563,6 @@ fd_funk_rec_verify( fd_funk_t * funk ) {
if( fd_funk_txn_idx_is_null( txn_idx ) ) { /* This is a record from the last published transaction */

TEST( fd_funk_txn_xid_eq_root( txn_xid ) );
TEST( !(rec->flags & FD_FUNK_REC_FLAG_ERASE) );

} else { /* This is a record from an in-prep transaction */

Expand Down
Loading

0 comments on commit 7843108

Please sign in to comment.