diff --git a/c_src/atom_names.h b/c_src/atom_names.h index 39f26af..407c720 100644 --- a/c_src/atom_names.h +++ b/c_src/atom_names.h @@ -29,79 +29,13 @@ ATOM_MAP(not_found); ATOM_MAP(erlfdb_error); ATOM_MAP(erlfdb_future); ATOM_MAP(erlfdb_database); +ATOM_MAP(erlfdb_tenant); ATOM_MAP(erlfdb_transaction); ATOM_MAP(invalid_future_type); ATOM_MAP(writes_not_allowed); -// Database Options -ATOM_MAP(location_cache_size); -ATOM_MAP(max_watches); -ATOM_MAP(machine_id); -ATOM_MAP(datacenter_id); - - -// Transaction Options -ATOM_MAP(causal_write_risky); -ATOM_MAP(causal_read_risky); -ATOM_MAP(causal_read_disable); -ATOM_MAP(next_write_no_write_conflict_range); -ATOM_MAP(read_your_writes_enable); -ATOM_MAP(read_your_writes_disable); -ATOM_MAP(read_ahead_disable); -ATOM_MAP(durability_datacenter); -ATOM_MAP(durability_risky); -ATOM_MAP(durability_dev_null_is_web_scale); -ATOM_MAP(priority_system_immediate); -ATOM_MAP(priority_batch); -ATOM_MAP(initialize_new_database); -ATOM_MAP(access_system_keys); -ATOM_MAP(read_system_keys); -ATOM_MAP(debug_retry_logging); -ATOM_MAP(transaction_logging_enable); -ATOM_MAP(debug_transaction_identifier); -ATOM_MAP(transaction_logging_max_field_length); -ATOM_MAP(log_transaction); -ATOM_MAP(timeout); -ATOM_MAP(retry_limit); -ATOM_MAP(max_retry_delay); -ATOM_MAP(snapshot_ryw_enable); -ATOM_MAP(snapshot_ryw_disable); -ATOM_MAP(lock_aware); -ATOM_MAP(used_during_commit_protection_disable); -ATOM_MAP(read_lock_aware); -ATOM_MAP(size_limit); -ATOM_MAP(allow_writes); -ATOM_MAP(disallow_writes); -ATOM_MAP(include_port_in_address); -ATOM_MAP(use_provisional_proxies); -ATOM_MAP(report_conflicting_keys); - - -// Streaming mode -ATOM_MAP(want_all); -ATOM_MAP(iterator); -ATOM_MAP(exact); -ATOM_MAP(small); -ATOM_MAP(medium); -ATOM_MAP(large); -ATOM_MAP(serial); - - -// Atomic Mutation Types -ATOM_MAP(add); -ATOM_MAP(bit_and); -ATOM_MAP(bit_or); -ATOM_MAP(bit_xor); -ATOM_MAP(append_if_fits); -ATOM_MAP(max); -ATOM_MAP(min); -ATOM_MAP(byte_min); -ATOM_MAP(byte_max); -ATOM_MAP(set_versionstamped_key); -ATOM_MAP(set_versionstamped_value); - // Conflict Range Types ATOM_MAP(read); diff --git a/c_src/main.c b/c_src/main.c index 6dc7a5c..a604b05 100644 --- a/c_src/main.c +++ b/c_src/main.c @@ -48,6 +48,10 @@ typedef struct _ErlFDBSt } ErlFDBSt; +#define FDB_TR_EXTRA_OPTION_ALLOW_WRITES -10000 +#define FDB_TR_EXTRA_OPTION_DISALLOW_WRITES -10001 + + static void* erlfdb_network_thread(void* arg) { @@ -485,6 +489,7 @@ erlfdb_network_set_option(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) // this cast is unsafe, but we guarantee it in the Erlang layer option = option_value; + if(!enif_inspect_binary(env, argv[1], &value)) { return enif_make_badarg(env); } @@ -747,6 +752,7 @@ erlfdb_database_set_option(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) ErlNifBinary value; fdb_error_t err; void* res; + int option_value; if(st->lib_state != ErlFDB_CONNECTED) { return enif_make_badarg(env); @@ -761,36 +767,13 @@ erlfdb_database_set_option(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) } d = (ErlFDBDatabase*) res; - if(IS_ATOM(argv[1], location_cache_size)) { - option = FDB_DB_OPTION_LOCATION_CACHE_SIZE; - } else if(IS_ATOM(argv[1], max_watches)) { - option = FDB_DB_OPTION_MAX_WATCHES; - } else if(IS_ATOM(argv[1], machine_id)) { - option = FDB_DB_OPTION_MACHINE_ID; - } else if(IS_ATOM(argv[1], datacenter_id)) { - option = FDB_DB_OPTION_DATACENTER_ID; - } else if(IS_ATOM(argv[1], read_your_writes_enable)) { - option = FDB_DB_OPTION_SNAPSHOT_RYW_ENABLE; - } else if(IS_ATOM(argv[1], read_your_writes_disable)) { - option = FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE; - } else if(IS_ATOM(argv[1], transaction_logging_max_field_length)) { - option = FDB_DB_OPTION_TRANSACTION_LOGGING_MAX_FIELD_LENGTH; - } else if(IS_ATOM(argv[1], timeout)) { - option = FDB_DB_OPTION_TRANSACTION_TIMEOUT; - } else if(IS_ATOM(argv[1], retry_limit)) { - option = FDB_DB_OPTION_TRANSACTION_RETRY_LIMIT; - } else if(IS_ATOM(argv[1], max_retry_delay)) { - option = FDB_DB_OPTION_TRANSACTION_MAX_RETRY_DELAY; - } else if(IS_ATOM(argv[1], size_limit)) { - option = FDB_DB_OPTION_TRANSACTION_SIZE_LIMIT; - } else if(IS_ATOM(argv[1], causal_read_risky)) { - option = FDB_DB_OPTION_TRANSACTION_CAUSAL_READ_RISKY; - } else if(IS_ATOM(argv[1], include_port_in_address)) { - option = FDB_DB_OPTION_TRANSACTION_INCLUDE_PORT_IN_ADDRESS; - } else { + if(!enif_get_int(env, argv[1], &option_value)) { return enif_make_badarg(env); } + // this cast is unsafe, but we guarantee it in the Erlang layer + option = option_value; + if(!enif_inspect_binary(env, argv[2], &value)) { return enif_make_badarg(env); } @@ -810,6 +793,104 @@ erlfdb_database_set_option(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) } +static ERL_NIF_TERM +erlfdb_database_open_tenant( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[] + ) +{ + ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env); + ErlFDBDatabase* d; + ErlFDBTenant* t; + FDBTenant* tenant; + ERL_NIF_TERM ret; + void* res; + ErlNifBinary bin; + fdb_error_t err; + + if(st->lib_state != ErlFDB_CONNECTED) { + return enif_make_badarg(env); + } + + if(argc != 2) { + return enif_make_badarg(env); + } + + if(!enif_get_resource(env, argv[0], ErlFDBDatabaseRes, &res)) { + return enif_make_badarg(env); + } + d = (ErlFDBDatabase*) res; + + if(!enif_inspect_binary(env, argv[1], &bin)){ + return enif_make_badarg(env); + } + + err = fdb_database_open_tenant(d->database, (uint8_t*) bin.data, bin.size, &tenant); + if(err != 0) { + return erlfdb_erlang_error(env, err); + } + + t = enif_alloc_resource(ErlFDBTenantRes, sizeof(ErlFDBTenant)); + t->tenant = tenant; + + ret = enif_make_resource(env, t); + enif_release_resource(t); + return T2(env, ATOM_erlfdb_tenant, ret); +} + + +static ERL_NIF_TERM +erlfdb_tenant_create_transaction( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[] + ) +{ + ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env); + ErlFDBTenant* tenant; + ErlFDBTransaction* t; + FDBTransaction* transaction; + ErlNifPid pid; + ERL_NIF_TERM ret; + void* res; + fdb_error_t err; + + if(st->lib_state != ErlFDB_CONNECTED) { + return enif_make_badarg(env); + } + + if(argc != 1) { + return enif_make_badarg(env); + } + + if(!enif_get_resource(env, argv[0], ErlFDBTenantRes, &res)) { + return enif_make_badarg(env); + } + tenant = (ErlFDBTenant*) res; + + err = fdb_tenant_create_transaction(tenant->tenant, &transaction); + if(err != 0) { + return erlfdb_erlang_error(env, err); + } + + t = enif_alloc_resource(ErlFDBTransactionRes, sizeof(ErlFDBTransaction)); + t->transaction = transaction; + + enif_self(env, &pid); + t->owner = enif_make_pid(env, &pid); + + t->txid = 0; + t->read_only = true; + t->writes_allowed = true; + t->has_watches = false; + + ret = enif_make_resource(env, t); + enif_release_resource(t); + return T2(env, ATOM_erlfdb_transaction, ret); +} + + static ERL_NIF_TERM erlfdb_database_create_transaction( ErlNifEnv* env, @@ -874,6 +955,7 @@ erlfdb_transaction_set_option( ErlNifBinary value; fdb_error_t err; void* res; + int option_value; if(st->lib_state != ErlFDB_CONNECTED) { return enif_make_badarg(env); @@ -892,10 +974,14 @@ erlfdb_transaction_set_option( return enif_make_badarg(env); } - if(IS_ATOM(argv[1], allow_writes)) { + if(!enif_get_int(env, argv[1], &option_value)) { + return enif_make_badarg(env); + } + + if (option_value == FDB_TR_EXTRA_OPTION_ALLOW_WRITES) { t->writes_allowed = true; return ATOM_ok; - } else if (IS_ATOM(argv[1], disallow_writes)) { + } else if (option_value == FDB_TR_EXTRA_OPTION_DISALLOW_WRITES) { if(!t->read_only) { return enif_make_badarg(env); } @@ -903,74 +989,9 @@ erlfdb_transaction_set_option( return ATOM_ok; } + // this cast is unsafe, but we guarantee it in the Erlang layer + option = option_value; - if(IS_ATOM(argv[1], causal_write_risky)) { - option = FDB_TR_OPTION_CAUSAL_WRITE_RISKY; - } else if(IS_ATOM(argv[1], causal_read_risky)) { - option = FDB_TR_OPTION_CAUSAL_READ_RISKY; - } else if(IS_ATOM(argv[1], causal_read_disable)) { - option = FDB_TR_OPTION_CAUSAL_READ_DISABLE; - } else if(IS_ATOM(argv[1], include_port_in_address)) { - option = FDB_TR_OPTION_INCLUDE_PORT_IN_ADDRESS; - } else if(IS_ATOM(argv[1], next_write_no_write_conflict_range)) { - option = FDB_TR_OPTION_NEXT_WRITE_NO_WRITE_CONFLICT_RANGE; - } else if(IS_ATOM(argv[1], read_your_writes_disable)) { - option = FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE; - } else if(IS_ATOM(argv[1], read_ahead_disable)) { - option = FDB_TR_OPTION_READ_AHEAD_DISABLE; - } else if(IS_ATOM(argv[1], durability_datacenter)) { - option = FDB_TR_OPTION_DURABILITY_DATACENTER; - } else if(IS_ATOM(argv[1], durability_risky)) { - option = FDB_TR_OPTION_DURABILITY_RISKY; - } else if(IS_ATOM(argv[1], durability_dev_null_is_web_scale)) { - option = FDB_TR_OPTION_DURABILITY_DEV_NULL_IS_WEB_SCALE; - } else if(IS_ATOM(argv[1], priority_system_immediate)) { - option = FDB_TR_OPTION_PRIORITY_SYSTEM_IMMEDIATE; - } else if(IS_ATOM(argv[1], priority_batch)) { - option = FDB_TR_OPTION_PRIORITY_BATCH; - } else if(IS_ATOM(argv[1], initialize_new_database)) { - option = FDB_TR_OPTION_INITIALIZE_NEW_DATABASE; - } else if(IS_ATOM(argv[1], access_system_keys)) { - option = FDB_TR_OPTION_ACCESS_SYSTEM_KEYS; - } else if(IS_ATOM(argv[1], read_system_keys)) { - option = FDB_TR_OPTION_READ_SYSTEM_KEYS; - } else if(IS_ATOM(argv[1], debug_retry_logging)) { - option = FDB_TR_OPTION_DEBUG_RETRY_LOGGING; - } else if(IS_ATOM(argv[1], transaction_logging_enable)) { - option = FDB_TR_OPTION_TRANSACTION_LOGGING_ENABLE; - } else if(IS_ATOM(argv[1], debug_transaction_identifier)) { - option = FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER; - } else if(IS_ATOM(argv[1], log_transaction)) { - option = FDB_TR_OPTION_LOG_TRANSACTION; - } else if(IS_ATOM(argv[1], transaction_logging_max_field_length)) { - option = FDB_TR_OPTION_TRANSACTION_LOGGING_MAX_FIELD_LENGTH; - } else if(IS_ATOM(argv[1], timeout)) { - option = FDB_TR_OPTION_TIMEOUT; - } else if(IS_ATOM(argv[1], retry_limit)) { - option = FDB_TR_OPTION_RETRY_LIMIT; - } else if(IS_ATOM(argv[1], max_retry_delay)) { - option = FDB_TR_OPTION_MAX_RETRY_DELAY; - } else if(IS_ATOM(argv[1], snapshot_ryw_enable)) { - option = FDB_TR_OPTION_SNAPSHOT_RYW_ENABLE; - } else if(IS_ATOM(argv[1], snapshot_ryw_disable)) { - option = FDB_TR_OPTION_SNAPSHOT_RYW_DISABLE; - } else if(IS_ATOM(argv[1], lock_aware)) { - option = FDB_TR_OPTION_LOCK_AWARE; - } else if(IS_ATOM(argv[1], used_during_commit_protection_disable)) { - option = FDB_TR_OPTION_USED_DURING_COMMIT_PROTECTION_DISABLE; - } else if(IS_ATOM(argv[1], read_lock_aware)) { - option = FDB_TR_OPTION_READ_LOCK_AWARE; - } else if(IS_ATOM(argv[1], size_limit)) { - option = FDB_TR_OPTION_SIZE_LIMIT; - } else if(IS_ATOM(argv[1], use_provisional_proxies)) { - option = FDB_TR_OPTION_USE_PROVISIONAL_PROXIES; -#if FDB_API_VERSION > 620 - } else if(IS_ATOM(argv[1], report_conflicting_keys)) { - option = FDB_TR_OPTION_REPORT_CONFLICTING_KEYS; -#endif - } else { - return enif_make_badarg(env); - } if(!enif_inspect_binary(env, argv[2], &value)) { return enif_make_badarg(env); @@ -1349,6 +1370,7 @@ erlfdb_transaction_get_range( FDBFuture* future; void* res; + int mode_value; if(st->lib_state != ErlFDB_CONNECTED) { return enif_make_badarg(env); @@ -1383,24 +1405,13 @@ erlfdb_transaction_get_range( return enif_make_badarg(env); } - if(IS_ATOM(argv[5], want_all)) { - mode = FDB_STREAMING_MODE_WANT_ALL; - } else if(IS_ATOM(argv[5], iterator)) { - mode = FDB_STREAMING_MODE_ITERATOR; - } else if(IS_ATOM(argv[5], exact)) { - mode = FDB_STREAMING_MODE_EXACT; - } else if(IS_ATOM(argv[5], small)) { - mode = FDB_STREAMING_MODE_SMALL; - } else if(IS_ATOM(argv[5], medium)) { - mode = FDB_STREAMING_MODE_MEDIUM; - } else if(IS_ATOM(argv[5], large)) { - mode = FDB_STREAMING_MODE_LARGE; - } else if(IS_ATOM(argv[5], serial)) { - mode = FDB_STREAMING_MODE_SERIAL; - } else if(!enif_get_int(env, argv[5], &mode)) { + if(!enif_get_int(env, argv[5], &mode_value)) { return enif_make_badarg(env); } + // this cast is unsafe, but we guarantee it in the Erlang layer + mode = mode_value; + if(!enif_get_int(env, argv[6], &iteration)) { return enif_make_badarg(env); } @@ -1605,6 +1616,7 @@ erlfdb_transaction_atomic_op( FDBMutationType mtype; ErlNifBinary param; void* res; + int mt_value; if(st->lib_state != ErlFDB_CONNECTED) { return enif_make_badarg(env); @@ -1635,32 +1647,13 @@ erlfdb_transaction_atomic_op( return enif_make_badarg(env); } - if(IS_ATOM(argv[3], add)) { - mtype = FDB_MUTATION_TYPE_ADD; - } else if(IS_ATOM(argv[3], bit_and)) { - mtype = FDB_MUTATION_TYPE_BIT_AND; - } else if(IS_ATOM(argv[3], bit_or)) { - mtype = FDB_MUTATION_TYPE_BIT_OR; - } else if(IS_ATOM(argv[3], bit_xor)) { - mtype = FDB_MUTATION_TYPE_BIT_XOR; - } else if(IS_ATOM(argv[3], append_if_fits)) { - mtype = FDB_MUTATION_TYPE_APPEND_IF_FITS; - } else if(IS_ATOM(argv[3], max)) { - mtype = FDB_MUTATION_TYPE_MAX; - } else if(IS_ATOM(argv[3], min)) { - mtype = FDB_MUTATION_TYPE_MIN; - } else if(IS_ATOM(argv[3], byte_min)) { - mtype = FDB_MUTATION_TYPE_BYTE_MIN; - } else if(IS_ATOM(argv[3], byte_max)) { - mtype = FDB_MUTATION_TYPE_BYTE_MAX; - } else if(IS_ATOM(argv[3], set_versionstamped_key)) { - mtype = FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_KEY; - } else if(IS_ATOM(argv[3], set_versionstamped_value)) { - mtype = FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_VALUE; - } else { + if(!enif_get_int(env, argv[3], &mt_value)) { return enif_make_badarg(env); } + // this cast is unsafe, but we guarantee it in the Erlang layer + mtype = mt_value; + fdb_transaction_atomic_op( t->transaction, (uint8_t*) key.data, @@ -2290,8 +2283,11 @@ static ErlNifFunc funcs[] = NIF_FUNC(erlfdb_create_database, 1), NIF_FUNC(erlfdb_database_set_option, 3), + NIF_FUNC(erlfdb_database_open_tenant, 2), NIF_FUNC(erlfdb_database_create_transaction, 1), + NIF_FUNC(erlfdb_tenant_create_transaction, 1), + NIF_FUNC(erlfdb_transaction_set_option, 3), NIF_FUNC(erlfdb_transaction_set_read_version, 2), NIF_FUNC(erlfdb_transaction_get_read_version, 1), diff --git a/c_src/resources.c b/c_src/resources.c index 9be74af..7c4e392 100644 --- a/c_src/resources.c +++ b/c_src/resources.c @@ -16,6 +16,7 @@ ErlNifResourceType* ErlFDBFutureRes; ErlNifResourceType* ErlFDBDatabaseRes; ErlNifResourceType* ErlFDBTransactionRes; +ErlNifResourceType* ErlFDBTenantRes; int @@ -58,6 +59,17 @@ erlfdb_init_resources(ErlNifEnv* env) return 0; } + ErlFDBTenantRes = enif_open_resource_type( + env, + NULL, + "erlfdb_tenant", + erlfdb_tenant_dtor, + ERL_NIF_RT_CREATE, + NULL + ); + if(ErlFDBTenantRes == NULL) { + return 0; + } return 1; } @@ -103,6 +115,17 @@ erlfdb_transaction_dtor(ErlNifEnv* env, void* obj) } +void +erlfdb_tenant_dtor(ErlNifEnv* env, void* obj) +{ + ErlFDBTenant* t = (ErlFDBTenant*) obj; + + if(t->tenant != NULL) { + fdb_tenant_destroy(t->tenant); + } +} + + int erlfdb_transaction_is_owner(ErlNifEnv* env, ErlFDBTransaction* t) { diff --git a/c_src/resources.h b/c_src/resources.h index 15ffd51..8dd9f7c 100644 --- a/c_src/resources.h +++ b/c_src/resources.h @@ -22,6 +22,7 @@ extern ErlNifResourceType* ErlFDBFutureRes; extern ErlNifResourceType* ErlFDBDatabaseRes; extern ErlNifResourceType* ErlFDBTransactionRes; +extern ErlNifResourceType* ErlFDBTenantRes; typedef enum _ErlFDBFutureType @@ -66,12 +67,16 @@ typedef struct _ErlFDBTransaction bool has_watches; } ErlFDBTransaction; +typedef struct _ErlFDBTenant +{ + FDBTenant* tenant; +} ErlFDBTenant; int erlfdb_init_resources(ErlNifEnv* env); void erlfdb_future_dtor(ErlNifEnv* env, void* obj); void erlfdb_database_dtor(ErlNifEnv* env, void* obj); void erlfdb_transaction_dtor(ErlNifEnv* env, void* obj); - +void erlfdb_tenant_dtor(ErlNifEnv* env, void* obj); int erlfdb_transaction_is_owner(ErlNifEnv* env, ErlFDBTransaction* t); diff --git a/rebar.config b/rebar.config index 5a48517..0ba483f 100644 --- a/rebar.config +++ b/rebar.config @@ -1,18 +1,18 @@ {plugins, [ coveralls, - pc, - {rebar3_dynamic_plugin, {git, "https://github.com/lafirest/rebar3_dynamic_plugin.git"}} + pc +% {rebar3_dynamic_plugin, {git, "https://github.com/lafirest/rebar3_dynamic_plugin.git"}} ]}. {project_plugins, [ erlfmt ]}. -{dynamic_plugin_script, "scripts/sdk_checking.erl"}. +{dynamic_plugin_script, "./scripts/sdk_checking.erl"}. {provider_hooks, [ {pre, [ - {compile, dynamic_plugin}, +% {compile, dynamic_plugin}, {compile, {pc, compile}}, {clean, {pc, clean}} ]} diff --git a/src/erlfdb.erl b/src/erlfdb.erl index 1bc2e36..90a319b 100644 --- a/src/erlfdb.erl +++ b/src/erlfdb.erl @@ -82,6 +82,7 @@ byte_max/3, set_versionstamped_key/3, set_versionstamped_value/3, + compare_and_clear/3, atomic_op/4, % Watches @@ -129,6 +130,9 @@ -define(IS_FUTURE, {erlfdb_future, _, _}). -define(IS_FOLD_FUTURE, {fold_info, _, _}). -define(IS_DB, {erlfdb_database, _}). +-define(IS_TENANT, {erlfdb_tenant, _}). +-define(IS_RES, {_RES_TAG_, _}). +-define(IS_DB_OR_TENANT, (_RES_TAG_ == erlfdb_database orelse _RES_TAG_ == erlfdb_tenant)). -define(IS_TX, {erlfdb_transaction, _}). -define(IS_SS, {erlfdb_snapshot, _}). -define(GET_TX(SS), element(2, SS)). @@ -152,9 +156,11 @@ open(ClusterFile) -> erlfdb_nif:create_database(ClusterFile). create_transaction(?IS_DB = Db) -> - erlfdb_nif:database_create_transaction(Db). + erlfdb_nif:database_create_transaction(Db); +create_transaction(?IS_TENANT = Tenant) -> + erlfdb_nif:tenant_create_transaction(Tenant). -transactional(?IS_DB = Db, UserFun) when is_function(UserFun, 1) -> +transactional(?IS_RES = Db, UserFun) when ?IS_DB_OR_TENANT, is_function(UserFun, 1) -> clear_erlfdb_error(), Tx = create_transaction(Db), do_transaction(Tx, UserFun); @@ -284,7 +290,7 @@ wait_for_all(Futures, Options) -> Futures ). -get(?IS_DB = Db, Key) -> +get(?IS_RES = Db, Key) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> wait(get(Tx, Key)) end); @@ -298,14 +304,14 @@ get_ss(?IS_TX = Tx, Key) -> get_ss(?IS_SS = SS, Key) -> get_ss(?GET_TX(SS), Key). -get_range_split_points(?IS_DB = Db, BeginKey, EndKey, ChunkSize) -> +get_range_split_points(?IS_RES = Db, BeginKey, EndKey, ChunkSize) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> wait(get_range_split_points(Tx, BeginKey, EndKey, ChunkSize)) end); get_range_split_points(?IS_TX = Tx, BeginKey, EndKey, ChunkSize) -> erlfdb_nif:transaction_get_range_split_points(Tx, BeginKey, EndKey, ChunkSize). -get_key(?IS_DB = Db, Key) -> +get_key(?IS_RES = Db, Key) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> wait(get_key(Tx, Key)) end); @@ -320,7 +326,7 @@ get_key_ss(?IS_TX = Tx, Key) -> get_range(DbOrTx, StartKey, EndKey) -> get_range(DbOrTx, StartKey, EndKey, []). -get_range(?IS_DB = Db, StartKey, EndKey, Options) -> +get_range(?IS_RES = Db, StartKey, EndKey, Options) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> get_range(Tx, StartKey, EndKey, Options) end); @@ -342,7 +348,7 @@ get_range_startswith(DbOrTx, Prefix, Options) -> fold_range(DbOrTx, StartKey, EndKey, Fun, Acc) -> fold_range(DbOrTx, StartKey, EndKey, Fun, Acc, []). -fold_range(?IS_DB = Db, StartKey, EndKey, Fun, Acc, Options) -> +fold_range(?IS_RES = Db, StartKey, EndKey, Fun, Acc, Options) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> fold_range(Tx, StartKey, EndKey, Fun, Acc, Options) end); @@ -380,7 +386,7 @@ fold_range_wait(?IS_TX = Tx, ?IS_FOLD_FUTURE = FI, Fun, Acc) -> fold_range_wait(?IS_SS = SS, ?IS_FOLD_FUTURE = FI, Fun, Acc) -> fold_range_wait(?GET_TX(SS), FI, Fun, Acc). -set(?IS_DB = Db, Key, Value) -> +set(?IS_RES = Db, Key, Value) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> set(Tx, Key, Value) end); @@ -389,7 +395,7 @@ set(?IS_TX = Tx, Key, Value) -> set(?IS_SS = SS, Key, Value) -> set(?GET_TX(SS), Key, Value). -clear(?IS_DB = Db, Key) -> +clear(?IS_RES = Db, Key) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> clear(Tx, Key) end); @@ -398,7 +404,7 @@ clear(?IS_TX = Tx, Key) -> clear(?IS_SS = SS, Key) -> clear(?GET_TX(SS), Key). -clear_range(?IS_DB = Db, StartKey, EndKey) -> +clear_range(?IS_RES = Db, StartKey, EndKey) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> clear_range(Tx, StartKey, EndKey) end); @@ -407,7 +413,7 @@ clear_range(?IS_TX = Tx, StartKey, EndKey) -> clear_range(?IS_SS = SS, StartKey, EndKey) -> clear_range(?GET_TX(SS), StartKey, EndKey). -clear_range_startswith(?IS_DB = Db, Prefix) -> +clear_range_startswith(?IS_RES = Db, Prefix) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> clear_range_startswith(Tx, Prefix) end); @@ -447,7 +453,10 @@ set_versionstamped_key(DbOrTx, Key, Param) -> set_versionstamped_value(DbOrTx, Key, Param) -> atomic_op(DbOrTx, Key, Param, set_versionstamped_value). -atomic_op(?IS_DB = Db, Key, Param, Op) -> +compare_and_clear(DbOrTx, Key, Param) -> + atomic_op(DbOrTx, Key, Param, compare_and_clear). + +atomic_op(?IS_RES = Db, Key, Param, Op) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> atomic_op(Tx, Key, Param, Op) end); @@ -456,7 +465,7 @@ atomic_op(?IS_TX = Tx, Key, Param, Op) -> atomic_op(?IS_SS = SS, Key, Param, Op) -> atomic_op(?GET_TX(SS), Key, Param, Op). -watch(?IS_DB = Db, Key) -> +watch(?IS_RES = Db, Key) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> watch(Tx, Key) end); @@ -465,20 +474,20 @@ watch(?IS_TX = Tx, Key) -> watch(?IS_SS = SS, Key) -> watch(?GET_TX(SS), Key). -get_and_watch(?IS_DB = Db, Key) -> +get_and_watch(?IS_RES = Db, Key) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> KeyFuture = get(Tx, Key), WatchFuture = watch(Tx, Key), {wait(KeyFuture), WatchFuture} end). -set_and_watch(?IS_DB = Db, Key, Value) -> +set_and_watch(?IS_RES = Db, Key, Value) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> set(Tx, Key, Value), watch(Tx, Key) end). -clear_and_watch(?IS_DB = Db, Key) -> +clear_and_watch(?IS_RES = Db, Key) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> clear(Tx, Key), watch(Tx, Key) @@ -546,7 +555,7 @@ get_writes_allowed(?IS_TX = Tx) -> get_writes_allowed(?IS_SS = SS) -> get_writes_allowed(?GET_TX(SS)). -get_addresses_for_key(?IS_DB = Db, Key) -> +get_addresses_for_key(?IS_RES = Db, Key) when ?IS_DB_OR_TENANT -> transactional(Db, fun(Tx) -> wait(get_addresses_for_key(Tx, Key)) end); diff --git a/src/erlfdb_nif.erl b/src/erlfdb_nif.erl index 4d1c406..1d97554 100644 --- a/src/erlfdb_nif.erl +++ b/src/erlfdb_nif.erl @@ -29,8 +29,11 @@ create_database/1, database_set_option/2, database_set_option/3, + database_open_tenant/2, database_create_transaction/1, + tenant_create_transaction/1, + transaction_set_option/2, transaction_set_option/3, transaction_set_read_version/2, @@ -69,6 +72,7 @@ -type future() :: {erlfdb_future, reference(), reference()}. -type database() :: {erlfdb_database, reference()}. -type transaction() :: {erlfdb_transaction, reference()}. +-type tenant() :: {erlfdb_tenant, reference()}. -type option_value() :: integer() | binary(). @@ -129,12 +133,25 @@ location_cache_size | max_watches | machine_id - | datacenter_id. + | datacenter_id + | snapshot_ryw_enable + | snapshot_ryw_disable + | transaction_logging_max_field_length + | transaction_timeout + | transaction_retry_limit + | transaction_max_retry_delay + | transaction_size_limit + | transaction_causal_read_risky + | transaction_include_port_in_address + | transaction_bypass_unreadable + | use_config_database + | test_causal_read_risky. -type transaction_option() :: causal_write_risky | causal_read_risky | causal_read_disable + | include_port_in_address | next_write_no_write_conflict_range | read_your_writes_disable | read_ahead_disable @@ -146,28 +163,44 @@ | initialize_new_database | access_system_keys | read_system_keys + | raw_access + | debug_dump | debug_retry_logging | transaction_logging_enable + | debug_transaction_identifier + | log_transaction + | transaction_logging_max_field_length + | server_request_tracing | timeout | retry_limit | max_retry_delay + | size_limit | snapshot_ryw_enable | snapshot_ryw_disable | lock_aware | used_during_commit_protection_disable | read_lock_aware - | size_limit + | use_provisional_proxies + | report_conflicting_keys + | special_key_space_relaxed + | special_key_space_enable_writes + | tag + | auto_throttle_tag + | span_parent + | expensive_clear_cost_estimation_enable + | bypass_unreadable + | use_grv_cache | allow_writes | disallow_writes. -type streaming_mode() :: - stream_want_all - | stream_iterator - | stream_exact - | stream_small - | stream_medium - | stream_large - | stream_serial. + want_all + | iterator + | exact + | small + | medium + | large + | serial. -type atomic_mode() :: add @@ -177,10 +210,11 @@ | append_if_fits | max | min + | set_versionstamped_key + | set_versionstamped_value | byte_min | byte_max - | set_versionstamped_key - | set_versionstamped_value. + | compare_and_clear. -type atomic_operand() :: integer() | binary(). @@ -245,12 +279,21 @@ database_set_option(Database, Option) -> ) -> ok. database_set_option({erlfdb_database, Db}, Opt, Val) -> BinVal = option_val_to_binary(Val), - erlfdb_database_set_option(Db, Opt, BinVal). + OptVal = erlfdb_nif_option:to_database_option(Opt), + erlfdb_database_set_option(Db, OptVal, BinVal). + +-spec database_open_tenant(database(), binary()) -> tenant(). +database_open_tenant({erlfdb_database, Db}, Name) -> + erlfdb_database_open_tenant(Db, Name). -spec database_create_transaction(database()) -> transaction(). database_create_transaction({erlfdb_database, Db}) -> erlfdb_database_create_transaction(Db). +-spec tenant_create_transaction(tenant()) -> transaction(). +tenant_create_transaction({erlfdb_tenant, Db}) -> + erlfdb_tenant_create_transaction(Db). + -spec transaction_set_option(transaction(), Option :: transaction_option()) -> ok. transaction_set_option(Transaction, Option) -> transaction_set_option(Transaction, Option, <<>>). @@ -262,7 +305,8 @@ transaction_set_option(Transaction, Option) -> ) -> ok. transaction_set_option({erlfdb_transaction, Tx}, Opt, Val) -> BinVal = option_val_to_binary(Val), - erlfdb_transaction_set_option(Tx, Opt, BinVal). + OptVal = erlfdb_nif_option:to_transaction_option(Opt), + erlfdb_transaction_set_option(Tx, OptVal, BinVal). -spec transaction_set_read_version(transaction(), Version :: integer()) -> ok. transaction_set_read_version({erlfdb_transaction, Tx}, Version) -> @@ -327,13 +371,14 @@ transaction_get_range( Snapshot, Reverse ) -> + StreamingModeVal = erlfdb_nif_option:to_stream_mode(StreamingMode), erlfdb_transaction_get_range( Tx, StartKeySelector, EndKeySelector, Limit, TargetBytes, - StreamingMode, + StreamingModeVal, Iteration, Snapshot, Reverse @@ -369,7 +414,8 @@ transaction_atomic_op({erlfdb_transaction, Tx}, Key, Operand, OpName) -> Int when is_integer(Int) -> <> end, - erlfdb_transaction_atomic_op(Tx, Key, BinOperand, OpName). + OpValue = erlfdb_nif_option:to_mutation_type(OpName), + erlfdb_transaction_atomic_op(Tx, Key, BinOperand, OpValue). -spec transaction_commit(transaction()) -> future(). transaction_commit({erlfdb_transaction, Tx}) -> @@ -505,12 +551,8 @@ select_api_version(Version) when is_integer(Version), Version > 0 -> ok | error(). network_set_option(Name, Value) -> Option = erlfdb_nif_option:to_network_option(Name), - BinValue = - case Value of - B when is_binary(B) -> B; - I when is_integer(I) -> <> - end, - erlfdb_network_set_option(Option, BinValue). + BinVal = option_val_to_binary(Value), + erlfdb_network_set_option(Option, BinVal). % Sentinel Check erlfdb_can_initialize() -> ?NOT_LOADED. @@ -533,8 +575,12 @@ erlfdb_future_get(_Future) -> ?NOT_LOADED. % Databases erlfdb_create_database(_ClusterFilePath) -> ?NOT_LOADED. erlfdb_database_set_option(_Database, _DatabaseOption, _Value) -> ?NOT_LOADED. +erlfdb_database_open_tenant(_Database, _Tenant) -> ?NOT_LOADED. erlfdb_database_create_transaction(_Database) -> ?NOT_LOADED. +%% Tenants +erlfdb_tenant_create_transaction(_Database) -> ?NOT_LOADED. + % Transactions erlfdb_transaction_set_option(_Transaction, _TransactionOption, _Value) -> ?NOT_LOADED. erlfdb_transaction_set_read_version(_Transaction, _Version) -> ?NOT_LOADED. diff --git a/src/erlfdb_nif_option.erl b/src/erlfdb_nif_option.erl index 8a1b8e4..88dc917 100644 --- a/src/erlfdb_nif_option.erl +++ b/src/erlfdb_nif_option.erl @@ -14,10 +14,20 @@ -module(erlfdb_nif_option). --export([to_network_option/1]). +-export([ + to_network_option/1, + to_database_option/1, + to_transaction_option/1, + to_stream_mode/1, + to_mutation_type/1 +]). -include("fdb_options.hrl"). +%% Some options are not included in the official C API +-define(FDB_TR_EXTRA_OPTION_ALLOW_WRITES, -10000). +-define(FDB_TR_EXTRA_OPTION_DISALLOW_WRITES, -10001). + to_network_option(local_address) -> ?FDB_NET_OPTION_LOCAL_ADDRESS; to_network_option(cluster_file) -> @@ -98,3 +108,188 @@ to_network_option(distributed_client_tracer) -> ?FDB_NET_OPTION_DISTRIBUTED_CLIENT_TRACER; to_network_option(_) -> error(badarg). + +to_database_option(location_cache_size) -> + ?FDB_DB_OPTION_LOCATION_CACHE_SIZE; +to_database_option(max_watches) -> + ?FDB_DB_OPTION_MAX_WATCHES; +to_database_option(machine_id) -> + ?FDB_DB_OPTION_MACHINE_ID; +to_database_option(datacenter_id) -> + ?FDB_DB_OPTION_DATACENTER_ID; +to_database_option(snapshot_ryw_enable) -> + ?FDB_DB_OPTION_SNAPSHOT_RYW_ENABLE; +to_database_option(snapshot_ryw_disable) -> + ?FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE; +to_database_option(transaction_logging_max_field_length) -> + ?FDB_DB_OPTION_TRANSACTION_LOGGING_MAX_FIELD_LENGTH; +to_database_option(transaction_timeout) -> + ?FDB_DB_OPTION_TRANSACTION_TIMEOUT; +to_database_option(transaction_retry_limit) -> + ?FDB_DB_OPTION_TRANSACTION_RETRY_LIMIT; +to_database_option(transaction_max_retry_delay) -> + ?FDB_DB_OPTION_TRANSACTION_MAX_RETRY_DELAY; +to_database_option(transaction_size_limit) -> + ?FDB_DB_OPTION_TRANSACTION_SIZE_LIMIT; +to_database_option(transaction_causal_read_risky) -> + ?FDB_DB_OPTION_TRANSACTION_CAUSAL_READ_RISKY; +to_database_option(transaction_include_port_in_address) -> + ?FDB_DB_OPTION_TRANSACTION_INCLUDE_PORT_IN_ADDRESS; +to_database_option(transaction_bypass_unreadable) -> + ?FDB_DB_OPTION_TRANSACTION_BYPASS_UNREADABLE; +to_database_option(use_config_database) -> + ?FDB_DB_OPTION_USE_CONFIG_DATABASE; +to_database_option(test_causal_read_risky) -> + ?FDB_DB_OPTION_TEST_CAUSAL_READ_RISKY; +to_database_option(_) -> + error(badarg). + +to_transaction_option(causal_write_risky) -> + ?FDB_TR_OPTION_CAUSAL_WRITE_RISKY; +to_transaction_option(causal_read_risky) -> + ?FDB_TR_OPTION_CAUSAL_READ_RISKY; +to_transaction_option(causal_read_disable) -> + ?FDB_TR_OPTION_CAUSAL_READ_DISABLE; +to_transaction_option(include_port_in_address) -> + ?FDB_TR_OPTION_INCLUDE_PORT_IN_ADDRESS; +to_transaction_option(next_write_no_write_conflict_range) -> + ?FDB_TR_OPTION_NEXT_WRITE_NO_WRITE_CONFLICT_RANGE; +to_transaction_option(read_your_writes_disable) -> + ?FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE; +to_transaction_option(read_ahead_disable) -> + ?FDB_TR_OPTION_READ_AHEAD_DISABLE; +to_transaction_option(durability_datacenter) -> + ?FDB_TR_OPTION_DURABILITY_DATACENTER; +to_transaction_option(durability_risky) -> + ?FDB_TR_OPTION_DURABILITY_RISKY; +to_transaction_option(durability_dev_null_is_web_scale) -> + ?FDB_TR_OPTION_DURABILITY_DEV_NULL_IS_WEB_SCALE; +to_transaction_option(priority_system_immediate) -> + ?FDB_TR_OPTION_PRIORITY_SYSTEM_IMMEDIATE; +to_transaction_option(priority_batch) -> + ?FDB_TR_OPTION_PRIORITY_BATCH; +to_transaction_option(initialize_new_database) -> + ?FDB_TR_OPTION_INITIALIZE_NEW_DATABASE; +to_transaction_option(access_system_keys) -> + ?FDB_TR_OPTION_ACCESS_SYSTEM_KEYS; +to_transaction_option(read_system_keys) -> + ?FDB_TR_OPTION_READ_SYSTEM_KEYS; +to_transaction_option(raw_access) -> + ?FDB_TR_OPTION_RAW_ACCESS; +to_transaction_option(debug_dump) -> + ?FDB_TR_OPTION_DEBUG_DUMP; +to_transaction_option(debug_retry_logging) -> + ?FDB_TR_OPTION_DEBUG_RETRY_LOGGING; +to_transaction_option(transaction_logging_enable) -> + ?FDB_TR_OPTION_TRANSACTION_LOGGING_ENABLE; +to_transaction_option(debug_transaction_identifier) -> + ?FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER; +to_transaction_option(log_transaction) -> + ?FDB_TR_OPTION_LOG_TRANSACTION; +to_transaction_option(transaction_logging_max_field_length) -> + ?FDB_TR_OPTION_TRANSACTION_LOGGING_MAX_FIELD_LENGTH; +to_transaction_option(server_request_tracing) -> + ?FDB_TR_OPTION_SERVER_REQUEST_TRACING; +to_transaction_option(timeout) -> + ?FDB_TR_OPTION_TIMEOUT; +to_transaction_option(retry_limit) -> + ?FDB_TR_OPTION_RETRY_LIMIT; +to_transaction_option(max_retry_delay) -> + ?FDB_TR_OPTION_MAX_RETRY_DELAY; +to_transaction_option(size_limit) -> + ?FDB_TR_OPTION_SIZE_LIMIT; +to_transaction_option(snapshot_ryw_enable) -> + ?FDB_TR_OPTION_SNAPSHOT_RYW_ENABLE; +to_transaction_option(snapshot_ryw_disable) -> + ?FDB_TR_OPTION_SNAPSHOT_RYW_DISABLE; +to_transaction_option(lock_aware) -> + ?FDB_TR_OPTION_LOCK_AWARE; +to_transaction_option(used_during_commit_protection_disable) -> + ?FDB_TR_OPTION_USED_DURING_COMMIT_PROTECTION_DISABLE; +to_transaction_option(read_lock_aware) -> + ?FDB_TR_OPTION_READ_LOCK_AWARE; +to_transaction_option(use_provisional_proxies) -> + ?FDB_TR_OPTION_USE_PROVISIONAL_PROXIES; +to_transaction_option(report_conflicting_keys) -> + ?FDB_TR_OPTION_REPORT_CONFLICTING_KEYS; +to_transaction_option(special_key_space_relaxed) -> + ?FDB_TR_OPTION_SPECIAL_KEY_SPACE_RELAXED; +to_transaction_option(special_key_space_enable_writes) -> + ?FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES; +to_transaction_option(tag) -> + ?FDB_TR_OPTION_TAG; +to_transaction_option(auto_throttle_tag) -> + ?FDB_TR_OPTION_AUTO_THROTTLE_TAG; +to_transaction_option(span_parent) -> + ?FDB_TR_OPTION_SPAN_PARENT; +to_transaction_option(expensive_clear_cost_estimation_enable) -> + ?FDB_TR_OPTION_EXPENSIVE_CLEAR_COST_ESTIMATION_ENABLE; +to_transaction_option(bypass_unreadable) -> + ?FDB_TR_OPTION_BYPASS_UNREADABLE; +to_transaction_option(use_grv_cache) -> + ?FDB_TR_OPTION_USE_GRV_CACHE; +to_transaction_option(allow_writes) -> + ?FDB_TR_EXTRA_OPTION_ALLOW_WRITES; +to_transaction_option(disallow_writes) -> + ?FDB_TR_EXTRA_OPTION_DISALLOW_WRITES; +to_transaction_option(_) -> + error(badarg). + +to_stream_mode(want_all) -> + ?FDB_STREAMING_MODE_WANT_ALL; +to_stream_mode(iterator) -> + ?FDB_STREAMING_MODE_ITERATOR; +to_stream_mode(exact) -> + ?FDB_STREAMING_MODE_EXACT; +to_stream_mode(small) -> + ?FDB_STREAMING_MODE_SMALL; +to_stream_mode(medium) -> + ?FDB_STREAMING_MODE_MEDIUM; +to_stream_mode(large) -> + ?FDB_STREAMING_MODE_LARGE; +to_stream_mode(serial) -> + ?FDB_STREAMING_MODE_SERIAL; +%% the below codes are special for the bindingtester +to_stream_mode(?FDB_STREAMING_MODE_WANT_ALL) -> + ?FDB_STREAMING_MODE_WANT_ALL; +to_stream_mode(?FDB_STREAMING_MODE_ITERATOR) -> + ?FDB_STREAMING_MODE_ITERATOR; +to_stream_mode(?FDB_STREAMING_MODE_EXACT) -> + ?FDB_STREAMING_MODE_EXACT; +to_stream_mode(?FDB_STREAMING_MODE_SMALL) -> + ?FDB_STREAMING_MODE_SMALL; +to_stream_mode(?FDB_STREAMING_MODE_MEDIUM) -> + ?FDB_STREAMING_MODE_MEDIUM; +to_stream_mode(?FDB_STREAMING_MODE_LARGE) -> + ?FDB_STREAMING_MODE_LARGE; +to_stream_mode(?FDB_STREAMING_MODE_SERIAL) -> + ?FDB_STREAMING_MODE_SERIAL; +to_stream_mode(_) -> + error(badarg). + +to_mutation_type(add) -> + ?FDB_MUTATION_TYPE_ADD; +to_mutation_type(bit_and) -> + ?FDB_MUTATION_TYPE_BIT_AND; +to_mutation_type(bit_or) -> + ?FDB_MUTATION_TYPE_BIT_OR; +to_mutation_type(bit_xor) -> + ?FDB_MUTATION_TYPE_BIT_XOR; +to_mutation_type(append_if_fits) -> + ?FDB_MUTATION_TYPE_APPEND_IF_FITS; +to_mutation_type(max) -> + ?FDB_MUTATION_TYPE_MAX; +to_mutation_type(min) -> + ?FDB_MUTATION_TYPE_MIN; +to_mutation_type(set_versionstamped_key) -> + ?FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_KEY; +to_mutation_type(set_versionstamped_value) -> + ?FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_VALUE; +to_mutation_type(byte_min) -> + ?FDB_MUTATION_TYPE_BYTE_MIN; +to_mutation_type(byte_max) -> + ?FDB_MUTATION_TYPE_BYTE_MAX; +to_mutation_type(compare_and_clear) -> + ?FDB_MUTATION_TYPE_COMPARE_AND_CLEAR; +to_mutation_type(_) -> + error(badarg). diff --git a/src/erlfdb_tenant.erl b/src/erlfdb_tenant.erl new file mode 100644 index 0000000..d088ba1 --- /dev/null +++ b/src/erlfdb_tenant.erl @@ -0,0 +1,57 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(erlfdb_tenant). + +-export([create_tenant/2, open_tenant/2, delete_tenant/2]). + +-define(IS_DB, {erlfdb_database, _}). +-define(TENANT_MAP_PREFIX, <<16#FF, 16#FF, "/management/tenant_map/">>). + +-import(erlfdb, [get/2, clear/2, set/3, wait/1, transactional/2, set_option/2]). + +create_tenant(?IS_DB = Db, Tenant) -> + transactional(Db, fun(Tx) -> + create_tenant(Tx, Tenant) + end); +create_tenant(Tx, Tenant) -> + set_option(Tx, special_key_space_enable_writes), + Key = tenant_key(Tenant), + case check_tenant_existence(Tx, Key) of + not_found -> + set(Tx, Key, <<>>); + _ -> + {error, tenant_already_exists} + end. + +open_tenant(?IS_DB = Db, Name) -> + erlfdb_nif:database_open_tenant(Db, Name). + +delete_tenant(?IS_DB = Db, Tenant) -> + transactional(Db, fun(Tx) -> + delete_tenant(Tx, Tenant) + end); +delete_tenant(Tx, Tenant) -> + set_option(Tx, special_key_space_enable_writes), + Key = tenant_key(Tenant), + case check_tenant_existence(Tx, Key) of + not_found -> + {error, tenant_not_found}; + _ -> + clear(Tx, Key) + end. + +check_tenant_existence(Tx, Key) -> + wait(get(Tx, Key)). + +tenant_key(Tenant) -> + <>. diff --git a/test/erlfdb_03_transaction_options_test.erl b/test/erlfdb_03_transaction_options_test.erl index 3ce6a1a..f1a64fe 100644 --- a/test/erlfdb_03_transaction_options_test.erl +++ b/test/erlfdb_03_transaction_options_test.erl @@ -89,7 +89,7 @@ cannot_set_watches_if_writes_disallowed_test() -> size_limit_on_db_handle_test() -> Db1 = erlfdb_util:get_test_db(), - erlfdb:set_option(Db1, size_limit, 10000), + erlfdb:set_option(Db1, transaction_size_limit, 10000), ?assertError( {erlfdb_error, 2101}, erlfdb:transactional(Db1, fun(Tx) -> diff --git a/test/tester.es b/test/tester.es index 17250ad..734042d 100755 --- a/test/tester.es +++ b/test/tester.es @@ -15,6 +15,7 @@ -record(st, { db, + tenant, tx_mgr, tx_name, instructions, @@ -227,14 +228,21 @@ stack_pop_tuples(St, Count) -> get_transaction(TxName) -> get({'$erlfdb_tx', TxName}). -new_transaction(Db, TxName) -> - Tx = erlfdb:create_transaction(Db), +new_transaction(#st{db = Db, tenant = Tenant}, TxName) -> + Src = + case Tenant of + undefined -> + Db; + _ -> + Tenant + end, + Tx = erlfdb:create_transaction(Src), put({'$erlfdb_tx', TxName}, Tx). -switch_transaction(Db, TxName) -> +switch_transaction(St, TxName) -> case get_transaction(TxName) of undefined -> - new_transaction(Db, TxName); + new_transaction(St, TxName); _ -> ok end. @@ -305,6 +313,7 @@ init_run_loop(Db, Prefix) -> {StartKey, EndKey} = erlfdb_tuple:range({Prefix}), St = #st{ db = Db, + tenant = undefined, tx_name = Prefix, instructions = erlfdb:get_range(Db, StartKey, EndKey), op_tuple = undefined, @@ -443,11 +452,11 @@ execute(_TxObj, St, <<"WAIT_FUTURE">>) -> stack_push(St#st.stack, Value), St; execute(_TxObj, St, <<"NEW_TRANSACTION">>) -> - new_transaction(St#st.db, St#st.tx_name), + new_transaction(St, St#st.tx_name), St; execute(_TxObj, St, <<"USE_TRANSACTION">>) -> TxName = stack_pop(St), - switch_transaction(St#st.db, TxName), + switch_transaction(St, TxName), St#st{ tx_name = TxName }; @@ -756,6 +765,22 @@ execute(TxObj, St, <<"GET_RANGE_SPLIT_POINTS">>) -> Result = erlfdb:get_range_split_points(TxObj, Start, End, ChunkSize), stack_push_range(St, Result), St; +execute(TxObj, St, <<"TENANT_CREATE">>) -> + Tenant = stack_pop(St), + _Result = erlfdb_tenant:create_tenant(TxObj, Tenant), + stack_push(St, <<"RESULT_NOT_PRESENT">>), + St; +execute(TxObj, St, <<"TENANT_DELETE">>) -> + Tenant = stack_pop(St), + _Result = erlfdb_tenant:delete_tenant(TxObj, Tenant), + stack_push(St, <<"RESULT_NOT_PRESENT">>), + St; +execute(_TxObj, #st{db = Db} = St, <<"TENANT_SET_ACTIVE">>) -> + Tenant = stack_pop(St), + Result = erlfdb_tenant:open_tenant(Db, Tenant), + St#st{tenant = Result}; +execute(_TxObj, St, <<"TENANT_CLEAR_ACTIVE">>) -> + St#st{tenant = undefined}; execute(_TxObj, _St, UnknownOp) -> erlang:error({unknown_op, UnknownOp}).