Skip to content

Commit

Permalink
Make FUNCTION RESTORE FLUSH flush async based on lazyfree-lazy-user-f…
Browse files Browse the repository at this point in the history
…lush (#1254)

FUNCTION RESTORE have a FLUSH option, it will delete all the existing
libraries before restoring the payload. If for some reasons, there are
a lot of libraries, we will block a while in here.

Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin authored Nov 21, 2024
1 parent f553ccb commit 6038eda
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
17 changes: 13 additions & 4 deletions src/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ void functionsLibCtxClearCurrent(int async) {
}
}

/* Free the given functions ctx */
static void functionsLibCtxFreeGeneric(functionsLibCtx *functions_lib_ctx, int async) {
if (async) {
freeFunctionsAsync(functions_lib_ctx);
} else {
functionsLibCtxFree(functions_lib_ctx);
}
}

/* Free the given functions ctx */
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) {
functionsLibCtxClear(functions_lib_ctx);
Expand All @@ -196,8 +205,8 @@ void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) {

/* Swap the current functions ctx with the given one.
* Free the old functions ctx. */
void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx) {
functionsLibCtxFree(curr_functions_lib_ctx);
void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async) {
functionsLibCtxFreeGeneric(curr_functions_lib_ctx, async);
curr_functions_lib_ctx = new_lib_ctx;
}

Expand Down Expand Up @@ -769,7 +778,7 @@ void functionRestoreCommand(client *c) {
}

if (restore_replicy == restorePolicy_Flush) {
functionsLibCtxSwapWithCurrent(functions_lib_ctx);
functionsLibCtxSwapWithCurrent(functions_lib_ctx, server.lazyfree_lazy_user_flush);
functions_lib_ctx = NULL; /* avoid releasing the f_ctx in the end */
} else {
if (libraryJoin(curr_functions_lib_ctx, functions_lib_ctx, restore_replicy == restorePolicy_Replace, &err) !=
Expand All @@ -789,7 +798,7 @@ void functionRestoreCommand(client *c) {
addReply(c, shared.ok);
}
if (functions_lib_ctx) {
functionsLibCtxFree(functions_lib_ctx);
functionsLibCtxFreeGeneric(functions_lib_ctx, server.lazyfree_lazy_user_flush);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx);
functionsLibCtx *functionsLibCtxGetCurrent(void);
functionsLibCtx *functionsLibCtxCreate(void);
void functionsLibCtxClearCurrent(int async);
void functionsLibCtxFree(functionsLibCtx *lib_ctx);
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx);
void functionsLibCtxClear(functionsLibCtx *lib_ctx);
void functionsLibCtxSwapWithCurrent(functionsLibCtx *lib_ctx);
void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async);

int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err);

Expand Down
2 changes: 1 addition & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2289,7 +2289,7 @@ void readSyncBulkPayload(connection *conn) {
swapMainDbWithTempDb(diskless_load_tempDb);

/* swap existing functions ctx with the temporary one */
functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx);
functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx, 0);

moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED,
NULL);
Expand Down

0 comments on commit 6038eda

Please sign in to comment.