From 638624917892d6ac6e690696a0ac64f8929a4e7c Mon Sep 17 00:00:00 2001 From: vsian Date: Sun, 29 Sep 2024 09:47:48 +0800 Subject: [PATCH] Time type for pysdk and HTTP (#1927) ### What problem does this PR solve? Add create table, insert, select support for date, time, datetime type in infinity_sdk, infinity_embedded, and HTTP. ### Type of change - [x] New Feature (non-breaking change which adds functionality) - [x] Test cases - [x] Python SDK impacted, Need to update PyPI --- .../infinity_embedded/local_infinity/types.py | 56 +++++++++++++ .../infinity_embedded/local_infinity/utils.py | 18 +++- python/infinity_http.py | 19 ++++- .../infinity_thrift_rpc/ttypes.py | 57 +++++++++++-- .../infinity/remote_thrift/types.py | 54 ++++++++++++ .../infinity/remote_thrift/utils.py | 18 ++++ python/test_pysdk/common/common_data.py | 16 ++++ python/test_pysdk/test_select.py | 40 +++++++++ src/embedded_infinity/wrap_infinity.cpp | 43 +++++++++- src/function/cast/varchar_cast.cppm | 48 ++++++++++- .../infinity_thrift/infinity_types.cpp | 40 +++++++-- src/network/infinity_thrift/infinity_types.h | 21 ++++- src/network/infinity_thrift_service.cpp | 84 +++++++++++++++++++ src/network/infinity_thrift_service.cppm | 3 + thrift/infinity.thrift | 15 ++++ 15 files changed, 512 insertions(+), 20 deletions(-) diff --git a/python/infinity_embedded/local_infinity/types.py b/python/infinity_embedded/local_infinity/types.py index e07b0ae0b2..43886fd934 100644 --- a/python/infinity_embedded/local_infinity/types.py +++ b/python/infinity_embedded/local_infinity/types.py @@ -21,6 +21,7 @@ from infinity_embedded.common import VEC, SparseVector, InfinityException, DEFAULT_MATCH_VECTOR_TOPN from infinity_embedded.embedded_infinity_ext import * from infinity_embedded.errors import ErrorCode +from datetime import date, time, datetime, timedelta def logic_type_to_dtype(ttype: WrapDataType): match ttype.logical_type: @@ -54,6 +55,16 @@ def logic_type_to_dtype(ttype: WrapDataType): return object case 
LogicalType.kSparse: return object + case LogicalType.kDate: + return object + case LogicalType.kTime: + return object + case LogicalType.kDateTime: + return object + case LogicalType.kInterval: + return object + case LogicalType.kTimestamp: + return object case _: raise NotImplementedError(f"Unsupported type {ttype}") @@ -222,9 +233,54 @@ def column_vector_to_list(column_type, column_data_type, column_vectors) -> \ return parse_tensor_bytes(column_data_type, column_vector) case LogicalType.kTensorArray: return parse_tensorarray_bytes(column_data_type, column_vector) + case LogicalType.kDate: + return parse_date_bytes(column_vector) + case LogicalType.kTime: + return parse_time_bytes(column_vector) + case LogicalType.kDateTime: + return parse_datetime_bytes(column_vector) + case LogicalType.kTimestamp: + return parse_datetime_bytes(column_vector) + case LogicalType.kInterval: + return parse_interval_bytes(column_vector) + case _: raise NotImplementedError(f"Unsupported type {column_type}") +def parse_date_bytes(column_vector): + parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) + date_list = [] + epoch = date(1970, 1, 1) + for value in parsed_list: + date_list.append(epoch + timedelta(days = value)) + return date_list + +def parse_time_bytes(column_vector): + parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) + time_list = [] + for value in parsed_list: + hours = (value // 3600) % 24 + minutes = (value % 3600) // 60 + seconds = value % 60 + time_list.append(time(hour=hours, minute=minutes, second=seconds)) + return time_list + +def parse_datetime_bytes(column_vector): + parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) + datetime_list = [] + epoch = datetime(1970, 1, 1) + for i in range(0, len(parsed_list), 2): + if i + 1 < len(parsed_list): + datetime_list.append(epoch + timedelta(days = parsed_list[i], seconds = parsed_list[i + 1])); + return 
datetime_list + +def parse_interval_bytes(column_vector): + parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) + interval_list = [] + for value in parsed_list: + interval_list.append(timedelta(seconds=value)) + return interval_list + def parse_sparse_bytes(column_data_type, column_vector): dimension = column_data_type.sparse_type.dimension element_type = column_data_type.sparse_type.element_type diff --git a/python/infinity_embedded/local_infinity/utils.py b/python/infinity_embedded/local_infinity/utils.py index 3a2c6010a4..b1e6848f2a 100644 --- a/python/infinity_embedded/local_infinity/utils.py +++ b/python/infinity_embedded/local_infinity/utils.py @@ -26,7 +26,7 @@ from infinity_embedded.utils import binary_exp_to_paser_exp from infinity_embedded.embedded_infinity_ext import WrapInExpr, WrapParsedExpr, WrapFunctionExpr, WrapColumnExpr, WrapConstantExpr, ParsedExprType, LiteralType from infinity_embedded.embedded_infinity_ext import WrapEmbeddingType, WrapColumnDef, WrapDataType, LogicalType, EmbeddingDataType, WrapSparseType, ConstraintType - +from datetime import date, time, datetime, timedelta def traverse_conditions(cons, fn=None): if isinstance(cons, exp.Binary): @@ -294,6 +294,16 @@ def get_local_constant_expr_from_python_value(value) -> WrapConstantExpr: case _: raise InfinityException(ErrorCode.INVALID_EXPRESSION, f"Invalid sparse vector value type: {type(next(iter(value.values())))}") + case datetime(): + constant_expression.literal_type = LiteralType.kDateTime + constant_expression.str_value = value.strftime("%Y-%m-%d %H:%M:%S") + case date(): + constant_expression.literal_type = LiteralType.kDate + constant_expression.str_value = value.strftime("%Y-%m-%d") + case time(): + constant_expression.literal_type = LiteralType.kTime + constant_expression.str_value = value.strftime("%H:%M:%S") + case _: raise InfinityException(ErrorCode.INVALID_EXPRESSION, f"Invalid constant type: {type(value)}") return constant_expression @@
-536,6 +546,12 @@ def get_ordinary_info(column_info, column_defs, column_name, index): proto_column_type.logical_type = LogicalType.kVarchar case "bool": proto_column_type.logical_type = LogicalType.kBoolean + case "date": + proto_column_type.logical_type = LogicalType.kDate + case "time": + proto_column_type.logical_type = LogicalType.kTime + case "datetime": + proto_column_type.logical_type = LogicalType.kDateTime case _: raise InfinityException(ErrorCode.INVALID_DATA_TYPE, f"Unknown datatype: {datatype}") proto_column_def.column_type = proto_column_type diff --git a/python/infinity_http.py b/python/infinity_http.py index d3696d3aca..17ed3af43f 100644 --- a/python/infinity_http.py +++ b/python/infinity_http.py @@ -15,6 +15,7 @@ import polars as pl import pyarrow as pa from infinity.table import ExplainType +from datetime import date, time, datetime class infinity_http: @@ -403,6 +404,12 @@ def insert(self,values=[]): value[key][idx] = value[key][idx].tolist() if isinstance(value[key], SparseVector): value[key] = value[key].to_dict() + if isinstance(value[key], datetime): + value[key] = value[key].strftime("%Y-%m-%d %H:%M:%S") + if isinstance(value[key], date): + value[key] = value[key].strftime("%Y-%m-%d") + if isinstance(value[key], time): + value[key] = value[key].strftime("%H:%M:%S") url = f"databases/{self.database_name}/tables/{self.table_name}/docs" h = self.set_up_header(["accept", "content-type"]) @@ -634,16 +641,26 @@ def to_df(self): for res in self.output_res: for k in res: + print(res[k]) if k not in df_dict: df_dict[k] = () tup = df_dict[k] if res[k].isdigit() or is_float(res[k]): new_tup = tup + (eval(res[k]), ) + elif is_time(res[k]): + new_tup = tup + (datetime.strptime(res[k], "%H:%M:%S").time(), ) + print("time!") + elif is_datetime(res[k]): + new_tup = tup + (datetime.strptime(res[k], "%Y-%m-%d %H:%M:%S"), ) + print("datetime!") + elif is_date(res[k]): + new_tup = tup + (datetime.strptime(res[k], "%Y-%m-%d").date(), ) + print("date!") elif 
is_list(res[k]): new_tup = tup + (ast.literal_eval(res[k]), ) elif is_sparse(res[k]):# sparse vector sparse_vec = str2sparse(res[k]) - new_tup = tup + (sparse_vec, ) + new_tup = tup + (sparse_vec, ) else: if res[k].lower() == 'true': res[k] = True diff --git a/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/ttypes.py b/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/ttypes.py index f1762a5ee7..3c12bdf524 100644 --- a/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/ttypes.py +++ b/python/infinity_sdk/infinity/remote_thrift/infinity_thrift_rpc/ttypes.py @@ -34,7 +34,12 @@ class LogicType(object): TensorArray = 14 Sparse = 15 MultiVector = 16 - Invalid = 17 + Date = 17 + Time = 18 + DateTime = 19 + Timestamp = 20 + Interval = 21 + Invalid = 22 _VALUES_TO_NAMES = { 0: "Boolean", @@ -54,7 +59,12 @@ class LogicType(object): 14: "TensorArray", 15: "Sparse", 16: "MultiVector", - 17: "Invalid", + 17: "Date", + 18: "Time", + 19: "DateTime", + 20: "Timestamp", + 21: "Interval", + 22: "Invalid", } _NAMES_TO_VALUES = { @@ -75,7 +85,12 @@ class LogicType(object): "TensorArray": 14, "Sparse": 15, "MultiVector": 16, - "Invalid": 17, + "Date": 17, + "Time": 18, + "DateTime": 19, + "Timestamp": 20, + "Interval": 21, + "Invalid": 22, } @@ -186,6 +201,11 @@ class LiteralType(object): DoubleTensorArray = 10 SparseIntegerArray = 11 SparseDoubleArray = 12 + Date = 13 + Time = 14 + Inteval = 15 + DateTime = 16 + Timestamp = 17 _VALUES_TO_NAMES = { 0: "Boolean", @@ -201,6 +221,11 @@ class LiteralType(object): 10: "DoubleTensorArray", 11: "SparseIntegerArray", 12: "SparseDoubleArray", + 13: "Date", + 14: "Time", + 15: "Inteval", + 16: "DateTime", + 17: "Timestamp", } _NAMES_TO_VALUES = { @@ -217,6 +242,11 @@ class LiteralType(object): "DoubleTensorArray": 10, "SparseIntegerArray": 11, "SparseDoubleArray": 12, + "Date": 13, + "Time": 14, + "Inteval": 15, + "DateTime": 16, + "Timestamp": 17, } @@ -285,7 +315,12 @@ class ColumnType(object): 
ColumnSparse = 13 ColumnMultiVector = 14 ColumnRowID = 15 - ColumnInvalid = 16 + ColumnDate = 16 + ColumnTime = 17 + ColumnDateTime = 18 + ColumnTimestamp = 19 + ColumnInterval = 20 + ColumnInvalid = 21 _VALUES_TO_NAMES = { 0: "ColumnBool", @@ -304,7 +339,12 @@ class ColumnType(object): 13: "ColumnSparse", 14: "ColumnMultiVector", 15: "ColumnRowID", - 16: "ColumnInvalid", + 16: "ColumnDate", + 17: "ColumnTime", + 18: "ColumnDateTime", + 19: "ColumnTimestamp", + 20: "ColumnInterval", + 21: "ColumnInvalid", } _NAMES_TO_VALUES = { @@ -324,7 +364,12 @@ class ColumnType(object): "ColumnSparse": 13, "ColumnMultiVector": 14, "ColumnRowID": 15, - "ColumnInvalid": 16, + "ColumnDate": 16, + "ColumnTime": 17, + "ColumnDateTime": 18, + "ColumnTimestamp": 19, + "ColumnInterval": 20, + "ColumnInvalid": 21, } diff --git a/python/infinity_sdk/infinity/remote_thrift/types.py b/python/infinity_sdk/infinity/remote_thrift/types.py index 1a75a951b5..132a1cd273 100644 --- a/python/infinity_sdk/infinity/remote_thrift/types.py +++ b/python/infinity_sdk/infinity/remote_thrift/types.py @@ -18,6 +18,7 @@ from infinity.remote_thrift.infinity_thrift_rpc.ttypes import * from collections import defaultdict from typing import Any, Tuple, Dict, List, Optional +from datetime import date, time, datetime, timedelta import polars as pl from numpy import dtype @@ -58,6 +59,16 @@ def logic_type_to_dtype(ttype: ttypes.DataType): return object case ttypes.LogicType.Sparse: return object + case ttypes.LogicType.Date: + return object + case ttypes.LogicType.Time: + return object + case ttypes.LogicType.DateTime: + return object + case ttypes.LogicType.Interval: + return object + case ttypes.LogicType.Timestamp: + return object case _: raise NotImplementedError(f"Unsupported type {ttype}") @@ -149,9 +160,52 @@ def column_vector_to_list(column_type: ttypes.ColumnType, column_data_type: ttyp return parse_tensorarray_bytes(column_data_type, column_vector) case ttypes.ColumnType.ColumnSparse: return 
parse_sparse_bytes(column_data_type, column_vector) + case ttypes.ColumnType.ColumnDate: + return parse_date_bytes(column_vector) + case ttypes.ColumnType.ColumnTime: + return parse_time_bytes(column_vector) + case ttypes.ColumnType.ColumnDateTime: + return parse_datetime_bytes(column_vector) + case ttypes.ColumnType.ColumnTimestamp: + return parse_datetime_bytes(column_vector) + case ttypes.ColumnType.ColumnInterval: + return parse_interval_bytes(column_vector) case _: raise NotImplementedError(f"Unsupported type {column_type}") +def parse_date_bytes(column_vector): + parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) + date_list = [] + epoch = date(1970, 1, 1) + for value in parsed_list: + date_list.append(epoch + timedelta(days = value)) + return date_list + +def parse_time_bytes(column_vector): + parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) + time_list = [] + for value in parsed_list: + hours = (value // 3600) % 24 + minutes = (value % 3600) // 60 + seconds = value % 60 + time_list.append(time(hour=hours, minute=minutes, second=seconds)) + return time_list + +def parse_datetime_bytes(column_vector): + parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) + datetime_list = [] + epoch = datetime(1970, 1, 1) + for i in range(0, len(parsed_list), 2): + if i + 1 < len(parsed_list): + datetime_list.append(epoch + timedelta(days = parsed_list[i], seconds = parsed_list[i + 1])); + return datetime_list + +def parse_interval_bytes(column_vector): + parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) + interval_list = [] + for value in parsed_list: + interval_list.append(timedelta(seconds=value)) + return interval_list def parse_bytes(bytes_data): results = [] diff --git a/python/infinity_sdk/infinity/remote_thrift/utils.py b/python/infinity_sdk/infinity/remote_thrift/utils.py index 702dcac9b4..e4bf91177e 100644 ---
a/python/infinity_sdk/infinity/remote_thrift/utils.py +++ b/python/infinity_sdk/infinity/remote_thrift/utils.py @@ -25,6 +25,7 @@ from infinity.utils import binary_exp_to_paser_exp from infinity.common import InfinityException, SparseVector from infinity.errors import ErrorCode +from datetime import date, time, datetime, timedelta def traverse_conditions(cons, fn=None) -> ttypes.ParsedExpr: @@ -315,6 +316,13 @@ def get_remote_constant_expr_from_python_value(value) -> ttypes.ConstantExpr: case _: raise InfinityException(ErrorCode.INVALID_EXPRESSION, f"Invalid sparse vector value type: {type(next(iter(value.values())))}") + case datetime(): + constant_expression = ttypes.ConstantExpr(literal_type=ttypes.LiteralType.DateTime, str_value=value.strftime("%Y-%m-%d %H:%M:%S")) + case date(): + constant_expression = ttypes.ConstantExpr(literal_type=ttypes.LiteralType.Date, str_value=value.strftime("%Y-%m-%d")) + case time(): + constant_expression = ttypes.ConstantExpr(literal_type=ttypes.LiteralType.Time, str_value=value.strftime("%H:%M:%S")) + case _: raise InfinityException(ErrorCode.INVALID_EXPRESSION, f"Invalid constant type: {type(value)}") return constant_expression @@ -549,6 +557,16 @@ def get_ordinary_info(column_info, column_defs, column_name, index): proto_column_type.physical_type = ttypes.VarcharType() case "bool": proto_column_type.logic_type = ttypes.LogicType.Boolean + case "date": + proto_column_type.logic_type = ttypes.LogicType.Date + case "time": + proto_column_type.logic_type = ttypes.LogicType.Time + case "datetime": + proto_column_type.logic_type = ttypes.LogicType.DateTime + case "timestamp": + proto_column_type.logic_type = ttypes.LogicType.Timestamp + case "interval": + proto_column_type.logic_type = ttypes.LogicType.Interval case _: raise InfinityException(ErrorCode.INVALID_DATA_TYPE, f"Unknown datatype: {datatype}") proto_column_def.data_type = proto_column_type diff --git a/python/test_pysdk/common/common_data.py 
b/python/test_pysdk/common/common_data.py index 44205b3140..727d161288 100644 --- a/python/test_pysdk/common/common_data.py +++ b/python/test_pysdk/common/common_data.py @@ -16,6 +16,7 @@ from enum import Enum from infinity.table import ExplainType from . import common_index +import re default_url = "http://localhost:23820/" @@ -155,6 +156,21 @@ def str2sparse(str_input): return sparce_vec +def is_date(str_input): + if re.match(r"^\b\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])\b$", str_input): + return True + return False + +def is_time(str_input): + if re.match(r"^([01]\d|2[0-3]):[0-5]\d:[0-5]\d$", str_input): + return True + return False + +def is_datetime(str_input): + if re.match(r"^\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01]) ([01]\d|2[0-3]):[0-5]\d:[0-5]\d$", str_input): + return True + return False + index_type_transfrom = { 1:"IVFFLAT", 2:"HNSW", diff --git a/python/test_pysdk/test_select.py b/python/test_pysdk/test_select.py index fb8f839889..76bdc928af 100644 --- a/python/test_pysdk/test_select.py +++ b/python/test_pysdk/test_select.py @@ -17,6 +17,7 @@ sys.path.insert(0, parent_dir) from infinity_http import infinity_http from common.utils import copy_data +from datetime import date, time, datetime @pytest.fixture(scope="class") def local_infinity(request): @@ -216,6 +217,45 @@ def test_select(self, suffix): res = db_obj.drop_table("test_select"+suffix, ConflictType.Error) assert res.error_code == ErrorCode.OK + def test_select_datetime(self, suffix): + """ + target: test table select apis + methods: + 1. create tables + - 'test_select_datetime' + - c1 date + - c2 time + - c3 datetime + 2. insert + - ('2024-09-23', '20:45:11', '2024-09-23 20:45:11') + 3. 
select + - select * from test_select_datetime + """ + + db_obj = self.infinity_obj.get_database("default_db") + db_obj.drop_table("test_select_datetime"+suffix, ConflictType.Ignore) + table_obj = db_obj.create_table( + "test_select_datetime"+suffix, { + "c1": {"type": "date"}, + "c2": {"type": "time"}, + "c3" : {"type": "datetime"}}, ConflictType.Error) + + assert table_obj is not None + + d = date(2024, 9, 23) + t = time(20, 45, 11) + dt = datetime(2024, 9, 23, 20, 45, 11) + res = table_obj.insert( + {"c1" : d, "c2" : t, "c3" : dt} + ) + assert res.error_code == ErrorCode.OK + + res = table_obj.output(["*"]).to_pl() + assert res.item(0, 0) == d and res.item(0, 1) == t and res.item(0, 2) == dt + + res = db_obj.drop_table("test_select_datetime"+suffix, ConflictType.Ignore) + assert res.error_code == ErrorCode.OK + def test_select_aggregate(self, suffix): """ target: test table select apis diff --git a/src/embedded_infinity/wrap_infinity.cpp b/src/embedded_infinity/wrap_infinity.cpp index c911cf1fd1..074b03d441 100644 --- a/src/embedded_infinity/wrap_infinity.cpp +++ b/src/embedded_infinity/wrap_infinity.cpp @@ -151,6 +151,26 @@ ParsedExpr *WrapConstantExpr::GetParsedExpr(Status &status) { } break; } + case LiteralType::kDate: { + constant_expr->date_value_ = strdup(str_value.c_str()); + break; + } + case LiteralType::kTime: { + constant_expr->date_value_ = strdup(str_value.c_str()); + break; + } + case LiteralType::kDateTime: { + constant_expr->date_value_ = strdup(str_value.c_str()); + break; + } + case LiteralType::kTimestamp: { + constant_expr->date_value_ = strdup(str_value.c_str()); + break; + } + case LiteralType::kInterval: { + constant_expr->date_value_ = strdup(str_value.c_str()); + break; + } default: { status = Status::InvalidConstantType(); return nullptr; @@ -1076,6 +1096,14 @@ void HandleRowIDType(ColumnField &output_column_field, SizeT row_count, const Sh output_column_field.column_type = column_vector->data_type()->type(); } +void 
HandleTimeRelatedTypes(ColumnField &output_column_field, SizeT row_count, const SharedPtr &column_vector) { + auto size = column_vector->data_type()->Size() * row_count; + String dst; + dst.resize(size); + std::memcpy(dst.data(), column_vector->data(), size); + output_column_field.column_vectors.emplace_back(dst.c_str(), dst.size()); +} + void ProcessColumnFieldType(ColumnField &output_column_field, SizeT row_count, const SharedPtr &column_vector) { switch (column_vector->data_type()->type()) { case LogicalType::kBoolean: { @@ -1122,6 +1150,14 @@ void ProcessColumnFieldType(ColumnField &output_column_field, SizeT row_count, c HandleRowIDType(output_column_field, row_count, column_vector); break; } + case LogicalType::kDate: + case LogicalType::kTime: + case LogicalType::kDateTime: + case LogicalType::kInterval: + case LogicalType::kTimestamp: { + HandleTimeRelatedTypes(output_column_field, row_count, column_vector); + break; + } default: { throw UnrecoverableException("Unsupported column type"); } @@ -1148,7 +1184,12 @@ void DataTypeToWrapDataType(WrapDataType &proto_data_type, const SharedPtrtype(); break; } diff --git a/src/function/cast/varchar_cast.cppm b/src/function/cast/varchar_cast.cppm index 37ac295e79..6e8d92c86b 100644 --- a/src/function/cast/varchar_cast.cppm +++ b/src/function/cast/varchar_cast.cppm @@ -82,13 +82,13 @@ export inline BoundCastFunc BindVarcharCast(const DataType &source, const DataTy UnrecoverableError(error_message); } case LogicalType::kDate: { - return BoundCastFunc(&ColumnVectorCast::TryCastColumnVector); + return BoundCastFunc(&ColumnVectorCast::TryCastVarlenColumnVector); } case LogicalType::kTime: { - return BoundCastFunc(&ColumnVectorCast::TryCastColumnVector); + return BoundCastFunc(&ColumnVectorCast::TryCastVarlenColumnVector); } case LogicalType::kDateTime: { - return BoundCastFunc(&ColumnVectorCast::TryCastColumnVector); + return BoundCastFunc(&ColumnVectorCast::TryCastVarlenColumnVector); } case LogicalType::kTimestamp: { 
return BoundCastFunc(&ColumnVectorCast::TryCastColumnVector); @@ -296,6 +296,48 @@ struct TryCastVarcharVector { } }; +// Cast VarcharT to DateT type +template <> +inline bool TryCastVarcharVector::Run(const VarcharT &source, ColumnVector* source_vector, DateT &target) { + Span data = source_vector->GetVarcharInner(source); + // Used in libc++ + String substr(data.data(), data.size()); + try { + target.FromString(substr); + } catch(const std::exception &e) { + return false; + } + return true; +} + +// Cast VarcharT to TimeT type +template <> +inline bool TryCastVarcharVector::Run(const VarcharT &source, ColumnVector* source_vector, TimeT &target) { + Span data = source_vector->GetVarcharInner(source); + // Used in libc++ + String substr(data.data(), data.size()); + try { + target.FromString(substr); + } catch(const std::exception &e) { + return false; + } + return true; +} + +// Cast VarcharT to DateTimeT type +template <> +inline bool TryCastVarcharVector::Run(const VarcharT &source, ColumnVector* source_vector, DateTimeT &target) { + Span data = source_vector->GetVarcharInner(source); + // Used in libc++ + String substr(data.data(), data.size()); + try { + target.FromString(substr); + } catch(const std::exception &e) { + return false; + } + return true; +} + // Cast VarcharT to BigIntT type template <> inline bool TryCastVarcharVector::Run(const VarcharT &source, ColumnVector* source_vector, i64 &target) { diff --git a/src/network/infinity_thrift/infinity_types.cpp b/src/network/infinity_thrift/infinity_types.cpp index 433e2eb18d..c66661d57b 100644 --- a/src/network/infinity_thrift/infinity_types.cpp +++ b/src/network/infinity_thrift/infinity_types.cpp @@ -31,6 +31,11 @@ int _kLogicTypeValues[] = { LogicType::TensorArray, LogicType::Sparse, LogicType::MultiVector, + LogicType::Date, + LogicType::Time, + LogicType::DateTime, + LogicType::Timestamp, + LogicType::Interval, LogicType::Invalid }; const char* _kLogicTypeNames[] = { @@ -51,9 +56,14 @@ const char*
_kLogicTypeNames[] = { "TensorArray", "Sparse", "MultiVector", + "Date", + "Time", + "DateTime", + "Timestamp", + "Interval", "Invalid" }; -const std::map _LogicType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(18, _kLogicTypeValues, _kLogicTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); +const std::map _LogicType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(23, _kLogicTypeValues, _kLogicTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); std::ostream& operator<<(std::ostream& out, const LogicType::type& val) { std::map::const_iterator it = _LogicType_VALUES_TO_NAMES.find(val); @@ -225,7 +235,12 @@ int _kLiteralTypeValues[] = { LiteralType::IntegerTensorArray, LiteralType::DoubleTensorArray, LiteralType::SparseIntegerArray, - LiteralType::SparseDoubleArray + LiteralType::SparseDoubleArray, + LiteralType::Date, + LiteralType::Time, + LiteralType::Inteval, + LiteralType::DateTime, + LiteralType::Timestamp }; const char* _kLiteralTypeNames[] = { "Boolean", @@ -240,9 +255,14 @@ const char* _kLiteralTypeNames[] = { "IntegerTensorArray", "DoubleTensorArray", "SparseIntegerArray", - "SparseDoubleArray" + "SparseDoubleArray", + "Date", + "Time", + "Inteval", + "DateTime", + "Timestamp" }; -const std::map _LiteralType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(13, _kLiteralTypeValues, _kLiteralTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); +const std::map _LiteralType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(18, _kLiteralTypeValues, _kLiteralTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); std::ostream& operator<<(std::ostream& out, const LiteralType::type& val) { std::map::const_iterator it = _LiteralType_VALUES_TO_NAMES.find(val); @@ -350,6 +370,11 @@ int _kColumnTypeValues[] = { ColumnType::ColumnSparse, ColumnType::ColumnMultiVector, ColumnType::ColumnRowID, + ColumnType::ColumnDate, + ColumnType::ColumnTime, + ColumnType::ColumnDateTime, + ColumnType::ColumnTimestamp, + 
ColumnType::ColumnInterval, ColumnType::ColumnInvalid }; const char* _kColumnTypeNames[] = { @@ -369,9 +394,14 @@ const char* _kColumnTypeNames[] = { "ColumnSparse", "ColumnMultiVector", "ColumnRowID", + "ColumnDate", + "ColumnTime", + "ColumnDateTime", + "ColumnTimestamp", + "ColumnInterval", "ColumnInvalid" }; -const std::map _ColumnType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(17, _kColumnTypeValues, _kColumnTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); +const std::map _ColumnType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(22, _kColumnTypeValues, _kColumnTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); std::ostream& operator<<(std::ostream& out, const ColumnType::type& val) { std::map::const_iterator it = _ColumnType_VALUES_TO_NAMES.find(val); diff --git a/src/network/infinity_thrift/infinity_types.h b/src/network/infinity_thrift/infinity_types.h index 47896c4be4..005b2b8a7f 100644 --- a/src/network/infinity_thrift/infinity_types.h +++ b/src/network/infinity_thrift/infinity_types.h @@ -40,7 +40,12 @@ struct LogicType { TensorArray = 14, Sparse = 15, MultiVector = 16, - Invalid = 17 + Date = 17, + Time = 18, + DateTime = 19, + Timestamp = 20, + Interval = 21, + Invalid = 22 }; }; @@ -127,7 +132,12 @@ struct LiteralType { IntegerTensorArray = 9, DoubleTensorArray = 10, SparseIntegerArray = 11, - SparseDoubleArray = 12 + SparseDoubleArray = 12, + Date = 13, + Time = 14, + Inteval = 15, + DateTime = 16, + Timestamp = 17 }; }; @@ -187,7 +197,12 @@ struct ColumnType { ColumnSparse = 13, ColumnMultiVector = 14, ColumnRowID = 15, - ColumnInvalid = 16 + ColumnDate = 16, + ColumnTime = 17, + ColumnDateTime = 18, + ColumnTimestamp = 19, + ColumnInterval = 20, + ColumnInvalid = 21 }; }; diff --git a/src/network/infinity_thrift_service.cpp b/src/network/infinity_thrift_service.cpp index 18d7c3f544..c9e8223805 100644 --- a/src/network/infinity_thrift_service.cpp +++ b/src/network/infinity_thrift_service.cpp @@ -1627,6 
+1627,21 @@ SharedPtr InfinityThriftService::GetColumnTypeFromProto(const infinity auto sparse_info = SparseInfo::Make(embedding_type, index_type, type.physical_type.sparse_type.dimension, SparseStoreType::kSort); return MakeShared(infinity::LogicalType::kSparse, sparse_info); } + case infinity_thrift_rpc::LogicType::Date: { + return MakeShared(infinity::LogicalType::kDate); + } + case infinity_thrift_rpc::LogicType::Time: { + return MakeShared(infinity::LogicalType::kTime); + } + case infinity_thrift_rpc::LogicType::DateTime: { + return MakeShared(infinity::LogicalType::kDateTime); + } + case infinity_thrift_rpc::LogicType::Timestamp: { + return MakeShared(infinity::LogicalType::kTimestamp); + } + case infinity_thrift_rpc::LogicType::Interval: { + return MakeShared(infinity::LogicalType::kInterval); + } case infinity_thrift_rpc::LogicType::Invalid: { return MakeShared(infinity::LogicalType::kInvalid); } @@ -1807,6 +1822,22 @@ ConstantExpr *InfinityThriftService::GetConstantFromProto(Status &status, const } return parsed_expr; } + case infinity_thrift_rpc::LiteralType::Date: { + auto parsed_expr = new ConstantExpr(LiteralType::kDate); + parsed_expr->date_value_ = strdup(expr.str_value.c_str()); + return parsed_expr; + } + case infinity_thrift_rpc::LiteralType::Time: { + auto parsed_expr = new ConstantExpr(LiteralType::kTime); + parsed_expr->date_value_ = strdup(expr.str_value.c_str()); + return parsed_expr; + } + case infinity_thrift_rpc::LiteralType::DateTime: { + auto parsed_expr = new ConstantExpr(LiteralType::kDateTime); + parsed_expr->date_value_ = strdup(expr.str_value.c_str()); + return parsed_expr; + } + default: { status = Status::InvalidConstantType(); return nullptr; @@ -2212,6 +2243,16 @@ infinity_thrift_rpc::ColumnType::type InfinityThriftService::DataTypeToProtoColu return infinity_thrift_rpc::ColumnType::ColumnRowID; case LogicalType::kSparse: return infinity_thrift_rpc::ColumnType::ColumnSparse; + case LogicalType::kDate: + return 
infinity_thrift_rpc::ColumnType::ColumnDate; + case LogicalType::kTime: + return infinity_thrift_rpc::ColumnType::ColumnTime; + case LogicalType::kDateTime: + return infinity_thrift_rpc::ColumnType::ColumnDateTime; + case LogicalType::kTimestamp: + return infinity_thrift_rpc::ColumnType::ColumnTimestamp; + case LogicalType::kInterval: + return infinity_thrift_rpc::ColumnType::ColumnInterval; default: { String error_message = fmt::format("Invalid logical data type: {}", data_type->ToString()); UnrecoverableError(error_message); @@ -2326,6 +2367,31 @@ UniquePtr InfinityThriftService::DataTypeToProtoD data_type_proto->__set_physical_type(physical_type); return data_type_proto; } + case LogicalType::kTime: { + auto data_type_proto = MakeUnique(); + data_type_proto->__set_logic_type(infinity_thrift_rpc::LogicType::Time); + return data_type_proto; + } + case LogicalType::kDate: { + auto data_type_proto = MakeUnique(); + data_type_proto->__set_logic_type(infinity_thrift_rpc::LogicType::Date); + return data_type_proto; + } + case LogicalType::kDateTime: { + auto data_type_proto = MakeUnique(); + data_type_proto->__set_logic_type(infinity_thrift_rpc::LogicType::DateTime); + return data_type_proto; + } + case LogicalType::kTimestamp: { + auto data_type_proto = MakeUnique(); + data_type_proto->__set_logic_type(infinity_thrift_rpc::LogicType::Timestamp); + return data_type_proto; + } + case LogicalType::kInterval: { + auto data_type_proto = MakeUnique(); + data_type_proto->__set_logic_type(infinity_thrift_rpc::LogicType::Interval); + return data_type_proto; + } default: { String error_message = fmt::format("Invalid logical data type: {}", data_type->ToString()); UnrecoverableError(error_message); @@ -2464,6 +2530,14 @@ Status InfinityThriftService::ProcessColumnFieldType(infinity_thrift_rpc::Column HandleRowIDType(output_column_field, row_count, column_vector); break; } + case LogicalType::kDate: + case LogicalType::kTime: + case LogicalType::kDateTime: + case 
LogicalType::kTimestamp: + case LogicalType::kInterval: { + HandleTimeRelatedTypes(output_column_field, row_count, column_vector); + break; + } default: { return Status::InvalidDataType(); } @@ -2471,6 +2545,16 @@ Status InfinityThriftService::ProcessColumnFieldType(infinity_thrift_rpc::Column return Status::OK(); } +void InfinityThriftService::HandleTimeRelatedTypes(infinity_thrift_rpc::ColumnField &output_column_field, + SizeT row_count, + const SharedPtr &column_vector) { + auto size = column_vector->data_type()->Size() * row_count; + String dst; + dst.resize(size); + std::memcpy(dst.data(), column_vector->data(), size); + output_column_field.column_vectors.emplace_back(std::move(dst)); +} + void InfinityThriftService::HandleBoolType(infinity_thrift_rpc::ColumnField &output_column_field, SizeT row_count, const SharedPtr &column_vector) { diff --git a/src/network/infinity_thrift_service.cppm b/src/network/infinity_thrift_service.cppm index 98c869ca29..09585fd43f 100644 --- a/src/network/infinity_thrift_service.cppm +++ b/src/network/infinity_thrift_service.cppm @@ -228,6 +228,9 @@ private: static void HandleRowIDType(infinity_thrift_rpc::ColumnField &output_column_field, SizeT row_count, const SharedPtr &column_vector); + static void + HandleTimeRelatedTypes(infinity_thrift_rpc::ColumnField &output_column_field, SizeT row_count, const SharedPtr &column_vector); + static void ProcessStatus(infinity_thrift_rpc::CommonResponse &response, const Status &status, const std::string_view error_header = ErrorMsgHeader); diff --git a/thrift/infinity.thrift b/thrift/infinity.thrift index 17248e3764..091d1f49f5 100644 --- a/thrift/infinity.thrift +++ b/thrift/infinity.thrift @@ -21,6 +21,11 @@ Tensor, TensorArray, Sparse, MultiVector, +Date, +Time, +DateTime, +Timestamp, +Interval, Invalid } @@ -110,6 +115,11 @@ IntegerTensorArray, DoubleTensorArray, SparseIntegerArray, SparseDoubleArray, +Date, +Time, +Inteval, +DateTime, +Timestamp, } union ParsedExprType { @@ -296,6 
+306,11 @@ ColumnTensorArray, ColumnSparse, ColumnMultiVector, ColumnRowID, +ColumnDate, +ColumnTime, +ColumnDateTime, +ColumnTimestamp, +ColumnInterval, ColumnInvalid, }