From 6038eda010dfb99eff908cf0839cc41004383acd Mon Sep 17 00:00:00 2001 From: Binbin Date: Thu, 21 Nov 2024 21:02:05 +0800 Subject: [PATCH] Make FUNCTION RESTORE FLUSH flush async based on lazyfree-lazy-user-flush (#1254) FUNCTION RESTORE have a FLUSH option, it will delete all the existing libraries before restoring the payload. If for some reasons, there are a lot of libraries, we will block a while in here. Signed-off-by: Binbin --- src/functions.c | 17 +++++++++++++---- src/functions.h | 4 ++-- src/replication.c | 2 +- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/functions.c b/src/functions.c index c9ec42b322..916d8fd622 100644 --- a/src/functions.c +++ b/src/functions.c @@ -185,6 +185,15 @@ void functionsLibCtxClearCurrent(int async) { } } +/* Free the given functions ctx */ +static void functionsLibCtxFreeGeneric(functionsLibCtx *functions_lib_ctx, int async) { + if (async) { + freeFunctionsAsync(functions_lib_ctx); + } else { + functionsLibCtxFree(functions_lib_ctx); + } +} + /* Free the given functions ctx */ void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) { functionsLibCtxClear(functions_lib_ctx); @@ -196,8 +205,8 @@ void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) { /* Swap the current functions ctx with the given one. * Free the old functions ctx. */ -void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx) { - functionsLibCtxFree(curr_functions_lib_ctx); +void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async) { + functionsLibCtxFreeGeneric(curr_functions_lib_ctx, async); curr_functions_lib_ctx = new_lib_ctx; } @@ -769,7 +778,7 @@ void functionRestoreCommand(client *c) { } if (restore_replicy == restorePolicy_Flush) { - functionsLibCtxSwapWithCurrent(functions_lib_ctx); + functionsLibCtxSwapWithCurrent(functions_lib_ctx, server.lazyfree_lazy_user_flush); functions_lib_ctx = NULL; /* avoid releasing the f_ctx in the end */ } else { if (libraryJoin(curr_functions_lib_ctx, functions_lib_ctx, restore_replicy == restorePolicy_Replace, &err) != @@ -789,7 +798,7 @@ void functionRestoreCommand(client *c) { addReply(c, shared.ok); } if (functions_lib_ctx) { - functionsLibCtxFree(functions_lib_ctx); + functionsLibCtxFreeGeneric(functions_lib_ctx, server.lazyfree_lazy_user_flush); } } diff --git a/src/functions.h b/src/functions.h index da196cf197..429405bb2d 100644 --- a/src/functions.h +++ b/src/functions.h @@ -134,9 +134,9 @@ size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx); functionsLibCtx *functionsLibCtxGetCurrent(void); functionsLibCtx *functionsLibCtxCreate(void); void functionsLibCtxClearCurrent(int async); -void functionsLibCtxFree(functionsLibCtx *lib_ctx); +void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx); void functionsLibCtxClear(functionsLibCtx *lib_ctx); -void functionsLibCtxSwapWithCurrent(functionsLibCtx *lib_ctx); +void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async); int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err); diff --git a/src/replication.c b/src/replication.c index 75f08c4c89..437ae278ec 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2289,7 +2289,7 @@ void readSyncBulkPayload(connection *conn) { swapMainDbWithTempDb(diskless_load_tempDb); /* swap existing functions ctx with the temporary one */ - functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx); + functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx, 0); moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED, NULL);