From bc05b3f67718e7823c61a7cb9201226d8f296067 Mon Sep 17 00:00:00 2001 From: shen yushi Date: Sun, 17 Nov 2024 11:02:46 +0800 Subject: [PATCH] Fix memidx dump (#2247) ### What problem does this PR solve? Fix: add delta log of dump by line task in txn bottom instead of in back ground thread. Issue link:#2215 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] Test cases --------- Signed-off-by: Jin Hai Co-authored-by: Jin Hai --- .../infinity/remote_thrift/client.py | 10 + .../infinity/remote_thrift/infinity.py | 16 + .../infinity_thrift_rpc/InfinityService.py | 378 ++++++++ .../infinity_thrift_rpc/ttypes.py | 160 ++++ python/restart_test/test_memidx.py | 97 ++- src/admin/admin_executor.cpp | 4 +- src/executor/explain_physical_plan.cpp | 3 + src/executor/operator/physical_command.cpp | 11 + src/executor/operator/physical_flush.cpp | 5 +- src/main/infinity.cpp | 27 +- src/main/infinity.cppm | 4 +- src/main/infinity_context.cpp | 10 +- .../infinity_thrift/InfinityService.cpp | 812 +++++++++++++++++- src/network/infinity_thrift/InfinityService.h | 252 ++++++ .../infinity_thrift/infinity_types.cpp | 244 ++++++ src/network/infinity_thrift/infinity_types.h | 111 +++ src/network/infinity_thrift_service.cpp | 28 + src/network/infinity_thrift_service.cppm | 4 + src/network/infinity_thrift_types.cppm | 2 + src/parser/statement/command_statement.cpp | 1 + src/parser/statement/command_statement.cppm | 1 + src/parser/statement/command_statement.h | 15 +- src/parser/statement/flush_statement.h | 1 + src/planner/explain_ast.cpp | 3 + src/planner/explain_logical_plan.cpp | 5 + src/planner/logical_planner.cpp | 14 + src/planner/logical_planner.cppm | 2 + src/planner/node/logical_command.cpp | 5 + src/planner/node/logical_flush.cpp | 4 + src/storage/bg_task/bg_task.cpp | 2 + src/storage/bg_task/bg_task.cppm | 13 + src/storage/compaction_process.cpp | 9 +- src/storage/compaction_process.cppm | 27 + .../meta/entry/segment_index_entry.cpp | 4 +- src/storage/meta/entry/table_entry.cpp | 3 + .../config/restart_test/test_memidx/5.toml | 23 + thrift/infinity.thrift | 15 + 37 files changed, 2288 insertions(+), 37 deletions(-) create mode 100644 test/data/config/restart_test/test_memidx/5.toml diff --git a/python/infinity_sdk/infinity/remote_thrift/client.py b/python/infinity_sdk/infinity/remote_thrift/client.py index bc5be4183e..55485a250e 100644 --- a/python/infinity_sdk/infinity/remote_thrift/client.py +++ b/python/infinity_sdk/infinity/remote_thrift/client.py @@ -311,3 +311,13 @@ def drop_columns(self, db_name: str, table_name: str, column_names: list): def cleanup(self): return self.client.Cleanup(CommonRequest(session_id=self.session_id)) + + def command(self, command: ttypes.CommandRequest): + command.session_id = self.session_id + print(command) + return self.client.Command(command) + + def flush(self, flush_request: ttypes.FlushRequest): + flush_request.session_id = self.session_id + print(flush_request) + return self.client.Flush(flush_request) diff --git a/python/infinity_sdk/infinity/remote_thrift/infinity.py b/python/infinity_sdk/infinity/remote_thrift/infinity.py index 40ccc3f3f1..028eab14a6 100644 --- a/python/infinity_sdk/infinity/remote_thrift/infinity.py +++ b/python/infinity_sdk/infinity/remote_thrift/infinity.py @@ -112,6 +112,22 @@ def optimize(self, db_name: str, table_name: str, optimize_opt: ttypes.OptimizeO else: raise InfinityException(res.error_code, res.error_msg) + def test_command(self, command_content: str): + command = ttypes.CommandRequest() + command.command_type = "test_command" + command.test_command_content = command_content + self._client.command(command) + + def flush_data(self): + flush_request = ttypes.FlushRequest() + flush_request.flush_type = "data" + self._client.flush(flush_request) + + def flush_delta(self): + flush_request = ttypes.FlushRequest() + flush_request.flush_type = "delta" + self._client.flush(flush_request) + def disconnect(self): res = self._client.disconnect() if res.error_code == ErrorCode.OK: diff --git a/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/InfinityService.py b/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/InfinityService.py index a23bb7057e..7da04b05a6 100644 --- a/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/InfinityService.py +++ b/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/InfinityService.py @@ -299,6 +299,22 @@ def Cleanup(self, request): """ pass + def Command(self, request): + """ + Parameters: + - request + + """ + pass + + def Flush(self, request): + """ + Parameters: + - request + + """ + pass + class Client(Iface): def __init__(self, iprot, oprot=None): @@ -1427,6 +1443,70 @@ def recv_Cleanup(self): return result.success raise TApplicationException(TApplicationException.MISSING_RESULT, "Cleanup failed: unknown result") + def Command(self, request): + """ + Parameters: + - request + + """ + self.send_Command(request) + return self.recv_Command() + + def send_Command(self, request): + self._oprot.writeMessageBegin('Command', TMessageType.CALL, self._seqid) + args = Command_args() + args.request = request + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_Command(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = Command_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "Command failed: unknown result") + + def Flush(self, request): + """ + Parameters: + - request + + """ + self.send_Flush(request) + return self.recv_Flush() + + def send_Flush(self, request): + self._oprot.writeMessageBegin('Flush', TMessageType.CALL, self._seqid) + args = Flush_args() + args.request = request + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_Flush(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = Flush_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "Flush failed: unknown result") + class Processor(Iface, TProcessor): def __init__(self, handler): @@ -1467,6 +1547,8 @@ def __init__(self, handler): self._processMap["AddColumns"] = Processor.process_AddColumns self._processMap["DropColumns"] = Processor.process_DropColumns self._processMap["Cleanup"] = Processor.process_Cleanup + self._processMap["Command"] = Processor.process_Command + self._processMap["Flush"] = Processor.process_Flush self._on_message_begin = None def on_message_begin(self, func): @@ -2294,6 +2376,52 @@ def process_Cleanup(self, seqid, iprot, oprot): oprot.writeMessageEnd() oprot.trans.flush() + def process_Command(self, seqid, iprot, oprot): + args = Command_args() + args.read(iprot) + iprot.readMessageEnd() + result = Command_result() + try: + result.success = self._handler.Command(args.request) + msg_type = TMessageType.REPLY + except TTransport.TTransportException: + raise + except TApplicationException as ex: + logging.exception('TApplication exception in handler') + msg_type = TMessageType.EXCEPTION + result = ex + except Exception: + logging.exception('Unexpected exception in handler') + msg_type = TMessageType.EXCEPTION + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("Command", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + + def process_Flush(self, seqid, iprot, oprot): + args = Flush_args() + args.read(iprot) + iprot.readMessageEnd() + result = Flush_result() + try: + result.success = self._handler.Flush(args.request) + msg_type = TMessageType.REPLY + except TTransport.TTransportException: + raise + except TApplicationException as ex: + logging.exception('TApplication exception in handler') + msg_type = TMessageType.EXCEPTION + result = ex + except Exception: + logging.exception('Unexpected exception in handler') + msg_type = TMessageType.EXCEPTION + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("Flush", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + # HELPER FUNCTIONS AND STRUCTURES @@ -6670,5 +6798,255 @@ def __ne__(self, other): Cleanup_result.thrift_spec = ( (0, TType.STRUCT, 'success', [CommonResponse, None], None, ), # 0 ) + + +class Command_args(object): + """ + Attributes: + - request + + """ + + + def __init__(self, request=None,): + self.request = request + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec]) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.request = CommandRequest() + self.request.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec])) + return + oprot.writeStructBegin('Command_args') + if self.request is not None: + oprot.writeFieldBegin('request', TType.STRUCT, 1) + self.request.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) +all_structs.append(Command_args) +Command_args.thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'request', [CommandRequest, None], None, ), # 1 +) + + +class Command_result(object): + """ + Attributes: + - success + + """ + + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec]) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = CommonResponse() + self.success.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec])) + return + oprot.writeStructBegin('Command_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) +all_structs.append(Command_result) +Command_result.thrift_spec = ( + (0, TType.STRUCT, 'success', [CommonResponse, None], None, ), # 0 +) + + +class Flush_args(object): + """ + Attributes: + - request + + """ + + + def __init__(self, request=None,): + self.request = request + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec]) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.request = FlushRequest() + self.request.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec])) + return + oprot.writeStructBegin('Flush_args') + if self.request is not None: + oprot.writeFieldBegin('request', TType.STRUCT, 1) + self.request.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) +all_structs.append(Flush_args) +Flush_args.thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'request', [FlushRequest, None], None, ), # 1 +) + + +class Flush_result(object): + """ + Attributes: + - success + + """ + + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec]) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = CommonResponse() + self.success.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec])) + return + oprot.writeStructBegin('Flush_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) +all_structs.append(Flush_result) +Flush_result.thrift_spec = ( + (0, TType.STRUCT, 'success', [CommonResponse, None], None, ), # 0 +) fix_spec(all_structs) del all_structs diff --git a/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/ttypes.py b/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/ttypes.py index 7457a93aa1..0cfab9cd87 100644 --- a/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/ttypes.py +++ b/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/ttypes.py @@ -8682,6 +8682,153 @@ def __eq__(self, other): def __ne__(self, other): return not (self == other) + + +class CommandRequest(object): + """ + Attributes: + - session_id + - command_type + - test_command_content + + """ + + + def __init__(self, session_id=None, command_type=None, test_command_content=None,): + self.session_id = session_id + self.command_type = command_type + self.test_command_content = test_command_content + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec]) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.session_id = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.command_type = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.test_command_content = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec])) + return + oprot.writeStructBegin('CommandRequest') + if self.session_id is not None: + oprot.writeFieldBegin('session_id', TType.I64, 1) + oprot.writeI64(self.session_id) + oprot.writeFieldEnd() + if self.command_type is not None: + oprot.writeFieldBegin('command_type', TType.STRING, 2) + oprot.writeString(self.command_type.encode('utf-8') if sys.version_info[0] == 2 else self.command_type) + oprot.writeFieldEnd() + if self.test_command_content is not None: + oprot.writeFieldBegin('test_command_content', TType.STRING, 3) + oprot.writeString(self.test_command_content.encode('utf-8') if sys.version_info[0] == 2 else self.test_command_content) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class FlushRequest(object): + """ + Attributes: + - session_id + - flush_type + + """ + + + def __init__(self, session_id=None, flush_type=None,): + self.session_id = session_id + self.flush_type = flush_type + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec]) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.session_id = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.flush_type = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec])) + return + oprot.writeStructBegin('FlushRequest') + if self.session_id is not None: + oprot.writeFieldBegin('session_id', TType.I64, 1) + oprot.writeI64(self.session_id) + oprot.writeFieldEnd() + if self.flush_type is not None: + oprot.writeFieldBegin('flush_type', TType.STRING, 2) + oprot.writeString(self.flush_type.encode('utf-8') if sys.version_info[0] == 2 else self.flush_type) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) all_structs.append(Property) Property.thrift_spec = ( None, # 0 @@ -9387,5 +9534,18 @@ def __ne__(self, other): (2, TType.STRING, 'error_msg', 'UTF8', None, ), # 2 (3, TType.STRING, 'node_role', 'UTF8', None, ), # 3 ) +all_structs.append(CommandRequest) +CommandRequest.thrift_spec = ( + None, # 0 + (1, TType.I64, 'session_id', None, None, ), # 1 + (2, TType.STRING, 'command_type', 'UTF8', None, ), # 2 + (3, TType.STRING, 'test_command_content', 'UTF8', None, ), # 3 +) +all_structs.append(FlushRequest) +FlushRequest.thrift_spec = ( + None, # 0 + (1, TType.I64, 'session_id', None, None, ), # 1 + (2, TType.STRING, 'flush_type', 'UTF8', None, ), # 2 +) fix_spec(all_structs) del all_structs diff --git a/python/restart_test/test_memidx.py b/python/restart_test/test_memidx.py index 7af8c36335..bc57cc1f12 100644 --- a/python/restart_test/test_memidx.py +++ b/python/restart_test/test_memidx.py @@ -4,8 +4,7 @@ from infinity import index import time import pathlib -from infinity.common import ConflictType -import os +from infinity.common import ConflictType, SparseVector class TestMemIdx: @@ -130,6 +129,8 @@ def check(): # # result: 13 def test_optimize_from_different_database(self, infinity_runner: InfinityRunner): + infinity_runner.clear() + config1 = "test/data/config/restart_test/test_memidx/1.toml" config2 = "test/data/config/restart_test/test_memidx/3.toml" uri = common_values.TEST_LOCAL_HOST @@ -278,3 +279,95 @@ def part2(infinity_obj): db_obj.drop_table(table_name) part2() + + def test_memidx_recover2(self, infinity_runner: InfinityRunner): + infinity_runner.clear() + + uri = common_values.TEST_LOCAL_HOST + data_dir = "/var/infinity/data" + catalog_dir = "/var/infinity/data/catalog" + + config1 = "test/data/config/restart_test/test_memidx/5.toml" + config2 = "test/data/config/restart_test/test_memidx/4.toml" + decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner) + decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner) + + table_name = "test_memidx4" + + @decorator1 + def part1(infinity_obj): + db_obj = infinity_obj.get_database("default_db") + db_obj.drop_table(table_name, conflict_type=ConflictType.Ignore) + dim = 100 + table_obj = db_obj.create_table( + table_name, + { + "c1": {"type": "int"}, + "c2": {"type": "int"}, + "c3": {"type": f"sparse,{dim},float,int"}, + }, + ) + table_obj.create_index( + "idx1", index.IndexInfo("c2", index.IndexType.Secondary) + ) + table_obj.create_index( + "idx2", + index.IndexInfo( + "c3", + index.IndexType.BMP, + {"BLOCK_SIZE": "8", "COMPRESS_TYPE": "compress"}, + ), + ) + infinity_obj.test_command("stuck dump by line bg_task for 3 second") + table_obj.insert( + [ + { + "c1": i, + "c2": i, + "c3": SparseVector(indices=[0], values=[1.0]), + } + for i in range(8192) + ] + ) + # dump by line submit here + infinity_obj.flush_delta() + for i in range(100): + table_obj.insert( + [ + { + "c1": 8192 + i, + "c2": 8192 + i, + "c3": SparseVector(indices=[1], values=[1.0]), + } + ] + ) + # wait for dump by line done + # time.sleep(4) + + part1() + + delta_paths = list(pathlib.Path(catalog_dir).rglob("*DELTA*")) + if len(delta_paths) < 1: + print("Warning: delta checkpoint not triggered. skip this test") + infinity_runner.clear() + return + + @decorator2 + def part2(infinity_obj): + db_obj = infinity_obj.get_database("default_db") + table_obj = db_obj.get_table(table_name) + data_dict, data_type_dict = ( + table_obj.output(["c1"]).filter("c2 >= 8192").to_result() + ) + assert data_dict["c1"] == [8192 + i for i in range(100)] + + data_dict, data_type_dict = ( + table_obj.output(["c1"]) + .match_sparse("c3", SparseVector(indices=[1], values=[1.0]), "ip", 100) + .to_result() + ) + assert data_dict["c1"] == [8192 + i for i in range(100)] + + part2() + + infinity_runner.clear() diff --git a/src/admin/admin_executor.cpp b/src/admin/admin_executor.cpp index 051cb1a8c3..4f29316156 100644 --- a/src/admin/admin_executor.cpp +++ b/src/admin/admin_executor.cpp @@ -604,14 +604,14 @@ Vector> AdminExecutor::GetAllCheckpointEntries(QueryContext return checkpoint_entries; } - i64 max_checkpoint_ts = 0; + TxnTimeStamp max_checkpoint_ts = 0; WalListIterator iterator(wal_list); while (iterator.HasNext()) { auto wal_entry_ptr = iterator.Next(); for (auto &entry_cmd : wal_entry_ptr->cmds_) { if (entry_cmd->GetType() == WalCommandType::CHECKPOINT) { WalCmdCheckpoint *checkpoint_cmd = static_cast(entry_cmd.get()); - max_checkpoint_ts = std::max(max_checkpoint_ts, checkpoint_cmd->max_commit_ts_); + max_checkpoint_ts = std::max(max_checkpoint_ts, TxnTimeStamp(checkpoint_cmd->max_commit_ts_)); } } } diff --git a/src/executor/explain_physical_plan.cpp b/src/executor/explain_physical_plan.cpp index 8296e18d1e..33918d195c 100644 --- a/src/executor/explain_physical_plan.cpp +++ b/src/executor/explain_physical_plan.cpp @@ -2147,6 +2147,9 @@ void ExplainPhysicalPlan::Explain(const PhysicalFlush *flush_node, SharedPtrflush_type()) { + case FlushType::kDelta: + flush_header_str += "DELTA (" + std::to_string(flush_node->node_id()) + ")"; + break; case FlushType::kData: flush_header_str += "DATA (" + std::to_string(flush_node->node_id()) + ")"; break; diff --git a/src/executor/operator/physical_command.cpp b/src/executor/operator/physical_command.cpp index aea223c5b8..ad341b8768 100644 --- a/src/executor/operator/physical_command.cpp +++ b/src/executor/operator/physical_command.cpp @@ -434,6 +434,17 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat } break; } + case CommandType::kTestCommand: { + auto *test_command = static_cast(command_info_.get()); + LOG_INFO(fmt::format("Execute test command: {}", test_command->command_content())); + if (test_command->command_content() == "stuck dump by line bg_task for 3 second") { + auto *compact_processor = query_context->storage()->compaction_processor(); + compact_processor->AddTestCommand(BGTaskType::kTestCommand, "stuck for 3 seconds"); + } else if (test_command->command_content() == "delta checkpoint") { + + } + break; + } default: { String error_message = fmt::format("Invalid command type: {}", command_info_->ToString()); UnrecoverableError(error_message); diff --git a/src/executor/operator/physical_flush.cpp b/src/executor/operator/physical_flush.cpp index 9fedcf9e02..0fcb1504bf 100644 --- a/src/executor/operator/physical_flush.cpp +++ b/src/executor/operator/physical_flush.cpp @@ -47,6 +47,7 @@ bool PhysicalFlush::Execute(QueryContext *query_context, OperatorState *operator } switch (flush_type_) { + case FlushType::kDelta: case FlushType::kData: { FlushData(query_context, operator_state); break; @@ -66,7 +67,9 @@ bool PhysicalFlush::Execute(QueryContext *query_context, OperatorState *operator void PhysicalFlush::FlushData(QueryContext *query_context, OperatorState *operator_state) { // full checkpoint here - auto force_ckp_task = MakeShared(query_context->GetTxn(), true /*is_full_checkpoint*/); + + bool is_full_checkpoint = flush_type_ == FlushType::kData; + auto force_ckp_task = MakeShared(query_context->GetTxn(), is_full_checkpoint); auto *wal_mgr = query_context->storage()->wal_manager(); if (!wal_mgr->TrySubmitCheckpointTask(force_ckp_task)) { LOG_TRACE(fmt::format("Skip {} checkpoint(manual) because there is already a full checkpoint task running.", "FULL")); diff --git a/src/main/infinity.cpp b/src/main/infinity.cpp index 89c37c5203..100200ed07 100644 --- a/src/main/infinity.cpp +++ b/src/main/infinity.cpp @@ -202,10 +202,17 @@ QueryResult Infinity::Query(const String &query_text) { return result; } -QueryResult Infinity::Flush() { +QueryResult Infinity::Flush(const String &flush_type) { UniquePtr query_context_ptr = GetQueryContext(); UniquePtr flush_statement = MakeUnique(); - flush_statement->type_ = FlushType::kData; + + if (flush_type == "data") { + flush_statement->type_ = FlushType::kData; + } else if (flush_type == "delta") { + flush_statement->type_ = FlushType::kDelta; + } else { + flush_statement->type_ = FlushType::kData; + } QueryResult result = query_context_ptr->QueryStatement(flush_statement.get()); return result; @@ -1081,6 +1088,22 @@ QueryResult Infinity::CompactTable(const String &db_name, const String &table_na return result; } +QueryResult Infinity::TestCommand(const String &command_content) { + auto query_context_ptr = MakeUnique(session_.get()); + query_context_ptr->Init(InfinityContext::instance().config(), + InfinityContext::instance().task_scheduler(), + InfinityContext::instance().storage(), + InfinityContext::instance().resource_manager(), + InfinityContext::instance().session_manager(), + InfinityContext::instance().persistence_manager()); + auto command_statement = MakeUnique(); + command_statement->command_info_ = MakeUnique(command_content); + + QueryResult result = query_context_ptr->QueryStatement(command_statement.get()); + + return result; +} + QueryResult Infinity::AdminShowCatalogs() { auto query_context_ptr = MakeUnique(session_.get()); query_context_ptr->Init(InfinityContext::instance().config(), diff --git a/src/main/infinity.cppm b/src/main/infinity.cppm index 26ef04bb53..ccd248ee46 100644 --- a/src/main/infinity.cppm +++ b/src/main/infinity.cppm @@ -72,7 +72,7 @@ public: QueryResult ShowDatabase(const String &db_name); - QueryResult Flush(); + QueryResult Flush(const String &flush_type = ""); QueryResult SetVariableOrConfig(const String &name, bool value, SetScope scope); @@ -204,6 +204,8 @@ public: QueryResult ForceCheckpoint(); QueryResult CompactTable(const String &db_name, const String &table_name); + QueryResult TestCommand(const String &command_content); + // Admin interface QueryResult AdminShowCatalogs(); QueryResult AdminShowCatalog(i64 index); diff --git a/src/main/infinity_context.cpp b/src/main/infinity_context.cpp index b80cfa8297..76e25e51c9 100644 --- a/src/main/infinity_context.cpp +++ b/src/main/infinity_context.cpp @@ -73,12 +73,20 @@ void InfinityContext::Init(const SharedPtr &config_path, bool admin_flag session_mgr_ = MakeUnique(); + if (admin_flag) { + Status change_to_admin = ChangeServerRole(NodeRole::kAdmin); + if (!change_to_admin.ok()) { + UnrecoverableError(change_to_admin.message()); + return; + } + return; + } Status change_result = ChangeServerRole(NodeRole::kAdmin); if (!status.ok()) { UnrecoverableError(status.message()); return; } - if (admin_flag or config_->ServerMode() == "cluster") { + if (config_->ServerMode() == "cluster") { // Admin mode or cluster start phase return; } diff --git a/src/network/infinity_thrift/InfinityService.cpp b/src/network/infinity_thrift/InfinityService.cpp index f6e0f2902b..d3fa99f290 100644 --- a/src/network/infinity_thrift/InfinityService.cpp +++ b/src/network/infinity_thrift/InfinityService.cpp @@ -6553,6 +6553,380 @@ uint32_t InfinityService_Cleanup_presult::read(::apache::thrift::protocol::TProt return xfer; } + +InfinityService_Command_args::~InfinityService_Command_args() noexcept { +} + + +uint32_t InfinityService_Command_args::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->request.read(iprot); + this->__isset.request = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t InfinityService_Command_args::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("InfinityService_Command_args"); + + xfer += oprot->writeFieldBegin("request", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->request.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +InfinityService_Command_pargs::~InfinityService_Command_pargs() noexcept { +} + + +uint32_t InfinityService_Command_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("InfinityService_Command_pargs"); + + xfer += oprot->writeFieldBegin("request", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += (*(this->request)).write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +InfinityService_Command_result::~InfinityService_Command_result() noexcept { +} + + +uint32_t InfinityService_Command_result::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->success.read(iprot); + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t InfinityService_Command_result::write(::apache::thrift::protocol::TProtocol* oprot) const { + + uint32_t xfer = 0; + + xfer += oprot->writeStructBegin("InfinityService_Command_result"); + + if (this->__isset.success) { + xfer += oprot->writeFieldBegin("success", ::apache::thrift::protocol::T_STRUCT, 0); + xfer += this->success.write(oprot); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +InfinityService_Command_presult::~InfinityService_Command_presult() noexcept { +} + + +uint32_t InfinityService_Command_presult::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += (*(this->success)).read(iprot); + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + + +InfinityService_Flush_args::~InfinityService_Flush_args() noexcept { +} + + +uint32_t InfinityService_Flush_args::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->request.read(iprot); + this->__isset.request = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t InfinityService_Flush_args::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("InfinityService_Flush_args"); + + xfer += oprot->writeFieldBegin("request", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->request.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +InfinityService_Flush_pargs::~InfinityService_Flush_pargs() noexcept { +} + + +uint32_t InfinityService_Flush_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("InfinityService_Flush_pargs"); + + xfer += oprot->writeFieldBegin("request", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += (*(this->request)).write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +InfinityService_Flush_result::~InfinityService_Flush_result() noexcept { +} + + +uint32_t InfinityService_Flush_result::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->success.read(iprot); + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t InfinityService_Flush_result::write(::apache::thrift::protocol::TProtocol* oprot) const { + + uint32_t xfer = 0; + + xfer += oprot->writeStructBegin("InfinityService_Flush_result"); + + if (this->__isset.success) { + xfer += oprot->writeFieldBegin("success", ::apache::thrift::protocol::T_STRUCT, 0); + xfer += this->success.write(oprot); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +InfinityService_Flush_presult::~InfinityService_Flush_presult() noexcept { +} + + +uint32_t InfinityService_Flush_presult::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += (*(this->success)).read(iprot); + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + void InfinityServiceClient::Connect(CommonResponse& _return, const ConnectRequest& request) { send_Connect(request); @@ -8449,12 +8823,128 @@ void InfinityServiceClient::recv_AddColumns(CommonResponse& _return) iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); } - if (fname.compare("AddColumns") != 0) { + if (fname.compare("AddColumns") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + InfinityService_AddColumns_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + if (result.__isset.success) { + // _return pointer has now been filled + return; + } + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "AddColumns failed: unknown result"); +} + +void InfinityServiceClient::DropColumns(CommonResponse& _return, const DropColumnsRequest& request) +{ + send_DropColumns(request); + recv_DropColumns(_return); +} + +void InfinityServiceClient::send_DropColumns(const DropColumnsRequest& request) +{ + int32_t cseqid = 0; + oprot_->writeMessageBegin("DropColumns", ::apache::thrift::protocol::T_CALL, cseqid); + + InfinityService_DropColumns_pargs args; + args.request = &request; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); +} + +void InfinityServiceClient::recv_DropColumns(CommonResponse& _return) +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + iprot_->readMessageBegin(fname, mtype, rseqid); + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("DropColumns") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + InfinityService_DropColumns_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + if (result.__isset.success) { + // _return pointer has now been filled + return; + } + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "DropColumns failed: unknown result"); +} + +void InfinityServiceClient::Cleanup(CommonResponse& _return, const CommonRequest& request) +{ + send_Cleanup(request); + recv_Cleanup(_return); +} + +void InfinityServiceClient::send_Cleanup(const CommonRequest& request) +{ + int32_t cseqid = 0; + oprot_->writeMessageBegin("Cleanup", ::apache::thrift::protocol::T_CALL, cseqid); + + InfinityService_Cleanup_pargs args; + args.request = &request; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); +} + +void InfinityServiceClient::recv_Cleanup(CommonResponse& _return) +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + iprot_->readMessageBegin(fname, mtype, rseqid); + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("Cleanup") != 0) { iprot_->skip(::apache::thrift::protocol::T_STRUCT); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); } - InfinityService_AddColumns_presult result; + InfinityService_Cleanup_presult result; result.success = &_return; result.read(iprot_); iprot_->readMessageEnd(); @@ -8464,21 +8954,21 @@ void InfinityServiceClient::recv_AddColumns(CommonResponse& _return) // _return pointer has now been filled return; } - throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "AddColumns failed: unknown result"); + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "Cleanup failed: unknown result"); } -void InfinityServiceClient::DropColumns(CommonResponse& _return, const DropColumnsRequest& request) +void InfinityServiceClient::Command(CommonResponse& _return, const CommandRequest& request) { - send_DropColumns(request); - recv_DropColumns(_return); + send_Command(request); + recv_Command(_return); } -void InfinityServiceClient::send_DropColumns(const DropColumnsRequest& request) +void InfinityServiceClient::send_Command(const CommandRequest& request) { int32_t cseqid = 0; - oprot_->writeMessageBegin("DropColumns", ::apache::thrift::protocol::T_CALL, cseqid); + oprot_->writeMessageBegin("Command", ::apache::thrift::protocol::T_CALL, cseqid); - InfinityService_DropColumns_pargs args; + InfinityService_Command_pargs args; args.request = &request; args.write(oprot_); @@ -8487,7 +8977,7 @@ void InfinityServiceClient::send_DropColumns(const DropColumnsRequest& request) oprot_->getTransport()->flush(); } -void InfinityServiceClient::recv_DropColumns(CommonResponse& _return) +void InfinityServiceClient::recv_Command(CommonResponse& _return) { int32_t rseqid = 0; @@ -8507,12 +8997,12 @@ void InfinityServiceClient::recv_DropColumns(CommonResponse& _return) iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); } - if (fname.compare("DropColumns") != 0) { + if (fname.compare("Command") != 0) { iprot_->skip(::apache::thrift::protocol::T_STRUCT); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); } - InfinityService_DropColumns_presult result; + InfinityService_Command_presult result; result.success = &_return; result.read(iprot_); iprot_->readMessageEnd(); @@ -8522,21 +9012,21 @@ void InfinityServiceClient::recv_DropColumns(CommonResponse& _return) // _return pointer has now been filled return; } - throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "DropColumns failed: unknown result"); + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "Command failed: unknown result"); } -void InfinityServiceClient::Cleanup(CommonResponse& _return, const CommonRequest& request) +void InfinityServiceClient::Flush(CommonResponse& _return, const FlushRequest& request) { - send_Cleanup(request); - recv_Cleanup(_return); + send_Flush(request); + recv_Flush(_return); } -void InfinityServiceClient::send_Cleanup(const CommonRequest& request) +void InfinityServiceClient::send_Flush(const FlushRequest& request) { int32_t cseqid = 0; - oprot_->writeMessageBegin("Cleanup", ::apache::thrift::protocol::T_CALL, cseqid); + oprot_->writeMessageBegin("Flush", ::apache::thrift::protocol::T_CALL, cseqid); - InfinityService_Cleanup_pargs args; + InfinityService_Flush_pargs args; args.request = &request; args.write(oprot_); @@ -8545,7 +9035,7 @@ void InfinityServiceClient::send_Cleanup(const CommonRequest& request) oprot_->getTransport()->flush(); } -void InfinityServiceClient::recv_Cleanup(CommonResponse& _return) +void InfinityServiceClient::recv_Flush(CommonResponse& _return) { int32_t rseqid = 0; @@ -8565,12 +9055,12 @@ void InfinityServiceClient::recv_Cleanup(CommonResponse& _return) iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); } - if (fname.compare("Cleanup") != 0) { + if (fname.compare("Flush") != 0) { iprot_->skip(::apache::thrift::protocol::T_STRUCT); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); } - InfinityService_Cleanup_presult result; + InfinityService_Flush_presult result; result.success = &_return; result.read(iprot_); iprot_->readMessageEnd(); @@ -8580,7 +9070,7 @@ void InfinityServiceClient::recv_Cleanup(CommonResponse& _return) // _return pointer has now been filled return; } - throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "Cleanup failed: unknown result"); + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "Flush failed: unknown result"); } bool InfinityServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext) { @@ -10492,6 +10982,114 @@ void InfinityServiceProcessor::process_Cleanup(int32_t seqid, ::apache::thrift:: } } +void InfinityServiceProcessor::process_Command(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) +{ + void* ctx = nullptr; + if (this->eventHandler_.get() != nullptr) { + ctx = this->eventHandler_->getContext("InfinityService.Command", callContext); + } + ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "InfinityService.Command"); + + if (this->eventHandler_.get() != nullptr) { + this->eventHandler_->preRead(ctx, "InfinityService.Command"); + } + + InfinityService_Command_args args; + args.read(iprot); + iprot->readMessageEnd(); + uint32_t bytes = iprot->getTransport()->readEnd(); + + if (this->eventHandler_.get() != nullptr) { + this->eventHandler_->postRead(ctx, "InfinityService.Command", bytes); + } + + InfinityService_Command_result result; + try { + iface_->Command(result.success, args.request); + result.__isset.success = true; + } catch (const std::exception& e) { + if (this->eventHandler_.get() != nullptr) { + this->eventHandler_->handlerError(ctx, "InfinityService.Command"); + } + + ::apache::thrift::TApplicationException x(e.what()); + oprot->writeMessageBegin("Command", ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return; + } + + if (this->eventHandler_.get() != nullptr) { + this->eventHandler_->preWrite(ctx, "InfinityService.Command"); + } + + oprot->writeMessageBegin("Command", ::apache::thrift::protocol::T_REPLY, seqid); + result.write(oprot); + oprot->writeMessageEnd(); + bytes = oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + + if (this->eventHandler_.get() != nullptr) { + this->eventHandler_->postWrite(ctx, "InfinityService.Command", bytes); + } +} + +void InfinityServiceProcessor::process_Flush(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) +{ + void* ctx = nullptr; + if (this->eventHandler_.get() != nullptr) { + ctx = this->eventHandler_->getContext("InfinityService.Flush", callContext); + } + ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "InfinityService.Flush"); + + if (this->eventHandler_.get() != nullptr) { + this->eventHandler_->preRead(ctx, "InfinityService.Flush"); + } + + InfinityService_Flush_args args; + args.read(iprot); + iprot->readMessageEnd(); + uint32_t bytes = iprot->getTransport()->readEnd(); + + if (this->eventHandler_.get() != nullptr) { + this->eventHandler_->postRead(ctx, "InfinityService.Flush", bytes); + } + + InfinityService_Flush_result result; + try { + iface_->Flush(result.success, args.request); + result.__isset.success = true; + } catch (const std::exception& e) { + if (this->eventHandler_.get() != nullptr) { + this->eventHandler_->handlerError(ctx, "InfinityService.Flush"); + } + + ::apache::thrift::TApplicationException x(e.what()); + oprot->writeMessageBegin("Flush", ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return; + } + + if (this->eventHandler_.get() != nullptr) { + this->eventHandler_->preWrite(ctx, "InfinityService.Flush"); + } + + oprot->writeMessageBegin("Flush", ::apache::thrift::protocol::T_REPLY, seqid); + result.write(oprot); + oprot->writeMessageEnd(); + bytes = oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + + if (this->eventHandler_.get() != nullptr) { + this->eventHandler_->postWrite(ctx, "InfinityService.Flush", bytes); + } +} + ::std::shared_ptr< ::apache::thrift::TProcessor > InfinityServiceProcessorFactory::getProcessor(const ::apache::thrift::TConnectionInfo& connInfo) { ::apache::thrift::ReleaseHandler< InfinityServiceIfFactory > cleanup(handlerFactory_); ::std::shared_ptr< InfinityServiceIf > handler(handlerFactory_->getHandler(connInfo), cleanup); @@ -13439,5 +14037,173 @@ void InfinityServiceConcurrentClient::recv_Cleanup(CommonResponse& _return, cons } // end while(true) } +void InfinityServiceConcurrentClient::Command(CommonResponse& _return, const CommandRequest& request) +{ + int32_t seqid = send_Command(request); + recv_Command(_return, seqid); +} + +int32_t InfinityServiceConcurrentClient::send_Command(const CommandRequest& request) +{ + int32_t cseqid = this->sync_->generateSeqId(); + ::apache::thrift::async::TConcurrentSendSentry sentry(this->sync_.get()); + oprot_->writeMessageBegin("Command", ::apache::thrift::protocol::T_CALL, cseqid); + + InfinityService_Command_pargs args; + args.request = &request; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); + + sentry.commit(); + return cseqid; +} + +void InfinityServiceConcurrentClient::recv_Command(CommonResponse& _return, const int32_t seqid) +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + // the read mutex gets dropped and reacquired as part of waitForWork() + // The destructor of this sentry wakes up other clients + ::apache::thrift::async::TConcurrentRecvSentry sentry(this->sync_.get(), seqid); + + while(true) { + if(!this->sync_->getPending(fname, mtype, rseqid)) { + iprot_->readMessageBegin(fname, mtype, rseqid); + } + if(seqid == rseqid) { + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + sentry.commit(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("Command") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + // in a bad state, don't commit + using ::apache::thrift::protocol::TProtocolException; + throw TProtocolException(TProtocolException::INVALID_DATA); + } + InfinityService_Command_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + if (result.__isset.success) { + // _return pointer has now been filled + sentry.commit(); + return; + } + // in a bad state, don't commit + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "Command failed: unknown result"); + } + // seqid != rseqid + this->sync_->updatePending(fname, mtype, rseqid); + + // this will temporarily unlock the readMutex, and let other clients get work done + this->sync_->waitForWork(seqid); + } // end while(true) +} + +void InfinityServiceConcurrentClient::Flush(CommonResponse& _return, const FlushRequest& request) +{ + int32_t seqid = send_Flush(request); + recv_Flush(_return, seqid); +} + +int32_t InfinityServiceConcurrentClient::send_Flush(const FlushRequest& request) +{ + int32_t cseqid = this->sync_->generateSeqId(); + ::apache::thrift::async::TConcurrentSendSentry sentry(this->sync_.get()); + oprot_->writeMessageBegin("Flush", ::apache::thrift::protocol::T_CALL, cseqid); + + InfinityService_Flush_pargs args; + args.request = &request; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); + + sentry.commit(); + return cseqid; +} + +void InfinityServiceConcurrentClient::recv_Flush(CommonResponse& _return, const int32_t seqid) +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + // the read mutex gets dropped and reacquired as part of waitForWork() + // The destructor of this sentry wakes up other clients + ::apache::thrift::async::TConcurrentRecvSentry sentry(this->sync_.get(), seqid); + + while(true) { + if(!this->sync_->getPending(fname, mtype, rseqid)) { + iprot_->readMessageBegin(fname, mtype, rseqid); + } + if(seqid == rseqid) { + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + sentry.commit(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("Flush") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + // in a bad state, don't commit + using ::apache::thrift::protocol::TProtocolException; + throw TProtocolException(TProtocolException::INVALID_DATA); + } + InfinityService_Flush_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + if (result.__isset.success) { + // _return pointer has now been filled + sentry.commit(); + return; + } + // in a bad state, don't commit + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "Flush failed: unknown result"); + } + // seqid != rseqid + this->sync_->updatePending(fname, mtype, rseqid); + + // this will temporarily unlock the readMutex, and let other clients get work done + this->sync_->waitForWork(seqid); + } // end while(true) +} + } // namespace diff --git a/src/network/infinity_thrift/InfinityService.h b/src/network/infinity_thrift/InfinityService.h index 3c83a8af99..abcf51f50d 100644 --- a/src/network/infinity_thrift/InfinityService.h +++ b/src/network/infinity_thrift/InfinityService.h @@ -57,6 +57,8 @@ class InfinityServiceIf { virtual void AddColumns(CommonResponse& _return, const AddColumnsRequest& request) = 0; virtual void DropColumns(CommonResponse& _return, const DropColumnsRequest& request) = 0; virtual void Cleanup(CommonResponse& _return, const CommonRequest& request) = 0; + virtual void Command(CommonResponse& _return, const CommandRequest& request) = 0; + virtual void Flush(CommonResponse& _return, const FlushRequest& request) = 0; }; class InfinityServiceIfFactory { @@ -191,6 +193,12 @@ class InfinityServiceNull : virtual public InfinityServiceIf { void Cleanup(CommonResponse& /* _return */, const CommonRequest& /* request */) override { return; } + void Command(CommonResponse& /* _return */, const CommandRequest& /* request */) override { + return; + } + void Flush(CommonResponse& /* _return */, const FlushRequest& /* request */) override { + return; + } }; typedef struct _InfinityService_Connect_args__isset { @@ -3833,6 +3841,214 @@ class InfinityService_Cleanup_presult { }; +typedef struct _InfinityService_Command_args__isset { + _InfinityService_Command_args__isset() : request(false) {} + bool request :1; +} _InfinityService_Command_args__isset; + +class InfinityService_Command_args { + public: + + InfinityService_Command_args(const InfinityService_Command_args&); + InfinityService_Command_args& operator=(const InfinityService_Command_args&); + InfinityService_Command_args() noexcept { + } + + virtual ~InfinityService_Command_args() noexcept; + CommandRequest request; + + _InfinityService_Command_args__isset __isset; + + void __set_request(const CommandRequest& val); + + bool operator == (const InfinityService_Command_args & rhs) const + { + if (!(request == rhs.request)) + return false; + return true; + } + bool operator != (const InfinityService_Command_args &rhs) const { + return !(*this == rhs); + } + + bool operator < (const InfinityService_Command_args & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class InfinityService_Command_pargs { + public: + + + virtual ~InfinityService_Command_pargs() noexcept; + const CommandRequest* request; + + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +typedef struct _InfinityService_Command_result__isset { + _InfinityService_Command_result__isset() : success(false) {} + bool success :1; +} _InfinityService_Command_result__isset; + +class InfinityService_Command_result { + public: + + InfinityService_Command_result(const InfinityService_Command_result&); + InfinityService_Command_result& operator=(const InfinityService_Command_result&); + InfinityService_Command_result() noexcept { + } + + virtual ~InfinityService_Command_result() noexcept; + CommonResponse success; + + _InfinityService_Command_result__isset __isset; + + void __set_success(const CommonResponse& val); + + bool operator == (const InfinityService_Command_result & rhs) const + { + if (!(success == rhs.success)) + return false; + return true; + } + bool operator != (const InfinityService_Command_result &rhs) const { + return !(*this == rhs); + } + + bool operator < (const InfinityService_Command_result & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +typedef struct _InfinityService_Command_presult__isset { + _InfinityService_Command_presult__isset() : success(false) {} + bool success :1; +} _InfinityService_Command_presult__isset; + +class InfinityService_Command_presult { + public: + + + virtual ~InfinityService_Command_presult() noexcept; + CommonResponse* success; + + _InfinityService_Command_presult__isset __isset; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + +}; + +typedef struct _InfinityService_Flush_args__isset { + _InfinityService_Flush_args__isset() : request(false) {} + bool request :1; +} _InfinityService_Flush_args__isset; + +class InfinityService_Flush_args { + public: + + InfinityService_Flush_args(const InfinityService_Flush_args&); + InfinityService_Flush_args& operator=(const InfinityService_Flush_args&); + InfinityService_Flush_args() noexcept { + } + + virtual ~InfinityService_Flush_args() noexcept; + FlushRequest request; + + _InfinityService_Flush_args__isset __isset; + + void __set_request(const FlushRequest& val); + + bool operator == (const InfinityService_Flush_args & rhs) const + { + if (!(request == rhs.request)) + return false; + return true; + } + bool operator != (const InfinityService_Flush_args &rhs) const { + return !(*this == rhs); + } + + bool operator < (const InfinityService_Flush_args & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class InfinityService_Flush_pargs { + public: + + + virtual ~InfinityService_Flush_pargs() noexcept; + const FlushRequest* request; + + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +typedef struct _InfinityService_Flush_result__isset { + _InfinityService_Flush_result__isset() : success(false) {} + bool success :1; +} _InfinityService_Flush_result__isset; + +class InfinityService_Flush_result { + public: + + InfinityService_Flush_result(const InfinityService_Flush_result&); + InfinityService_Flush_result& operator=(const InfinityService_Flush_result&); + InfinityService_Flush_result() noexcept { + } + + virtual ~InfinityService_Flush_result() noexcept; + CommonResponse success; + + _InfinityService_Flush_result__isset __isset; + + void __set_success(const CommonResponse& val); + + bool operator == (const InfinityService_Flush_result & rhs) const + { + if (!(success == rhs.success)) + return false; + return true; + } + bool operator != (const InfinityService_Flush_result &rhs) const { + return !(*this == rhs); + } + + bool operator < (const InfinityService_Flush_result & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +typedef struct _InfinityService_Flush_presult__isset { + _InfinityService_Flush_presult__isset() : success(false) {} + bool success :1; +} _InfinityService_Flush_presult__isset; + +class InfinityService_Flush_presult { + public: + + + virtual ~InfinityService_Flush_presult() noexcept; + CommonResponse* success; + + _InfinityService_Flush_presult__isset __isset; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + +}; + class InfinityServiceClient : virtual public InfinityServiceIf { public: InfinityServiceClient(std::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) { @@ -3963,6 +4179,12 @@ class InfinityServiceClient : virtual public InfinityServiceIf { void Cleanup(CommonResponse& _return, const CommonRequest& request) override; void send_Cleanup(const CommonRequest& request); void recv_Cleanup(CommonResponse& _return); + void Command(CommonResponse& _return, const CommandRequest& request) override; + void send_Command(const CommandRequest& request); + void recv_Command(CommonResponse& _return); + void Flush(CommonResponse& _return, const FlushRequest& request) override; + void send_Flush(const FlushRequest& request); + void recv_Flush(CommonResponse& _return); protected: std::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_; std::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_; @@ -4013,6 +4235,8 @@ class InfinityServiceProcessor : public ::apache::thrift::TDispatchProcessor { void process_AddColumns(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_DropColumns(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_Cleanup(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); + void process_Command(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); + void process_Flush(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); public: InfinityServiceProcessor(::std::shared_ptr iface) : iface_(iface) { @@ -4051,6 +4275,8 @@ class InfinityServiceProcessor : public ::apache::thrift::TDispatchProcessor { processMap_["AddColumns"] = &InfinityServiceProcessor::process_AddColumns; processMap_["DropColumns"] = &InfinityServiceProcessor::process_DropColumns; processMap_["Cleanup"] = &InfinityServiceProcessor::process_Cleanup; + processMap_["Command"] = &InfinityServiceProcessor::process_Command; + processMap_["Flush"] = &InfinityServiceProcessor::process_Flush; } virtual ~InfinityServiceProcessor() {} @@ -4429,6 +4655,26 @@ class InfinityServiceMultiface : virtual public InfinityServiceIf { return; } + void Command(CommonResponse& _return, const CommandRequest& request) override { + size_t sz = ifaces_.size(); + size_t i = 0; + for (; i < (sz - 1); ++i) { + ifaces_[i]->Command(_return, request); + } + ifaces_[i]->Command(_return, request); + return; + } + + void Flush(CommonResponse& _return, const FlushRequest& request) override { + size_t sz = ifaces_.size(); + size_t i = 0; + for (; i < (sz - 1); ++i) { + ifaces_[i]->Flush(_return, request); + } + ifaces_[i]->Flush(_return, request); + return; + } + }; // The 'concurrent' client is a thread safe client that correctly handles @@ -4566,6 +4812,12 @@ class InfinityServiceConcurrentClient : virtual public InfinityServiceIf { void Cleanup(CommonResponse& _return, const CommonRequest& request) override; int32_t send_Cleanup(const CommonRequest& request); void recv_Cleanup(CommonResponse& _return, const int32_t seqid); + void Command(CommonResponse& _return, const CommandRequest& request) override; + int32_t send_Command(const CommandRequest& request); + void recv_Command(CommonResponse& _return, const int32_t seqid); + void Flush(CommonResponse& _return, const FlushRequest& request) override; + int32_t send_Flush(const FlushRequest& request); + void recv_Flush(CommonResponse& _return, const int32_t seqid); protected: std::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_; std::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_; diff --git a/src/network/infinity_thrift/infinity_types.cpp b/src/network/infinity_thrift/infinity_types.cpp index c5181197a1..503b014ba3 100644 --- a/src/network/infinity_thrift/infinity_types.cpp +++ b/src/network/infinity_thrift/infinity_types.cpp @@ -14877,4 +14877,248 @@ void ShowCurrentNodeResponse::printTo(std::ostream& out) const { out << ")"; } + +CommandRequest::~CommandRequest() noexcept { +} + + +void CommandRequest::__set_session_id(const int64_t val) { + this->session_id = val; +} + +void CommandRequest::__set_command_type(const std::string& val) { + this->command_type = val; +} + +void CommandRequest::__set_test_command_content(const std::string& val) { + this->test_command_content = val; +} +std::ostream& operator<<(std::ostream& out, const CommandRequest& obj) +{ + obj.printTo(out); + return out; +} + + +uint32_t CommandRequest::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->session_id); + this->__isset.session_id = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->command_type); + this->__isset.command_type = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->test_command_content); + this->__isset.test_command_content = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t CommandRequest::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("CommandRequest"); + + xfer += oprot->writeFieldBegin("session_id", ::apache::thrift::protocol::T_I64, 1); + xfer += oprot->writeI64(this->session_id); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("command_type", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->command_type); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("test_command_content", ::apache::thrift::protocol::T_STRING, 3); + xfer += oprot->writeString(this->test_command_content); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(CommandRequest &a, CommandRequest &b) { + using ::std::swap; + swap(a.session_id, b.session_id); + swap(a.command_type, b.command_type); + swap(a.test_command_content, b.test_command_content); + swap(a.__isset, b.__isset); +} + +CommandRequest::CommandRequest(const CommandRequest& other540) { + session_id = other540.session_id; + command_type = other540.command_type; + test_command_content = other540.test_command_content; + __isset = other540.__isset; +} +CommandRequest& CommandRequest::operator=(const CommandRequest& other541) { + session_id = other541.session_id; + command_type = other541.command_type; + test_command_content = other541.test_command_content; + __isset = other541.__isset; + return *this; +} +void CommandRequest::printTo(std::ostream& out) const { + using ::apache::thrift::to_string; + out << "CommandRequest("; + out << "session_id=" << to_string(session_id); + out << ", " << "command_type=" << to_string(command_type); + out << ", " << "test_command_content=" << to_string(test_command_content); + out << ")"; +} + + +FlushRequest::~FlushRequest() noexcept { +} + + +void FlushRequest::__set_session_id(const int64_t val) { + this->session_id = val; +} + +void FlushRequest::__set_flush_type(const std::string& val) { + this->flush_type = val; +} +std::ostream& operator<<(std::ostream& out, const FlushRequest& obj) +{ + obj.printTo(out); + return out; +} + + +uint32_t FlushRequest::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->session_id); + this->__isset.session_id = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->flush_type); + this->__isset.flush_type = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t FlushRequest::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("FlushRequest"); + + xfer += oprot->writeFieldBegin("session_id", ::apache::thrift::protocol::T_I64, 1); + xfer += oprot->writeI64(this->session_id); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("flush_type", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->flush_type); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(FlushRequest &a, FlushRequest &b) { + using ::std::swap; + swap(a.session_id, b.session_id); + swap(a.flush_type, b.flush_type); + swap(a.__isset, b.__isset); +} + +FlushRequest::FlushRequest(const FlushRequest& other542) { + session_id = other542.session_id; + flush_type = other542.flush_type; + __isset = other542.__isset; +} +FlushRequest& FlushRequest::operator=(const FlushRequest& other543) { + session_id = other543.session_id; + flush_type = other543.flush_type; + __isset = other543.__isset; + return *this; +} +void FlushRequest::printTo(std::ostream& out) const { + using ::apache::thrift::to_string; + out << "FlushRequest("; + out << "session_id=" << to_string(session_id); + out << ", " << "flush_type=" << to_string(flush_type); + out << ")"; +} + } // namespace diff --git a/src/network/infinity_thrift/infinity_types.h b/src/network/infinity_thrift/infinity_types.h index 065e83f806..ef9059930c 100644 --- a/src/network/infinity_thrift/infinity_types.h +++ b/src/network/infinity_thrift/infinity_types.h @@ -412,6 +412,10 @@ class ShowCurrentNodeRequest; class ShowCurrentNodeResponse; +class CommandRequest; + +class FlushRequest; + typedef struct _Property__isset { _Property__isset() : key(false), value(false) {} bool key :1; @@ -5886,6 +5890,113 @@ void swap(ShowCurrentNodeResponse &a, ShowCurrentNodeResponse &b); std::ostream& operator<<(std::ostream& out, const ShowCurrentNodeResponse& obj); +typedef struct _CommandRequest__isset { + _CommandRequest__isset() : session_id(false), command_type(false), test_command_content(false) {} + bool session_id :1; + bool command_type :1; + bool test_command_content :1; +} _CommandRequest__isset; + +class CommandRequest : public virtual ::apache::thrift::TBase { + public: + + CommandRequest(const CommandRequest&); + CommandRequest& operator=(const CommandRequest&); + CommandRequest() noexcept + : session_id(0), + command_type(), + test_command_content() { + } + + virtual ~CommandRequest() noexcept; + int64_t session_id; + std::string command_type; + std::string test_command_content; + + _CommandRequest__isset __isset; + + void __set_session_id(const int64_t val); + + void __set_command_type(const std::string& val); + + void __set_test_command_content(const std::string& val); + + bool operator == (const CommandRequest & rhs) const + { + if (!(session_id == rhs.session_id)) + return false; + if (!(command_type == rhs.command_type)) + return false; + if (!(test_command_content == rhs.test_command_content)) + return false; + return true; + } + bool operator != (const CommandRequest &rhs) const { + return !(*this == rhs); + } + + bool operator < (const CommandRequest & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot) override; + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const override; + + virtual void printTo(std::ostream& out) const; +}; + +void swap(CommandRequest &a, CommandRequest &b); + +std::ostream& operator<<(std::ostream& out, const CommandRequest& obj); + +typedef struct _FlushRequest__isset { + _FlushRequest__isset() : session_id(false), flush_type(false) {} + bool session_id :1; + bool flush_type :1; +} _FlushRequest__isset; + +class FlushRequest : public virtual ::apache::thrift::TBase { + public: + + FlushRequest(const FlushRequest&); + FlushRequest& operator=(const FlushRequest&); + FlushRequest() noexcept + : session_id(0), + flush_type() { + } + + virtual ~FlushRequest() noexcept; + int64_t session_id; + std::string flush_type; + + _FlushRequest__isset __isset; + + void __set_session_id(const int64_t val); + + void __set_flush_type(const std::string& val); + + bool operator == (const FlushRequest & rhs) const + { + if (!(session_id == rhs.session_id)) + return false; + if (!(flush_type == rhs.flush_type)) + return false; + return true; + } + bool operator != (const FlushRequest &rhs) const { + return !(*this == rhs); + } + + bool operator < (const FlushRequest & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot) override; + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const override; + + virtual void printTo(std::ostream& out) const; +}; + +void swap(FlushRequest &a, FlushRequest &b); + +std::ostream& operator<<(std::ostream& out, const FlushRequest& obj); + } // namespace #endif diff --git a/src/network/infinity_thrift_service.cpp b/src/network/infinity_thrift_service.cpp index a9227c6938..05cc27ee6d 100644 --- a/src/network/infinity_thrift_service.cpp +++ b/src/network/infinity_thrift_service.cpp @@ -1725,6 +1725,34 @@ void InfinityThriftService::ShowCurrentNode(infinity_thrift_rpc::ShowCurrentNode } } +void InfinityThriftService::Command(infinity_thrift_rpc::CommonResponse &response, const infinity_thrift_rpc::CommandRequest &request) { + auto [infinity, infinity_status] = GetInfinityBySessionID(request.session_id); + if (!infinity_status.ok()) { + ProcessStatus(response, infinity_status); + return; + } + LOG_TRACE(fmt::format("THRIFT: Command Type: {}, Test Command Content: {}", request.command_type, request.test_command_content)); + if (request.command_type != "test_command") { + LOG_WARN(fmt::format("Not support command type: {}", request.command_type)); + return; + } + + QueryResult result = infinity->TestCommand(request.test_command_content); + ProcessQueryResult(response, result); +} + +void InfinityThriftService::Flush(infinity_thrift_rpc::CommonResponse &response, const infinity_thrift_rpc::FlushRequest &request) { + auto [infinity, infinity_status] = GetInfinityBySessionID(request.session_id); + if (!infinity_status.ok()) { + ProcessStatus(response, infinity_status); + return; + } + LOG_TRACE(fmt::format("THRIFT: Flush Type: {}", request.flush_type)); + + QueryResult result = infinity->Flush(request.flush_type); + ProcessQueryResult(response, result); +} + Tuple InfinityThriftService::GetInfinityBySessionID(i64 session_id) { std::lock_guard lock(infinity_session_map_mutex_); auto iter = infinity_session_map_.find(session_id); diff --git a/src/network/infinity_thrift_service.cppm b/src/network/infinity_thrift_service.cppm index 3754082710..d5e4937784 100644 --- a/src/network/infinity_thrift_service.cppm +++ b/src/network/infinity_thrift_service.cppm @@ -154,6 +154,10 @@ public: void ShowCurrentNode(infinity_thrift_rpc::ShowCurrentNodeResponse &response, const infinity_thrift_rpc::ShowCurrentNodeRequest &request) final; + void Command(infinity_thrift_rpc::CommonResponse &response, const infinity_thrift_rpc::CommandRequest &request) final; + + void Flush(infinity_thrift_rpc::CommonResponse &response, const infinity_thrift_rpc::FlushRequest &request) final; + private: Tuple GetInfinityBySessionID(i64 session_id); diff --git a/src/network/infinity_thrift_types.cppm b/src/network/infinity_thrift_types.cppm index 7ab7aa925c..751697cc33 100644 --- a/src/network/infinity_thrift_types.cppm +++ b/src/network/infinity_thrift_types.cppm @@ -57,6 +57,8 @@ export using infinity_thrift_rpc::OptimizeRequest; export using infinity_thrift_rpc::AddColumnsRequest; export using infinity_thrift_rpc::DropColumnsRequest; export using infinity_thrift_rpc::ShowCurrentNodeRequest; +export using infinity_thrift_rpc::CommandRequest; +export using infinity_thrift_rpc::FlushRequest; export using infinity_thrift_rpc::CommonResponse; export using infinity_thrift_rpc::DeleteResponse; export using infinity_thrift_rpc::SelectResponse; diff --git a/src/parser/statement/command_statement.cpp b/src/parser/statement/command_statement.cpp index ead1711303..74e758383b 100644 --- a/src/parser/statement/command_statement.cpp +++ b/src/parser/statement/command_statement.cpp @@ -24,5 +24,6 @@ std::string CommandStatement::ToString() const { return command_info_->ToString( std::string LockCmd::ToString() const { return "Lock Command"; } std::string UnlockCmd::ToString() const { return "Unlock Command"; } std::string CleanupCmd::ToString() const { return "Cleanup Command"; } +std::string TestCmd::ToString() const { return "Test Command: " + command_content_; } } // namespace infinity \ No newline at end of file diff --git a/src/parser/statement/command_statement.cppm b/src/parser/statement/command_statement.cppm index bf65797c6b..366d8efd0f 100644 --- a/src/parser/statement/command_statement.cppm +++ b/src/parser/statement/command_statement.cppm @@ -33,4 +33,5 @@ export using infinity::ExportCmd; export using infinity::LockCmd; export using infinity::UnlockCmd; export using infinity::CleanupCmd; +export using infinity::TestCmd; } diff --git a/src/parser/statement/command_statement.h b/src/parser/statement/command_statement.h index 49fa8893a4..4c8d629532 100644 --- a/src/parser/statement/command_statement.h +++ b/src/parser/statement/command_statement.h @@ -30,7 +30,8 @@ enum class CommandType { kCheckTable, kLockTable, kUnlockTable, - kCleanup + kCleanup, + kTestCommand, }; class CommandInfo { @@ -237,4 +238,16 @@ class CommandStatement final : public BaseStatement { [[nodiscard]] std::string ToString() const final; }; +class TestCmd final : public CommandInfo { +public: + TestCmd(std::string command_content) : CommandInfo(CommandType::kTestCommand), command_content_(command_content) {} + + [[nodiscard]] std::string ToString() const final; + + const std::string &command_content() { return command_content_; } + +private: + std::string command_content_{}; +}; + } // namespace infinity diff --git a/src/parser/statement/flush_statement.h b/src/parser/statement/flush_statement.h index f3f7251908..5ee4b3e61b 100644 --- a/src/parser/statement/flush_statement.h +++ b/src/parser/statement/flush_statement.h @@ -26,6 +26,7 @@ namespace infinity { enum class FlushType { + kDelta, kData, kLog, kBuffer, diff --git a/src/planner/explain_ast.cpp b/src/planner/explain_ast.cpp index f558f42ed2..949f3d0aac 100644 --- a/src/planner/explain_ast.cpp +++ b/src/planner/explain_ast.cpp @@ -767,6 +767,9 @@ Status ExplainAST::BuildShow(const ShowStatement *show_statement, SharedPtr>> &result, i64) { switch (flush_statement->type_) { + case FlushType::kDelta: + result->emplace_back(MakeShared("FLUSH DELTA")); + break; case FlushType::kData: result->emplace_back(MakeShared("FLUSH DATA")); break; diff --git a/src/planner/explain_logical_plan.cpp b/src/planner/explain_logical_plan.cpp index c6d35e3d6a..b9ba519a3b 100644 --- a/src/planner/explain_logical_plan.cpp +++ b/src/planner/explain_logical_plan.cpp @@ -2315,6 +2315,11 @@ Status ExplainLogicalPlan::Explain(const LogicalFlush *flush_node, SharedPtrflush_type()) { + case FlushType::kDelta: + flush_header_str += "DELTA ("; + flush_header_str += std::to_string(flush_node->node_id()); + flush_header_str += ")"; + break; case FlushType::kData: flush_header_str += "DATA ("; flush_header_str += std::to_string(flush_node->node_id()); diff --git a/src/planner/logical_planner.cpp b/src/planner/logical_planner.cpp index eb6eedcd6a..8f4d30f347 100644 --- a/src/planner/logical_planner.cpp +++ b/src/planner/logical_planner.cpp @@ -1315,6 +1315,11 @@ Status LogicalPlanner::BuildCommand(const CommandStatement *command_statement, S this->logical_plan_ = logical_command; break; } + case CommandType::kTestCommand: { + auto logical_command = MakeShared(bind_context_ptr->GetNewLogicalNodeId(), command_statement->command_info_); + this->logical_plan_ = logical_command; + break; + } default: { String error_message = "Invalid command type."; UnrecoverableError(error_message); @@ -1706,6 +1711,9 @@ Status LogicalPlanner::BuildShow(ShowStatement *statement, SharedPtr &bind_context_ptr) { switch (statement->type()) { + case FlushType::kDelta: { + return BuildFlushDelta(statement, bind_context_ptr); + } case FlushType::kData: { return BuildFlushData(statement, bind_context_ptr); } @@ -1719,6 +1727,12 @@ Status LogicalPlanner::BuildFlush(const FlushStatement *statement, SharedPtr &bind_context_ptr) { + SharedPtr logical_flush = MakeShared(bind_context_ptr->GetNewLogicalNodeId(), FlushType::kDelta); + this->logical_plan_ = logical_flush; + return Status::OK(); +} + Status LogicalPlanner::BuildFlushData(const FlushStatement *, SharedPtr &bind_context_ptr) { SharedPtr logical_flush = MakeShared(bind_context_ptr->GetNewLogicalNodeId(), FlushType::kData); this->logical_plan_ = logical_flush; diff --git a/src/planner/logical_planner.cppm b/src/planner/logical_planner.cppm index b0b156cf03..0b3037f2a6 100644 --- a/src/planner/logical_planner.cppm +++ b/src/planner/logical_planner.cppm @@ -128,6 +128,8 @@ public: // Flush Status BuildFlush(const FlushStatement *statement, SharedPtr &bind_context_ptr); + Status BuildFlushDelta(const FlushStatement *, SharedPtr &bind_context_ptr); + Status BuildFlushData(const FlushStatement *statement, SharedPtr &bind_context_ptr); Status BuildFlushLog(const FlushStatement *statement, SharedPtr &bind_context_ptr); diff --git a/src/planner/node/logical_command.cpp b/src/planner/node/logical_command.cpp index 160059a64c..d66a8c8263 100644 --- a/src/planner/node/logical_command.cpp +++ b/src/planner/node/logical_command.cpp @@ -118,6 +118,11 @@ String LogicalCommand::ToString(i64 &space) const { ss << String(space, ' ') << arrow_str << "Cleanup"; break; } + case CommandType::kTestCommand: { + auto *test_command_info = static_cast(command_info_.get()); + ss << String(space, ' ') << arrow_str << "Test command: " << test_command_info->command_content(); + break; + } case CommandType::kInvalid: { String error_message = "Invalid command type."; UnrecoverableError(error_message); diff --git a/src/planner/node/logical_flush.cpp b/src/planner/node/logical_flush.cpp index 3a50cd2109..e4876a0d2f 100644 --- a/src/planner/node/logical_flush.cpp +++ b/src/planner/node/logical_flush.cpp @@ -40,6 +40,10 @@ String LogicalFlush::ToString(i64 &space) const { } switch (flush_type_) { + case FlushType::kDelta: + ss << String(space, ' ') << "-> " + << "Flush Delta: "; + break; case FlushType::kData: ss << String(space, ' ') << "-> " << "Flush Data: "; diff --git a/src/storage/bg_task/bg_task.cpp b/src/storage/bg_task/bg_task.cpp index d8e59d21d0..78ea4f4ff8 100644 --- a/src/storage/bg_task/bg_task.cpp +++ b/src/storage/bg_task/bg_task.cpp @@ -44,4 +44,6 @@ DumpIndexBylineTask::DumpIndexBylineTask(SharedPtr db_name, : BGTask(BGTaskType::kDumpIndexByline, true), db_name_(db_name), table_name_(table_name), index_name_(index_name), segment_id_(segment_id), dumped_chunk_(dumped_chunk) {} +TestCommandTask::TestCommandTask(String command_content) : BGTask(BGTaskType::kTestCommand, true), command_content_(std::move(command_content)) {} + } // namespace infinity \ No newline at end of file diff --git a/src/storage/bg_task/bg_task.cppm b/src/storage/bg_task/bg_task.cppm index d1cbe51b0c..b9eb8e568e 100644 --- a/src/storage/bg_task/bg_task.cppm +++ b/src/storage/bg_task/bg_task.cppm @@ -37,6 +37,7 @@ export enum class BGTaskType { kUpdateSegmentBloomFilterData, // Not used kDumpIndex, kDumpIndexByline, + kTestCommand, kInvalid }; @@ -208,4 +209,16 @@ public: SharedPtr dumped_chunk_; }; +export class TestCommandTask final : public BGTask { +public: + TestCommandTask(String command_content); + + ~TestCommandTask() override = default; + + String ToString() const override { return "TestCommandTask"; } + +public: + String command_content_{}; +}; + } // namespace infinity diff --git a/src/storage/compaction_process.cpp b/src/storage/compaction_process.cpp index 8d3892457c..11b91ccffe 100644 --- a/src/storage/compaction_process.cpp +++ b/src/storage/compaction_process.cpp @@ -213,9 +213,6 @@ void CompactionProcessor::DoDumpByline(DumpIndexBylineTask *dump_task) { if (dumped_chunk->deprecate_ts_ != UNCOMMIT_TS) { RecoverableError(Status::TxnRollback(txn->TxnID(), fmt::format("Dumped chunk {} is deleted.", dumped_chunk->encode()))); } - auto *table_entry = table_index_entry->table_index_meta()->GetTableEntry(); - TxnTableStore *txn_table_store = txn->GetTxnTableStore(table_entry); - txn_table_store->AddChunkIndexStore(table_index_entry, dumped_chunk); SharedPtr segment_index_entry; bool created = table_index_entry->GetOrCreateSegment(dump_task->segment_id_, txn, segment_index_entry); @@ -287,6 +284,12 @@ void CompactionProcessor::Process() { UnrecoverableError("Uninitialized storage mode"); } if (storage_mode == StorageMode::kWritable) { + if (auto cmd = test_commander_.Check(BGTaskType::kDumpIndexByline)) { + if (cmd.value() == "stuck for 3 seconds") { + LOG_INFO("Compact process stuck for 3 seconds"); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + } auto dump_task = static_cast(bg_task.get()); LOG_DEBUG(dump_task->ToString()); // Trigger transaction to save the mem index diff --git a/src/storage/compaction_process.cppm b/src/storage/compaction_process.cppm index 550725fad6..37ba07ada0 100644 --- a/src/storage/compaction_process.cppm +++ b/src/storage/compaction_process.cppm @@ -28,6 +28,29 @@ class Catalog; class TxnManager; class SessionManager; +class TestCommander { +public: + void Add(BGTaskType type, const String &command) { + std::lock_guard lock(mtx_); + test_commands_[type] = command; + } + + Optional Check(BGTaskType type) { + std::lock_guard lock(mtx_); + auto iter = test_commands_.find(type); + if (iter != test_commands_.end()) { + String res = iter->second; + test_commands_.erase(iter); + return res; + } + return None; + } + +private: + HashMap test_commands_; + std::mutex mtx_; +}; + export class CompactionProcessor { public: CompactionProcessor(Catalog *catalog, TxnManager *txn_mgr); @@ -48,6 +71,8 @@ public: bool rollback, Optional> mid_func = None); // false unit test + void AddTestCommand(BGTaskType type, const String &command) { test_commander_.Add(type, command); } + private: Vector, Txn *>> ScanForCompact(Txn *scan_txn); @@ -69,6 +94,8 @@ private: SessionManager *session_mgr_{}; Atomic task_count_{}; + + TestCommander test_commander_; }; } // namespace infinity \ No newline at end of file diff --git a/src/storage/meta/entry/segment_index_entry.cpp b/src/storage/meta/entry/segment_index_entry.cpp index a72e9db1f3..c1ed1ffe6a 100644 --- a/src/storage/meta/entry/segment_index_entry.cpp +++ b/src/storage/meta/entry/segment_index_entry.cpp @@ -1057,7 +1057,9 @@ SharedPtr SegmentIndexEntry::AddChunkIndexEntryReplayWal(ChunkI return entry->chunk_id_ < id; }); if (iter != chunk_index_entries_.end() && (*iter)->chunk_id_ == chunk_id) { - UnrecoverableError(fmt::format("Chunk ID: {} already exists in segment: {}", chunk_id, segment_id_)); + // Dump wal is after delta catalog, which is possible + LOG_WARN(fmt::format("Chunk ID: {} already exists in segment: {}", chunk_id, segment_id_)); + return *iter; } chunk_index_entries_.insert(iter, chunk_index_entry); ChunkID old_next_chunk_id = next_chunk_id_; diff --git a/src/storage/meta/entry/table_entry.cpp b/src/storage/meta/entry/table_entry.cpp index 4cd4ad3119..b76705abb6 100644 --- a/src/storage/meta/entry/table_entry.cpp +++ b/src/storage/meta/entry/table_entry.cpp @@ -774,6 +774,9 @@ void TableEntry::MemIndexInsertInner(TableIndexEntry *table_index_entry, Txn *tx if (chunk_index_entry.get() != nullptr) { chunk_index_entry->Commit(txn->CommitTS()); + TxnTableStore *txn_table_store = txn->GetTxnTableStore(this); + txn_table_store->AddChunkIndexStore(table_index_entry, chunk_index_entry.get()); + auto *compaction_process = InfinityContext::instance().storage()->compaction_processor(); compaction_process->Submit( diff --git a/test/data/config/restart_test/test_memidx/5.toml b/test/data/config/restart_test/test_memidx/5.toml new file mode 100644 index 0000000000..5332da93fe --- /dev/null +++ b/test/data/config/restart_test/test_memidx/5.toml @@ -0,0 +1,23 @@ +[general] +version = "0.5.0" +time_zone = "utc-8" + +[network] +[log] +log_to_stdout = true +log_level = "trace" + +[storage] +persistence_dir = "" +optimize_interval = "0s" +cleanup_interval = "0s" +compact_interval = "0s" +mem_index_capacity = 8192 + +[buffer] +memindex_memory_quota = "100MB" # big enough to hold test data +[wal] +delta_checkpoint_interval = "0s" +full_checkpoint_interval = "0s" + +[resource] diff --git a/thrift/infinity.thrift b/thrift/infinity.thrift index eb29d8bbd8..3b49c54754 100644 --- a/thrift/infinity.thrift +++ b/thrift/infinity.thrift @@ -733,6 +733,17 @@ struct ShowCurrentNodeResponse { 3: string node_role } +struct CommandRequest { +1: i64 session_id +2: string command_type, +3: string test_command_content +} + +struct FlushRequest { +1: i64 session_id +2: string flush_type, +} + // Service service InfinityService { CommonResponse Connect(1:ConnectRequest request), @@ -782,4 +793,8 @@ CommonResponse DropColumns(1:DropColumnsRequest request), CommonResponse Cleanup(1:CommonRequest request), +CommonResponse Command(1: CommandRequest request), + +CommonResponse Flush(1: FlushRequest request), + }