Skip to content

Commit

Permalink
funk-update: fix funk concurrency issues
Browse files Browse the repository at this point in the history
  • Loading branch information
asiegel-jt committed Mar 14, 2024
1 parent 89da411 commit d9c892b
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 9 deletions.
57 changes: 57 additions & 0 deletions src/funk/fd_funk.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "fd_funk.h"
#include <stdio.h>

ulong
fd_funk_align( void ) {
Expand Down Expand Up @@ -351,3 +352,59 @@ fd_funk_verify( fd_funk_t * funk ) {

return FD_FUNK_SUCCESS;
}

static char *
fd_smart_size( ulong sz, char * tmp, size_t tmpsz ) {
if( sz <= (1UL<<7) )
snprintf( tmp, tmpsz, "%lu B", sz );
else if( sz <= (1UL<<17) )
snprintf( tmp, tmpsz, "%.3f KB", ((double)sz/((double)(1UL<<10))) );
else if( sz <= (1UL<<27) )
snprintf( tmp, tmpsz, "%.3f MB", ((double)sz/((double)(1UL<<20))) );
else
snprintf( tmp, tmpsz, "%.3f GB", ((double)sz/((double)(1UL<<30))) );
return tmp;
}

void
fd_funk_log_mem_usage( fd_funk_t * funk ) {
char tmp1[100];
char tmp2[100];

FD_LOG_NOTICE(( "funk base footprint: %s",
fd_smart_size( fd_funk_footprint(), tmp1, sizeof(tmp1) ) ));
fd_wksp_t * wksp = fd_funk_wksp( funk );
fd_funk_txn_t const * txn_map = fd_funk_txn_map( funk, wksp );
FD_LOG_NOTICE(( "txn table footprint: %s (%lu entries used out of %lu, %lu%%)",
fd_smart_size( fd_funk_txn_map_footprint( funk->txn_max ), tmp1, sizeof(tmp1) ),
fd_funk_txn_map_key_cnt( txn_map ),
fd_funk_txn_map_key_max( txn_map ),
(100U*fd_funk_txn_map_key_cnt( txn_map )) / fd_funk_txn_map_key_max( txn_map ) ));
fd_funk_rec_t * rec_map = fd_funk_rec_map( funk, wksp );
FD_LOG_NOTICE(( "rec table footprint: %s (%lu entries used out of %lu, %lu%%)",
fd_smart_size( fd_funk_rec_map_footprint( funk->rec_max ), tmp1, sizeof(tmp1) ),
fd_funk_rec_map_key_cnt( rec_map ),
fd_funk_rec_map_key_max( rec_map ),
(100U*fd_funk_rec_map_key_cnt( rec_map )) / fd_funk_rec_map_key_max( rec_map ) ));
ulong val_cnt = 0;
ulong val_min = ULONG_MAX;
ulong val_max = 0;
ulong val_used = 0;
ulong val_alloc = 0;
for( fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter_init( rec_map );
!fd_funk_rec_map_iter_done( rec_map, iter );
iter = fd_funk_rec_map_iter_next( rec_map, iter ) ) {
fd_funk_rec_t * rec = fd_funk_rec_map_iter_ele( rec_map, iter );
val_cnt ++;
val_min = fd_ulong_min( val_min, rec->val_sz );
val_max = fd_ulong_max( val_max, rec->val_sz );
val_used += rec->val_sz;
val_alloc += rec->val_max;
}
FD_LOG_NOTICE(( " rec count: %lu, min size: %lu, avg_size: %lu, max_size: %lu, total_size: %s, total_allocated: %s",
val_cnt, val_min, val_used/val_cnt, val_max,
fd_smart_size( val_used, tmp1, sizeof(tmp1) ),
fd_smart_size( val_alloc, tmp2, sizeof(tmp2) ) ));
FD_LOG_NOTICE(( "part vec footprint: %s",
fd_smart_size( fd_funk_partvec_footprint(0U), tmp1, sizeof(tmp1) ) ));
}
12 changes: 11 additions & 1 deletion src/funk/fd_funk.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ struct __attribute__((aligned(FD_FUNK_ALIGN))) fd_funk_private {
ulong wksp_tag; /* Tag to use for wksp allocations, positive */
ulong seed; /* Seed for various hashing function used under the hood, arbitrary */
ulong cycle_tag; /* Next cycle_tag to use, used internally for various data integrity checks */
volatile int readonly; /* Read-only flag on database */

/* The funk transaction map stores the details about transactions
in preparation and their relationships to each other. This is a
Expand Down Expand Up @@ -348,6 +349,10 @@ FD_FN_PURE static inline ulong fd_funk_seed( fd_funk_t * funk ) { return funk->s

FD_FN_PURE static inline ulong fd_funk_txn_max( fd_funk_t * funk ) { return funk->txn_max; }

/* Set/clear the readonly flag, which marks the entire database as
* read-only. Any writes will fail. */
static inline void fd_funk_set_readonly( fd_funk_t * funk, int flag ) { funk->readonly = flag; }

/* fd_funk_txn_map returns a pointer in the caller's address space to
the funk's transaction map. */

Expand Down Expand Up @@ -458,7 +463,7 @@ fd_funk_last_publish_rec_tail( fd_funk_t const * funk, /* Assumes curr
FD_FN_PURE static inline fd_alloc_t * /* Lifetime is that of the local join */
fd_funk_alloc( fd_funk_t * funk, /* Assumes current local join */
fd_wksp_t * wksp ) { /* Assumes wksp == fd_funk_wksp( funk ) */
return (fd_alloc_t *)fd_wksp_laddr_fast( wksp, funk->alloc_gaddr );
return fd_alloc_join_cgroup_hint_set( (fd_alloc_t *)fd_wksp_laddr_fast( wksp, funk->alloc_gaddr ), fd_tile_idx() );
}

/* Operations */
Expand Down Expand Up @@ -491,6 +496,11 @@ fd_funk_last_publish_descendant( fd_funk_t * funk,
int
fd_funk_verify( fd_funk_t * funk );

/* fd_funk_log_mem_usage logs useful statistics about memory usage */

void
fd_funk_log_mem_usage( fd_funk_t * funk );

FD_PROTOTYPES_END

#endif /* HEADER_fd_src_funk_fd_funk_h */
17 changes: 11 additions & 6 deletions src/funk/fd_funk_rec.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fd_funk_rec_query_global( fd_funk_t * funk,
/* TODO: const correct and/or fortify? */
do {
fd_funk_xid_key_pair_t pair[1]; fd_funk_xid_key_pair_init( pair, fd_funk_txn_xid( txn ), key );
fd_funk_rec_t const * rec = fd_funk_rec_map_query( rec_map, pair, NULL );
fd_funk_rec_t const * rec = fd_funk_rec_map_query_const( rec_map, pair, NULL );
if( FD_LIKELY( rec ) ) return rec;
txn = fd_funk_txn_parent( (fd_funk_txn_t *)txn, txn_map );
} while( FD_UNLIKELY( txn ) );
Expand All @@ -85,7 +85,7 @@ fd_funk_rec_query_global( fd_funk_t * funk,
/* Query the last published transaction */

fd_funk_xid_key_pair_t pair[1]; fd_funk_xid_key_pair_init( pair, fd_funk_root( funk ), key );
return fd_funk_rec_map_query( rec_map, pair, NULL );
return fd_funk_rec_map_query_const( rec_map, pair, NULL );
}

fd_funk_rec_t const *
Expand Down Expand Up @@ -170,6 +170,7 @@ fd_funk_rec_modify( fd_funk_t * funk,
fd_funk_rec_t const * rec ) {
if( FD_UNLIKELY( (!funk) | (!rec) ) )
return NULL;
FD_TEST(!funk->readonly);

fd_wksp_t * wksp = fd_funk_wksp( funk );

Expand Down Expand Up @@ -266,6 +267,7 @@ fd_funk_rec_insert( fd_funk_t * funk,
fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_INVAL );
return NULL;
}
FD_TEST(!funk->readonly);

fd_wksp_t * wksp = fd_funk_wksp( funk );

Expand Down Expand Up @@ -406,6 +408,7 @@ fd_funk_rec_insert_prealloc( fd_funk_t * funk,
fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_INVAL );
return NULL;
}
FD_TEST(!funk->readonly);

fd_wksp_t * wksp = fd_funk_wksp( funk );

Expand Down Expand Up @@ -519,6 +522,7 @@ fd_funk_rec_fixup_links( fd_funk_t * funk,
fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_INVAL );
return NULL;
}
FD_TEST(!funk->readonly);

fd_wksp_t * wksp = fd_funk_wksp( funk );

Expand All @@ -540,7 +544,7 @@ fd_funk_rec_fixup_links( fd_funk_t * funk,
_rec_head_idx = &txn->rec_head_idx;
_rec_tail_idx = &txn->rec_tail_idx;
}

ulong rec_idx = (ulong)(rec - rec_map);
if( FD_UNLIKELY( rec_idx>=rec_max ) ) FD_LOG_CRIT(( "memory corruption detected (bad idx)" ));

Expand Down Expand Up @@ -572,6 +576,7 @@ fd_funk_rec_remove( fd_funk_t * funk,
int erase ) {

if( FD_UNLIKELY( !funk ) ) return FD_FUNK_ERR_INVAL;
FD_TEST(!funk->readonly);

fd_wksp_t * wksp = fd_funk_wksp( funk );

Expand Down Expand Up @@ -763,7 +768,7 @@ fd_funk_rec_write_prepare( fd_funk_t * funk,
else
rec_con = irec;

if ( rec_con ) {
if ( rec_con && !FD_UNLIKELY( rec_con->flags & FD_FUNK_REC_FLAG_ERASE ) ) {
/* We have an incarnation of the record */
if ( txn == fd_funk_rec_txn( rec_con, fd_funk_txn_map( funk, wksp ) ) ) {
/* The record is already in the right transaction */
Expand Down Expand Up @@ -808,7 +813,7 @@ fd_funk_rec_write_prepare( fd_funk_t * funk,
fd_funk_rec_t *
fd_funk_rec_write_prepare_prealloc( fd_funk_t * funk,
fd_funk_txn_t * txn,
fd_funk_rec_key_t const * key,
fd_funk_rec_key_t const * key,
ulong min_val_size,
int do_create,
fd_funk_rec_t * prealloc_rec,
Expand Down Expand Up @@ -837,7 +842,7 @@ fd_funk_rec_write_prepare_prealloc( fd_funk_t * funk,
} else {
/* Copy the record into the transaction */
rec = fd_funk_rec_modify( funk, fd_funk_rec_insert_prealloc( funk, txn, key, prealloc_rec, opt_err ) );

if ( !rec )
return NULL;

Expand Down
1 change: 0 additions & 1 deletion src/funk/fd_funk_rec.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
declarations. */

#define FD_FUNK_REC_ALIGN (32UL)
#define FD_FUNK_REC_FOOTPRINT (160UL)

/* FD_FUNK_REC_FLAG_* are flags that can be bit-ored together to specify
how records are to be interpreted.
Expand Down
7 changes: 7 additions & 0 deletions src/funk/fd_funk_txn.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ fd_funk_txn_prepare( fd_funk_t * funk,
if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "NULL funk" ));
return NULL;
}
FD_TEST(!funk->readonly);

fd_funk_txn_t * map = fd_funk_txn_map( funk, fd_funk_wksp( funk ) );

Expand Down Expand Up @@ -178,6 +179,8 @@ fd_funk_txn_cancel_childless( fd_funk_t * funk,
ulong txn_max,
ulong txn_idx ) {

FD_TEST(!funk->readonly);

/* Remove all records used by this transaction. Note that we don't
need to bother doing all the individual removal operations as we
are removing the whole list. We do reset the record transaction
Expand Down Expand Up @@ -684,6 +687,8 @@ fd_funk_txn_publish_funk_child( fd_funk_t * funk,
ulong tag,
ulong txn_idx ) {

FD_TEST(!funk->readonly);

/* Apply the updates in txn to the last published transactions */

fd_wksp_t * wksp = fd_funk_wksp( funk );
Expand Down Expand Up @@ -808,6 +813,7 @@ fd_funk_txn_publish_into_parent( fd_funk_t * funk,
if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "NULL funk" ));
return FD_FUNK_ERR_INVAL;
}
FD_TEST(!funk->readonly);

fd_wksp_t * wksp = fd_funk_wksp( funk );

Expand Down Expand Up @@ -861,6 +867,7 @@ fd_funk_txn_merge_all_children( fd_funk_t * funk,
if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "NULL funk" ));
return FD_FUNK_ERR_INVAL;
}
FD_TEST(!funk->readonly);

fd_wksp_t * wksp = fd_funk_wksp( funk );

Expand Down
1 change: 0 additions & 1 deletion src/funk/test_funk_rec.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#if FD_HAS_HOSTED

FD_STATIC_ASSERT( FD_FUNK_REC_ALIGN == 32UL, unit_test );
FD_STATIC_ASSERT( FD_FUNK_REC_FOOTPRINT==160UL, unit_test );

FD_STATIC_ASSERT( FD_FUNK_REC_FLAG_ERASE==1UL, unit_test );

Expand Down

0 comments on commit d9c892b

Please sign in to comment.