diff --git a/proto/gen/rill/runtime/v1/runtime.swagger.yaml b/proto/gen/rill/runtime/v1/runtime.swagger.yaml index d5578f954af..ff4c3b2f797 100644 --- a/proto/gen/rill/runtime/v1/runtime.swagger.yaml +++ b/proto/gen/rill/runtime/v1/runtime.swagger.yaml @@ -3099,6 +3099,7 @@ definitions: - CODE_FLOAT32 - CODE_FLOAT64 - CODE_TIMESTAMP + - CODE_INTERVAL - CODE_DATE - CODE_TIME - CODE_STRING diff --git a/proto/gen/rill/runtime/v1/schema.pb.go b/proto/gen/rill/runtime/v1/schema.pb.go index 449b7fa0926..77c603b1cdb 100644 --- a/proto/gen/rill/runtime/v1/schema.pb.go +++ b/proto/gen/rill/runtime/v1/schema.pb.go @@ -41,6 +41,7 @@ const ( Type_CODE_FLOAT32 Type_Code = 12 Type_CODE_FLOAT64 Type_Code = 13 Type_CODE_TIMESTAMP Type_Code = 14 + Type_CODE_INTERVAL Type_Code = 27 Type_CODE_DATE Type_Code = 15 Type_CODE_TIME Type_Code = 16 Type_CODE_STRING Type_Code = 17 @@ -73,6 +74,7 @@ var ( 12: "CODE_FLOAT32", 13: "CODE_FLOAT64", 14: "CODE_TIMESTAMP", + 27: "CODE_INTERVAL", 15: "CODE_DATE", 16: "CODE_TIME", 17: "CODE_STRING", @@ -102,6 +104,7 @@ var ( "CODE_FLOAT32": 12, "CODE_FLOAT64": 13, "CODE_TIMESTAMP": 14, + "CODE_INTERVAL": 27, "CODE_DATE": 15, "CODE_TIME": 16, "CODE_STRING": 17, @@ -392,7 +395,7 @@ var file_rill_runtime_v1_schema_proto_rawDesc = []byte{ 0x0a, 0x1c, 0x72, 0x69, 0x6c, 0x6c, 0x2f, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x72, 0x69, 0x6c, 0x6c, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x22, - 0xd6, 0x05, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x2e, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, + 0xe9, 0x05, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x2e, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1a, 0x2e, 0x72, 0x69, 0x6c, 0x6c, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x2e, 0x43, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x6e, 0x75, 0x6c, 0x6c, @@ -408,7 +411,7 @@ var file_rill_runtime_v1_schema_proto_rawDesc = []byte{ 0x75, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x33, 0x0a, 0x08, 0x6d, 0x61, 0x70, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x72, 0x69, 0x6c, 0x6c, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x54, - 0x79, 0x70, 0x65, 0x52, 0x07, 0x6d, 0x61, 0x70, 0x54, 0x79, 0x70, 0x65, 0x22, 0xc9, 0x03, 0x0a, + 0x79, 0x70, 0x65, 0x52, 0x07, 0x6d, 0x61, 0x70, 0x54, 0x79, 0x70, 0x65, 0x22, 0xdc, 0x03, 0x0a, 0x04, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x14, 0x0a, 0x10, 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x42, 0x4f, 0x4f, 0x4c, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, @@ -427,46 +430,47 @@ var file_rill_runtime_v1_schema_proto_rawDesc = []byte{ 0x10, 0x1a, 0x12, 0x10, 0x0a, 0x0c, 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x46, 0x4c, 0x4f, 0x41, 0x54, 0x33, 0x32, 0x10, 0x0c, 0x12, 0x10, 0x0a, 0x0c, 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x46, 0x4c, 0x4f, 0x41, 0x54, 0x36, 0x34, 0x10, 0x0d, 0x12, 0x12, 0x0a, 0x0e, 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x54, - 0x49, 0x4d, 0x45, 0x53, 0x54, 0x41, 0x4d, 0x50, 0x10, 0x0e, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, - 0x44, 0x45, 0x5f, 0x44, 0x41, 0x54, 0x45, 0x10, 0x0f, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, 0x44, - 0x45, 0x5f, 0x54, 0x49, 0x4d, 0x45, 0x10, 0x10, 0x12, 0x0f, 0x0a, 0x0b, 0x43, 0x4f, 0x44, 0x45, - 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x11, 0x12, 0x0e, 0x0a, 0x0a, 0x43, 0x4f, 0x44, - 0x45, 0x5f, 0x42, 0x59, 0x54, 0x45, 0x53, 0x10, 0x12, 0x12, 0x0e, 0x0a, 0x0a, 0x43, 0x4f, 0x44, - 0x45, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x13, 0x12, 0x0f, 0x0a, 0x0b, 0x43, 0x4f, 0x44, - 0x45, 0x5f, 0x53, 0x54, 0x52, 0x55, 0x43, 0x54, 0x10, 0x14, 0x12, 0x0c, 0x0a, 0x08, 0x43, 0x4f, - 0x44, 0x45, 0x5f, 0x4d, 0x41, 0x50, 0x10, 0x15, 0x12, 0x10, 0x0a, 0x0c, 0x43, 0x4f, 0x44, 0x45, - 0x5f, 0x44, 0x45, 0x43, 0x49, 0x4d, 0x41, 0x4c, 0x10, 0x16, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, - 0x44, 0x45, 0x5f, 0x4a, 0x53, 0x4f, 0x4e, 0x10, 0x17, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, 0x44, - 0x45, 0x5f, 0x55, 0x55, 0x49, 0x44, 0x10, 0x18, 0x22, 0x8f, 0x01, 0x0a, 0x0a, 0x53, 0x74, 0x72, - 0x75, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, - 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x72, 0x69, 0x6c, 0x6c, 0x2e, 0x72, - 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, - 0x54, 0x79, 0x70, 0x65, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, - 0x64, 0x73, 0x1a, 0x46, 0x0a, 0x05, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, - 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, - 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, - 0x72, 0x69, 0x6c, 0x6c, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x2e, - 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x71, 0x0a, 0x07, 0x4d, 0x61, - 0x70, 0x54, 0x79, 0x70, 0x65, 0x12, 0x30, 0x0a, 0x08, 0x6b, 0x65, 0x79, 0x5f, 0x74, 0x79, 0x70, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x72, 0x69, 0x6c, 0x6c, 0x2e, 0x72, - 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x07, - 0x6b, 0x65, 0x79, 0x54, 0x79, 0x70, 0x65, 0x12, 0x34, 0x0a, 0x0a, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x72, 0x69, - 0x6c, 0x6c, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, - 0x70, 0x65, 0x52, 0x09, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x54, 0x79, 0x70, 0x65, 0x42, 0xbe, 0x01, - 0x0a, 0x13, 0x63, 0x6f, 0x6d, 0x2e, 0x72, 0x69, 0x6c, 0x6c, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, - 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x0b, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x72, 0x69, 0x6c, 0x6c, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x72, 0x69, 0x6c, 0x6c, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x72, 0x69, 0x6c, 0x6c, 0x2f, 0x72, 0x75, - 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x76, 0x31, 0x3b, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, - 0x76, 0x31, 0xa2, 0x02, 0x03, 0x52, 0x52, 0x58, 0xaa, 0x02, 0x0f, 0x52, 0x69, 0x6c, 0x6c, 0x2e, - 0x52, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0f, 0x52, 0x69, 0x6c, - 0x6c, 0x5c, 0x52, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x1b, 0x52, - 0x69, 0x6c, 0x6c, 0x5c, 0x52, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x5c, 0x56, 0x31, 0x5c, 0x47, - 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x11, 0x52, 0x69, 0x6c, - 0x6c, 0x3a, 0x3a, 0x52, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x49, 0x4d, 0x45, 0x53, 0x54, 0x41, 0x4d, 0x50, 0x10, 0x0e, 0x12, 0x11, 0x0a, 0x0d, 0x43, 0x4f, + 0x44, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x56, 0x41, 0x4c, 0x10, 0x1b, 0x12, 0x0d, 0x0a, + 0x09, 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x44, 0x41, 0x54, 0x45, 0x10, 0x0f, 0x12, 0x0d, 0x0a, 0x09, + 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x54, 0x49, 0x4d, 0x45, 0x10, 0x10, 0x12, 0x0f, 0x0a, 0x0b, 0x43, + 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x11, 0x12, 0x0e, 0x0a, 0x0a, + 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x42, 0x59, 0x54, 0x45, 0x53, 0x10, 0x12, 0x12, 0x0e, 0x0a, 0x0a, + 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x13, 0x12, 0x0f, 0x0a, 0x0b, + 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x55, 0x43, 0x54, 0x10, 0x14, 0x12, 0x0c, 0x0a, + 0x08, 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x41, 0x50, 0x10, 0x15, 0x12, 0x10, 0x0a, 0x0c, 0x43, + 0x4f, 0x44, 0x45, 0x5f, 0x44, 0x45, 0x43, 0x49, 0x4d, 0x41, 0x4c, 0x10, 0x16, 0x12, 0x0d, 0x0a, + 0x09, 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x4a, 0x53, 0x4f, 0x4e, 0x10, 0x17, 0x12, 0x0d, 0x0a, 0x09, + 0x43, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x55, 0x49, 0x44, 0x10, 0x18, 0x22, 0x8f, 0x01, 0x0a, 0x0a, + 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x06, 0x66, 0x69, + 0x65, 0x6c, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x72, 0x69, 0x6c, + 0x6c, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x72, + 0x75, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, + 0x69, 0x65, 0x6c, 0x64, 0x73, 0x1a, 0x46, 0x0a, 0x05, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x12, 0x12, + 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x15, 0x2e, 0x72, 0x69, 0x6c, 0x6c, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, + 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x71, 0x0a, + 0x07, 0x4d, 0x61, 0x70, 0x54, 0x79, 0x70, 0x65, 0x12, 0x30, 0x0a, 0x08, 0x6b, 0x65, 0x79, 0x5f, + 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x72, 0x69, 0x6c, + 0x6c, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, + 0x65, 0x52, 0x07, 0x6b, 0x65, 0x79, 0x54, 0x79, 0x70, 0x65, 0x12, 0x34, 0x0a, 0x0a, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, + 0x2e, 0x72, 0x69, 0x6c, 0x6c, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x09, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x54, 0x79, 0x70, 0x65, + 0x42, 0xbe, 0x01, 0x0a, 0x13, 0x63, 0x6f, 0x6d, 0x2e, 0x72, 0x69, 0x6c, 0x6c, 0x2e, 0x72, 0x75, + 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x0b, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x69, 0x6c, 0x6c, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x72, 0x69, 0x6c, + 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x72, 0x69, 0x6c, 0x6c, + 0x2f, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x76, 0x31, 0x3b, 0x72, 0x75, 0x6e, 0x74, + 0x69, 0x6d, 0x65, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x52, 0x52, 0x58, 0xaa, 0x02, 0x0f, 0x52, 0x69, + 0x6c, 0x6c, 0x2e, 0x52, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0f, + 0x52, 0x69, 0x6c, 0x6c, 0x5c, 0x52, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x5c, 0x56, 0x31, 0xe2, + 0x02, 0x1b, 0x52, 0x69, 0x6c, 0x6c, 0x5c, 0x52, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x5c, 0x56, + 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x11, + 0x52, 0x69, 0x6c, 0x6c, 0x3a, 0x3a, 0x52, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x3a, 0x3a, 0x56, + 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/rill/runtime/v1/schema.proto b/proto/rill/runtime/v1/schema.proto index 3e3e0d04675..b602c8f8a3d 100644 --- a/proto/rill/runtime/v1/schema.proto +++ b/proto/rill/runtime/v1/schema.proto @@ -22,6 +22,7 @@ message Type { CODE_FLOAT32 = 12; CODE_FLOAT64 = 13; CODE_TIMESTAMP = 14; + CODE_INTERVAL = 27; CODE_DATE = 15; CODE_TIME = 16; CODE_STRING = 17; diff --git a/runtime/drivers/clickhouse/olap.go b/runtime/drivers/clickhouse/olap.go index 4394123e3f7..a861f003006 100644 --- a/runtime/drivers/clickhouse/olap.go +++ b/runtime/drivers/clickhouse/olap.go @@ -878,6 +878,8 @@ func databaseTypeToPB(dbt string, nullable bool) (*runtimev1.Type, error) { t.Code = runtimev1.Type_CODE_TIMESTAMP case "DATETIME64": t.Code = runtimev1.Type_CODE_TIMESTAMP + case "INTERVALNANOSECOND", "INTERVALMICROSECOND", "INTERVALMILLISECOND", "INTERVALSECOND", "INTERVALMINUTE", "INTERVALHOUR", "INTERVALDAY", "INTERVALWEEK", "INTERVALMONTH", "INTERVALQUARTER", "INTERVALYEAR": + t.Code = runtimev1.Type_CODE_INTERVAL case "JSON": t.Code = runtimev1.Type_CODE_JSON case "UUID": diff --git a/runtime/drivers/clickhouse/olap_test.go b/runtime/drivers/clickhouse/olap_test.go index 6221da6d3bb..f0023d47ea5 100644 --- a/runtime/drivers/clickhouse/olap_test.go +++ b/runtime/drivers/clickhouse/olap_test.go @@ -6,7 +6,9 @@ import ( "github.com/stretchr/testify/assert" "testing" + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/drivers" + "github.com/rilldata/rill/runtime/drivers/clickhouse" "github.com/rilldata/rill/runtime/pkg/activity" "github.com/rilldata/rill/runtime/storage" "github.com/rilldata/rill/runtime/testruntime" @@ -14,37 +16,32 @@ import ( "go.uber.org/zap" ) -func TestClickhouseCrudOps(t *testing.T) { - // t.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true") - if testing.Short() { - t.Skip("clickhouse: skipping test in short mode") - } - - dsn, cluster := testruntime.ClickhouseCluster(t) - t.Run("SingleHost", func(t *testing.T) { testClickhouseSingleHost(t, dsn) }) - t.Run("Cluster", func(t *testing.T) { testClickhouseCluster(t, dsn, cluster) }) -} +func TestClickhouseSingle(t *testing.T) { + cfg := testruntime.AcquireConnector(t, "clickhouse") -func testClickhouseSingleHost(t *testing.T, dsn string) { - conn, err := drivers.Open("clickhouse", "default", map[string]any{"dsn": dsn}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop()) + conn, err := drivers.Open("clickhouse", "default", cfg, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop()) require.NoError(t, err) defer conn.Close() prepareConn(t, conn) olap, ok := conn.AsOLAP("default") require.True(t, ok) - t.Run("RenameView", func(t *testing.T) { - testRenameView(t, olap) - }) + t.Run("RenameView", func(t *testing.T) { testRenameView(t, olap) }) t.Run("RenameTable", func(t *testing.T) { testRenameTable(t, olap) }) t.Run("CreateTableAsSelect", func(t *testing.T) { testCreateTableAsSelect(t, olap) }) t.Run("InsertTableAsSelect_WithAppend", func(t *testing.T) { testInsertTableAsSelect_WithAppend(t, olap) }) t.Run("InsertTableAsSelect_WithMerge", func(t *testing.T) { testInsertTableAsSelect_WithMerge(t, olap) }) t.Run("TestDictionary", func(t *testing.T) { testDictionary(t, olap) }) - + t.Run("TestIntervalType", func(t *testing.T) { testIntervalType(t, olap) }) } -func testClickhouseCluster(t *testing.T, dsn, cluster string) { +func TestClickhouseCluster(t *testing.T) { + if testing.Short() { + t.Skip("clickhouse: skipping test in short mode") + } + + dsn, cluster := testruntime.ClickhouseCluster(t) + conn, err := drivers.Open("clickhouse", "default", map[string]any{"dsn": dsn, "cluster": cluster}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop()) require.NoError(t, err) defer conn.Close() @@ -54,9 +51,7 @@ func testClickhouseCluster(t *testing.T, dsn, cluster string) { prepareClusterConn(t, olap, cluster) - t.Run("RenameView", func(t *testing.T) { - testRenameView(t, olap) - }) + t.Run("RenameView", func(t *testing.T) { testRenameView(t, olap) }) t.Run("RenameTable", func(t *testing.T) { testRenameTable(t, olap) }) t.Run("CreateTableAsSelect", func(t *testing.T) { testCreateTableAsSelect(t, olap) }) t.Run("InsertTableAsSelect_WithAppend", func(t *testing.T) { testInsertTableAsSelect_WithAppend(t, olap) }) @@ -271,6 +266,33 @@ func testDictionary(t *testing.T, olap drivers.OLAPStore) { require.NoError(t, olap.DropTable(context.Background(), "dict1")) } +func testIntervalType(t *testing.T, olap drivers.OLAPStore) { + cases := []struct { + query string + ms int64 + }{ + {query: "SELECT INTERVAL '1' SECOND", ms: 1000}, + {query: "SELECT INTERVAL '2' MINUTES", ms: 2 * 60 * 1000}, + {query: "SELECT INTERVAL '3' HOURS", ms: 3 * 60 * 60 * 1000}, + {query: "SELECT INTERVAL '4' DAYS", ms: 4 * 24 * 60 * 60 * 1000}, + {query: "SELECT INTERVAL '5' MONTHS", ms: 5 * 30 * 24 * 60 * 60 * 1000}, + {query: "SELECT INTERVAL '6' YEAR", ms: 6 * 365 * 24 * 60 * 60 * 1000}, + } + for _, c := range cases { + rows, err := olap.Execute(context.Background(), &drivers.Statement{Query: c.query}) + require.NoError(t, err) + require.Equal(t, runtimev1.Type_CODE_INTERVAL, rows.Schema.Fields[0].Type.Code) + + require.True(t, rows.Next()) + var s string + require.NoError(t, rows.Scan(&s)) + ms, ok := clickhouse.ParseIntervalToMillis(s) + require.True(t, ok) + require.Equal(t, c.ms, ms) + require.NoError(t, rows.Close()) + } +} + func prepareClusterConn(t *testing.T, olap drivers.OLAPStore, cluster string) { err := olap.Exec(context.Background(), &drivers.Statement{ Query: fmt.Sprintf("CREATE OR REPLACE TABLE foo_local ON CLUSTER %s (bar VARCHAR, baz INTEGER) engine=MergeTree ORDER BY tuple()", cluster), diff --git a/runtime/drivers/clickhouse/utils.go b/runtime/drivers/clickhouse/utils.go index 86d49183ad7..2595ac12746 100644 --- a/runtime/drivers/clickhouse/utils.go +++ b/runtime/drivers/clickhouse/utils.go @@ -1,9 +1,52 @@ package clickhouse import ( + "strconv" + "strings" + "github.com/rilldata/rill/runtime/drivers" ) +// ParseIntervalToMillis parses a ClickHouse INTERVAL string into milliseconds. +// ClickHouse currently returns INTERVALs as strings in the format "1 Month", "2 Minutes", etc. +// This function follows our current policy of treating months as 30 days when converting to milliseconds. +func ParseIntervalToMillis(s string) (int64, bool) { + s1, s2, ok := strings.Cut(s, " ") + if !ok { + return 0, false + } + + units, err := strconv.ParseInt(s1, 10, 64) + if err != nil { + return 0, false + } + + switch s2 { + case "Nanosecond", "Nanoseconds": + return int64(float64(units) / 1_000_000), true + case "Microsecond", "Microseconds": + return int64(float64(units) / 1_000), true + case "Millisecond", "Milliseconds": + return units * 1, true + case "Second", "Seconds": + return units * 1000, true + case "Minute", "Minutes": + return units * 60 * 1000, true + case "Hour", "Hours": + return units * 60 * 60 * 1000, true + case "Day", "Days": + return units * 24 * 60 * 60 * 1000, true + case "Month", "Months": + return units * 30 * 24 * 60 * 60 * 1000, true + case "Quarter", "Quarters": + return units * 3 * 30 * 24 * 60 * 60 * 1000, true + case "Year", "Years": + return units * 365 * 24 * 60 * 60 * 1000, true + default: + return 0, false + } +} + func safeSQLName(name string) string { return drivers.DialectClickHouse.EscapeIdentifier(name) } diff --git a/runtime/drivers/druid/druid_container_test.go b/runtime/drivers/druid/druid_container_test.go new file mode 100644 index 00000000000..a5705f7184a --- /dev/null +++ b/runtime/drivers/druid/druid_container_test.go @@ -0,0 +1,232 @@ +package druid + +import ( + "context" + "fmt" + "net/url" + "strings" + "testing" + "time" + + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" + "github.com/rilldata/rill/runtime/drivers" + "github.com/rilldata/rill/runtime/pkg/activity" + "github.com/rilldata/rill/runtime/storage" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + "go.uber.org/zap" +) + +const testTable = "test_data" + +var testCSV = strings.TrimSpace(` +id,timestamp,publisher,domain,bid_price +5000,2022-03-18T12:25:58.074Z,Facebook,facebook.com,4.19 +9000,2022-03-15T11:17:23.530Z,Microsoft,msn.com,3.48 +10000,2022-03-02T04:00:56.643Z,Microsoft,msn.com,3.57 +11000,2022-01-16T00:26:44.770Z,,instagram.com,5.38 +12000,2022-01-17T08:55:09.270Z,,msn.com,1.34 +13000,2022-03-20T03:16:57.618Z,Yahoo,news.yahoo.com,1.05 +14000,2022-01-29T19:05:33.545Z,Google,news.google.com,4.54 +15000,2022-03-22T00:56:22.035Z,Yahoo,news.yahoo.com,1.13 +16000,2022-01-24T13:41:43.527Z,,instagram.com,1.78 +`) + +var testIngestSpec = fmt.Sprintf(`{ + "type": "index_parallel", + "spec": { + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "inline", + "data": "%s" + }, + "inputFormat": { + "type": "csv", + "findColumnsFromHeader": true + } + }, + "tuningConfig": { + "type": "index_parallel", + "partitionsSpec": { + "type": "dynamic" + } + }, + "dataSchema": { + "dataSource": "%s", + "timestampSpec": { + "column": "timestamp", + "format": "iso" + }, + "transformSpec": {}, + "dimensionsSpec": { + "dimensions": [ + {"type": "long", "name": "id"}, + "publisher", + "domain", + {"type": "double", "name": "bid_price"} + ] + }, + "granularitySpec": { + "queryGranularity": "none", + "rollup": false, + "segmentGranularity": "day" + } + } + } +}`, strings.ReplaceAll(testCSV, "\n", "\\n"), testTable) + +// TestContainer starts a Druid cluster using testcontainers, ingests data into it, then runs all other tests +// in this file as sub-tests (to prevent spawning many clusters). +// +// Unfortunately starting a Druid cluster with test containers is extremely slow. +// If you have access to our Druid test cluster, consider using the test_druid.go file instead. +func TestContainer(t *testing.T) { + if testing.Short() { + t.Skip("druid: skipping test in short mode") + } + + ctx := context.Background() + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + Started: true, + ContainerRequest: testcontainers.ContainerRequest{ + WaitingFor: wait.ForHTTP("/status/health").WithPort("8081").WithStartupTimeout(time.Minute * 2), + Image: "gcr.io/rilldata/druid-micro:25.0.0", + ExposedPorts: []string{"8081/tcp", "8082/tcp"}, + Cmd: []string{"./bin/start-micro-quickstart"}, + }, + }) + require.NoError(t, err) + defer container.Terminate(ctx) + + coordinatorURL, err := container.PortEndpoint(ctx, "8081/tcp", "http") + require.NoError(t, err) + + t.Run("ingest", func(t *testing.T) { testIngest(t, coordinatorURL) }) + + brokerURL, err := container.PortEndpoint(ctx, "8082/tcp", "http") + require.NoError(t, err) + + druidAPIURL, err := url.JoinPath(brokerURL, "/druid/v2/sql") + require.NoError(t, err) + + dd := &driver{} + conn, err := dd.Open("default", map[string]any{"dsn": druidAPIURL}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop()) + require.NoError(t, err) + + olap, ok := conn.AsOLAP("") + require.True(t, ok) + + t.Run("count", func(t *testing.T) { testCount(t, olap) }) + t.Run("max", func(t *testing.T) { testMax(t, olap) }) + t.Run("schema all", func(t *testing.T) { testSchemaAll(t, olap) }) + t.Run("schema all like", func(t *testing.T) { testSchemaAllLike(t, olap) }) + t.Run("schema lookup", func(t *testing.T) { testSchemaLookup(t, olap) }) + // Add new tests here + t.Run("time floor", func(t *testing.T) { testTimeFloor(t, olap) }) + + require.NoError(t, conn.Close()) +} + +func testIngest(t *testing.T, coordinatorURL string) { + timeout := 5 * time.Minute + err := Ingest(coordinatorURL, testIngestSpec, testTable, timeout) + require.NoError(t, err) +} + +func testCount(t *testing.T, olap drivers.OLAPStore) { + qry := fmt.Sprintf("SELECT count(*) FROM %s", testTable) + rows, err := olap.Execute(context.Background(), &drivers.Statement{Query: qry}) + require.NoError(t, err) + + var count int + rows.Next() + + require.NoError(t, rows.Scan(&count)) + require.Equal(t, 9, count) + require.NoError(t, rows.Close()) +} + +func testMax(t *testing.T, olap drivers.OLAPStore) { + qry := fmt.Sprintf("SELECT max(id) FROM %s", testTable) + expectedValue := 16000 + rows, err := olap.Execute(context.Background(), &drivers.Statement{Query: qry}) + require.NoError(t, err) + + var count int + rows.Next() + require.NoError(t, rows.Scan(&count)) + require.Equal(t, expectedValue, count) + require.NoError(t, rows.Close()) +} + +func testTimeFloor(t *testing.T, olap drivers.OLAPStore) { + qry := fmt.Sprintf("SELECT time_floor(__time, 'P1D', null, CAST(? AS VARCHAR)) FROM %s", testTable) + rows, err := olap.Execute(context.Background(), &drivers.Statement{ + Query: qry, + Args: []any{"Asia/Kathmandu"}, + }) + require.NoError(t, err) + defer rows.Close() + + var tmString string + count := 0 + for rows.Next() { + require.NoError(t, rows.Scan(&tmString)) + tm, err := time.Parse(time.RFC3339, tmString) + require.NoError(t, err) + require.Equal(t, 15, tm.Minute()) + count += 1 + } + require.Equal(t, 9, count) +} + +func testSchemaAll(t *testing.T, olap drivers.OLAPStore) { + tables, err := olap.InformationSchema().All(context.Background(), "") + require.NoError(t, err) + + require.Equal(t, 1, len(tables)) + require.Equal(t, testTable, tables[0].Name) + + require.Equal(t, 5, len(tables[0].Schema.Fields)) + + mp := make(map[string]*runtimev1.StructType_Field) + for _, f := range tables[0].Schema.Fields { + mp[f.Name] = f + } + + f := mp["__time"] + require.Equal(t, "__time", f.Name) + require.Equal(t, runtimev1.Type_CODE_TIMESTAMP, f.Type.Code) + require.Equal(t, false, f.Type.Nullable) + f = mp["bid_price"] + require.Equal(t, runtimev1.Type_CODE_FLOAT64, f.Type.Code) + require.Equal(t, false, f.Type.Nullable) + f = mp["domain"] + require.Equal(t, runtimev1.Type_CODE_STRING, f.Type.Code) + require.Equal(t, true, f.Type.Nullable) + f = mp["id"] + require.Equal(t, runtimev1.Type_CODE_INT64, f.Type.Code) + require.Equal(t, false, f.Type.Nullable) + f = mp["publisher"] + require.Equal(t, runtimev1.Type_CODE_STRING, f.Type.Code) + require.Equal(t, true, f.Type.Nullable) +} + +func testSchemaAllLike(t *testing.T, olap drivers.OLAPStore) { + tables, err := olap.InformationSchema().All(context.Background(), "%test%") + require.NoError(t, err) + require.Equal(t, 1, len(tables)) + require.Equal(t, testTable, tables[0].Name) +} + +func testSchemaLookup(t *testing.T, olap drivers.OLAPStore) { + ctx := context.Background() + table, err := olap.InformationSchema().Lookup(ctx, "", "", testTable) + require.NoError(t, err) + require.Equal(t, testTable, table.Name) + + _, err = olap.InformationSchema().Lookup(ctx, "", "", "foo") + require.Equal(t, drivers.ErrNotFound, err) +} diff --git a/runtime/drivers/druid/druid_test.go b/runtime/drivers/druid/druid_test.go index 635d4f2279e..b313bdcb6e6 100644 --- a/runtime/drivers/druid/druid_test.go +++ b/runtime/drivers/druid/druid_test.go @@ -1,229 +1,49 @@ -package druid +package druid_test import ( "context" - "fmt" - "net/url" - "strings" "testing" "time" - runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/activity" "github.com/rilldata/rill/runtime/storage" + "github.com/rilldata/rill/runtime/testruntime" "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/wait" "go.uber.org/zap" ) -const testTable = "test_data" +func TestScan(t *testing.T) { + _, olap := acquireTestDruid(t) -var testCSV = strings.TrimSpace(` -id,timestamp,publisher,domain,bid_price -5000,2022-03-18T12:25:58.074Z,Facebook,facebook.com,4.19 -9000,2022-03-15T11:17:23.530Z,Microsoft,msn.com,3.48 -10000,2022-03-02T04:00:56.643Z,Microsoft,msn.com,3.57 -11000,2022-01-16T00:26:44.770Z,,instagram.com,5.38 -12000,2022-01-17T08:55:09.270Z,,msn.com,1.34 -13000,2022-03-20T03:16:57.618Z,Yahoo,news.yahoo.com,1.05 -14000,2022-01-29T19:05:33.545Z,Google,news.google.com,4.54 -15000,2022-03-22T00:56:22.035Z,Yahoo,news.yahoo.com,1.13 -16000,2022-01-24T13:41:43.527Z,,instagram.com,1.78 -`) - -var testIngestSpec = fmt.Sprintf(`{ - "type": "index_parallel", - "spec": { - "ioConfig": { - "type": "index_parallel", - "inputSource": { - "type": "inline", - "data": "%s" - }, - "inputFormat": { - "type": "csv", - "findColumnsFromHeader": true - } - }, - "tuningConfig": { - "type": "index_parallel", - "partitionsSpec": { - "type": "dynamic" - } - }, - "dataSchema": { - "dataSource": "%s", - "timestampSpec": { - "column": "timestamp", - "format": "iso" - }, - "transformSpec": {}, - "dimensionsSpec": { - "dimensions": [ - {"type": "long", "name": "id"}, - "publisher", - "domain", - {"type": "double", "name": "bid_price"} - ] - }, - "granularitySpec": { - "queryGranularity": "none", - "rollup": false, - "segmentGranularity": "day" - } - } - } -}`, strings.ReplaceAll(testCSV, "\n", "\\n"), testTable) - -// TestDruid starts a Druid cluster using testcontainers, ingests data into it, then runs all other tests -// in this file as sub-tests (to prevent spawning many clusters). -func TestDruid(t *testing.T) { - if testing.Short() { - t.Skip("druid: skipping test in short mode") - } - - ctx := context.Background() - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - Started: true, - ContainerRequest: testcontainers.ContainerRequest{ - WaitingFor: wait.ForHTTP("/status/health").WithPort("8081").WithStartupTimeout(time.Minute * 2), - Image: "gcr.io/rilldata/druid-micro:25.0.0", - ExposedPorts: []string{"8081/tcp", "8082/tcp"}, - Cmd: []string{"./bin/start-micro-quickstart"}, - }, - }) - require.NoError(t, err) - defer container.Terminate(ctx) - - coordinatorURL, err := container.PortEndpoint(ctx, "8081/tcp", "http") - require.NoError(t, err) - - t.Run("ingest", func(t *testing.T) { testIngest(t, coordinatorURL) }) - - brokerURL, err := container.PortEndpoint(ctx, "8082/tcp", "http") - require.NoError(t, err) - - druidAPIURL, err := url.JoinPath(brokerURL, "/druid/v2/sql") - require.NoError(t, err) - - dd := &driver{} - conn, err := dd.Open("default", map[string]any{"dsn": druidAPIURL}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop()) - require.NoError(t, err) - - olap, ok := conn.AsOLAP("") - require.True(t, ok) - - t.Run("count", func(t *testing.T) { testCount(t, olap) }) - t.Run("max", func(t *testing.T) { testMax(t, olap) }) - t.Run("schema all", func(t *testing.T) { testSchemaAll(t, olap) }) - t.Run("schema all like", func(t *testing.T) { testSchemaAllLike(t, olap) }) - t.Run("schema lookup", func(t *testing.T) { testSchemaLookup(t, olap) }) - // Add new tests here - t.Run("time floor", func(t *testing.T) { testTimeFloor(t, olap) }) - - require.NoError(t, conn.Close()) -} - -func testIngest(t *testing.T, coordinatorURL string) { - timeout := 5 * time.Minute - err := Ingest(coordinatorURL, testIngestSpec, testTable, timeout) - require.NoError(t, err) -} - -func testCount(t *testing.T, olap drivers.OLAPStore) { - qry := fmt.Sprintf("SELECT count(*) FROM %s", testTable) - rows, err := olap.Execute(context.Background(), &drivers.Statement{Query: qry}) + rows, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT 1, 'hello world', true, null, CAST('2024-01-01T00:00:00Z' AS TIMESTAMP)"}) require.NoError(t, err) - var count int - rows.Next() - - require.NoError(t, rows.Scan(&count)) - require.Equal(t, 9, count) - require.NoError(t, rows.Close()) -} + var i int + var s string + var b bool + var n any + var t1 time.Time + require.True(t, rows.Next()) + require.NoError(t, rows.Scan(&i, &s, &b, &n, &t1)) -func testMax(t *testing.T, olap drivers.OLAPStore) { - qry := fmt.Sprintf("SELECT max(id) FROM %s", testTable) - expectedValue := 16000 - rows, err := olap.Execute(context.Background(), &drivers.Statement{Query: qry}) - require.NoError(t, err) + require.Equal(t, 1, i) + require.Equal(t, "hello world", s) + require.Equal(t, true, b) + require.Nil(t, n) + require.Equal(t, time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), t1) - var count int - rows.Next() - require.NoError(t, rows.Scan(&count)) - require.Equal(t, expectedValue, count) require.NoError(t, rows.Close()) } -func testTimeFloor(t *testing.T, olap drivers.OLAPStore) { - qry := fmt.Sprintf("SELECT time_floor(__time, 'P1D', null, CAST(? AS VARCHAR)) FROM %s", testTable) - rows, err := olap.Execute(context.Background(), &drivers.Statement{ - Query: qry, - Args: []any{"Asia/Kathmandu"}, - }) - require.NoError(t, err) - defer rows.Close() - - var tmString string - count := 0 - for rows.Next() { - require.NoError(t, rows.Scan(&tmString)) - tm, err := time.Parse(time.RFC3339, tmString) - require.NoError(t, err) - require.Equal(t, 15, tm.Minute()) - count += 1 - } - require.Equal(t, 9, count) -} - -func testSchemaAll(t *testing.T, olap drivers.OLAPStore) { - tables, err := olap.InformationSchema().All(context.Background(), "") +func acquireTestDruid(t *testing.T) (drivers.Handle, drivers.OLAPStore) { + cfg := testruntime.AcquireConnector(t, "druid") + conn, err := drivers.Open("druid", "default", cfg, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop()) require.NoError(t, err) + t.Cleanup(func() { conn.Close() }) - require.Equal(t, 1, len(tables)) - require.Equal(t, testTable, tables[0].Name) - - require.Equal(t, 5, len(tables[0].Schema.Fields)) - - mp := make(map[string]*runtimev1.StructType_Field) - for _, f := range tables[0].Schema.Fields { - mp[f.Name] = f - } - - f := mp["__time"] - require.Equal(t, "__time", f.Name) - require.Equal(t, runtimev1.Type_CODE_TIMESTAMP, f.Type.Code) - require.Equal(t, false, f.Type.Nullable) - f = mp["bid_price"] - require.Equal(t, runtimev1.Type_CODE_FLOAT64, f.Type.Code) - require.Equal(t, false, f.Type.Nullable) - f = mp["domain"] - require.Equal(t, runtimev1.Type_CODE_STRING, f.Type.Code) - require.Equal(t, true, f.Type.Nullable) - f = mp["id"] - require.Equal(t, runtimev1.Type_CODE_INT64, f.Type.Code) - require.Equal(t, false, f.Type.Nullable) - f = mp["publisher"] - require.Equal(t, runtimev1.Type_CODE_STRING, f.Type.Code) - require.Equal(t, true, f.Type.Nullable) -} - -func testSchemaAllLike(t *testing.T, olap drivers.OLAPStore) { - tables, err := olap.InformationSchema().All(context.Background(), "%test%") - require.NoError(t, err) - require.Equal(t, 1, len(tables)) - require.Equal(t, testTable, tables[0].Name) -} - -func testSchemaLookup(t *testing.T, olap drivers.OLAPStore) { - ctx := context.Background() - table, err := olap.InformationSchema().Lookup(ctx, "", "", testTable) - require.NoError(t, err) - require.Equal(t, testTable, table.Name) + olap, ok := conn.AsOLAP("default") + require.True(t, ok) - _, err = olap.InformationSchema().Lookup(ctx, "", "", "foo") - require.Equal(t, drivers.ErrNotFound, err) + return conn, olap } diff --git a/runtime/drivers/druid/sql_driver_test.go b/runtime/drivers/druid/sql_driver_test.go index ff9dbf57bd7..b526b77767b 100644 --- a/runtime/drivers/druid/sql_driver_test.go +++ b/runtime/drivers/druid/sql_driver_test.go @@ -6,11 +6,10 @@ import ( "testing" "github.com/rilldata/rill/runtime/drivers" - "github.com/rilldata/rill/runtime/storage" - "github.com/stretchr/testify/require" - "github.com/rilldata/rill/runtime/pkg/activity" "github.com/rilldata/rill/runtime/pkg/pbutil" + "github.com/rilldata/rill/runtime/storage" + "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/protobuf/types/known/structpb" ) diff --git a/runtime/drivers/duckdb/information_schema.go b/runtime/drivers/duckdb/information_schema.go index ee199f1aaf2..3b780f0a067 100644 --- a/runtime/drivers/duckdb/information_schema.go +++ b/runtime/drivers/duckdb/information_schema.go @@ -199,7 +199,7 @@ func databaseTypeToPB(dbt string, nullable bool) (*runtimev1.Type, error) { case "TIME WITH TIME ZONE": t.Code = runtimev1.Type_CODE_TIME case "INTERVAL": - t.Code = runtimev1.Type_CODE_UNSPECIFIED // TODO - Consider adding interval type + t.Code = runtimev1.Type_CODE_INTERVAL case "HUGEINT": t.Code = runtimev1.Type_CODE_INT128 case "VARCHAR": diff --git a/runtime/drivers/file/model_executor_olap_self.go b/runtime/drivers/file/model_executor_olap_self.go index 5b30aea8fed..bb4804b1624 100644 --- a/runtime/drivers/file/model_executor_olap_self.go +++ b/runtime/drivers/file/model_executor_olap_self.go @@ -268,7 +268,7 @@ func writeParquet(res *drivers.Result, fw io.Writer) error { arrowField.Type = arrow.PrimitiveTypes.Float64 case runtimev1.Type_CODE_TIMESTAMP, runtimev1.Type_CODE_TIME: arrowField.Type = arrow.FixedWidthTypes.Timestamp_us - case runtimev1.Type_CODE_STRING, runtimev1.Type_CODE_DATE, runtimev1.Type_CODE_ARRAY, runtimev1.Type_CODE_STRUCT, runtimev1.Type_CODE_MAP, runtimev1.Type_CODE_JSON, runtimev1.Type_CODE_UUID: + case runtimev1.Type_CODE_STRING, runtimev1.Type_CODE_INTERVAL, runtimev1.Type_CODE_DATE, runtimev1.Type_CODE_ARRAY, runtimev1.Type_CODE_STRUCT, runtimev1.Type_CODE_MAP, runtimev1.Type_CODE_JSON, runtimev1.Type_CODE_UUID: arrowField.Type = arrow.BinaryTypes.String case runtimev1.Type_CODE_BYTES: arrowField.Type = arrow.BinaryTypes.Binary @@ -334,7 +334,7 @@ func writeParquet(res *drivers.Result, fw io.Writer) error { return err } recordBuilder.Field(i).(*array.TimestampBuilder).Append(tmp) - case runtimev1.Type_CODE_STRING, runtimev1.Type_CODE_DATE, runtimev1.Type_CODE_ARRAY, runtimev1.Type_CODE_STRUCT, runtimev1.Type_CODE_MAP, runtimev1.Type_CODE_JSON, runtimev1.Type_CODE_UUID: + case runtimev1.Type_CODE_STRING, runtimev1.Type_CODE_INTERVAL, runtimev1.Type_CODE_DATE, runtimev1.Type_CODE_ARRAY, runtimev1.Type_CODE_STRUCT, runtimev1.Type_CODE_MAP, runtimev1.Type_CODE_JSON, runtimev1.Type_CODE_UUID: res, err := json.Marshal(v) if err != nil { return fmt.Errorf("failed to convert to JSON value: %w", err) diff --git a/runtime/metricsview/executor.go b/runtime/metricsview/executor.go index c6936622f5b..9268fb9f773 100644 --- a/runtime/metricsview/executor.go +++ b/runtime/metricsview/executor.go @@ -71,12 +71,6 @@ func (e *Executor) Cacheable(qry *Query) bool { return e.olap.Dialect() == drivers.DialectDuckDB } -// ValidateMetricsView validates the dimensions and measures in the executor's metrics view. -func (e *Executor) ValidateMetricsView(ctx context.Context) error { - // TODO: Implement it - panic("not implemented") -} - // ValidateQuery validates the provided query against the executor's metrics view. func (e *Executor) ValidateQuery(qry *Query) error { // TODO: Implement it diff --git a/runtime/validate.go b/runtime/metricsview/executor_validate.go similarity index 68% rename from runtime/validate.go rename to runtime/metricsview/executor_validate.go index 0e84c604313..34ba4dd7c9e 100644 --- a/runtime/validate.go +++ b/runtime/metricsview/executor_validate.go @@ -1,4 +1,4 @@ -package runtime +package metricsview import ( "context" @@ -10,12 +10,14 @@ import ( "sync" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" + "github.com/rilldata/rill/runtime" "github.com/rilldata/rill/runtime/drivers" "golang.org/x/sync/errgroup" ) const validateConcurrencyLimit = 10 +// ValidateMetricsViewResult contains the results of validating a metrics view. type ValidateMetricsViewResult struct { TimeDimensionErr error DimensionErrs []IndexErr @@ -23,11 +25,13 @@ type ValidateMetricsViewResult struct { OtherErrs []error } +// IndexErr contains an error and the index of the dimension or measure that caused the error. type IndexErr struct { Idx int Err error } +// IsZero returns true if the result contains no errors. func (r *ValidateMetricsViewResult) IsZero() bool { return r.TimeDimensionErr == nil && len(r.DimensionErrs) == 0 && len(r.MeasureErrs) == 0 && len(r.OtherErrs) == 0 } @@ -49,25 +53,14 @@ func (r *ValidateMetricsViewResult) Error() error { return errors.Join(errs...) } -// ValidateMetricsView validates a metrics view spec. -// NOTE: If we need validation for more resources, we should consider moving it to the queries (or a dedicated validation package). -func (r *Runtime) ValidateMetricsView(ctx context.Context, instanceID string, mv *runtimev1.MetricsViewSpec) (*ValidateMetricsViewResult, error) { - ctrl, err := r.Controller(ctx, instanceID) - if err != nil { - return nil, err - } - - olap, release, err := ctrl.AcquireOLAP(ctx, mv.Connector) - if err != nil { - return nil, err - } - defer release() - +// ValidateMetricsView validates the dimensions and measures in the executor's metrics view. +func (e *Executor) ValidateMetricsView(ctx context.Context) (*ValidateMetricsViewResult, error) { // Create the result res := &ValidateMetricsViewResult{} // Check underlying table exists - t, err := olap.InformationSchema().Lookup(ctx, mv.Database, mv.DatabaseSchema, mv.Table) + mv := e.metricsView + t, err := e.olap.InformationSchema().Lookup(ctx, mv.Database, mv.DatabaseSchema, mv.Table) if err != nil { if errors.Is(err, drivers.ErrNotFound) { res.OtherErrs = append(res.OtherErrs, fmt.Errorf("table %q does not exist", mv.Table)) @@ -112,7 +105,7 @@ func (r *Runtime) ValidateMetricsView(ctx context.Context, instanceID string, mv // ClickHouse specifically does not support using a column name as a dimension or measure name if the dimension or measure has an expression. // This is due to ClickHouse's aggressive substitution of aliases: https://github.com/ClickHouse/ClickHouse/issues/9715. - if olap.Dialect() == drivers.DialectClickHouse { + if e.olap.Dialect() == drivers.DialectClickHouse { for _, d := range mv.Dimensions { if d.Expression == "" && !d.Unnest { continue @@ -133,25 +126,39 @@ func (r *Runtime) ValidateMetricsView(ctx context.Context, instanceID string, mv } // For performance, attempt to validate all dimensions and measures at once - err = validateAllDimensionsAndMeasures(ctx, olap, t, mv) + err = e.validateAllDimensionsAndMeasures(ctx, t, mv) if err != nil { // One or more dimension/measure expressions failed to validate. We need to check each one individually to provide useful errors. - validateIndividualDimensionsAndMeasures(ctx, olap, t, mv, cols, res) + e.validateIndividualDimensionsAndMeasures(ctx, t, mv, cols, res) } // Pinot does have any native support for time shift using time grain specifiers - if olap.Dialect() == drivers.DialectPinot && (mv.FirstDayOfWeek > 1 || mv.FirstMonthOfYear > 1) { + if e.olap.Dialect() == drivers.DialectPinot && (mv.FirstDayOfWeek > 1 || mv.FirstMonthOfYear > 1) { res.OtherErrs = append(res.OtherErrs, fmt.Errorf("time shift not supported for Pinot dialect, so FirstDayOfWeek and FirstMonthOfYear should be 1")) } // Check the default theme exists if mv.DefaultTheme != "" { - _, err := ctrl.Get(ctx, &runtimev1.ResourceName{Kind: ResourceKindTheme, Name: mv.DefaultTheme}, false) + ctrl, err := e.rt.Controller(ctx, e.instanceID) + if err != nil { + return nil, fmt.Errorf("could not get controller: %w", err) + } + + _, err = ctrl.Get(ctx, &runtimev1.ResourceName{Kind: runtime.ResourceKindTheme, Name: mv.DefaultTheme}, false) if err != nil { if errors.Is(err, drivers.ErrNotFound) { res.OtherErrs = append(res.OtherErrs, fmt.Errorf("theme %q does not exist", mv.DefaultTheme)) + } else { + return nil, fmt.Errorf("could not find theme %q: %w", mv.DefaultTheme, err) } - return nil, fmt.Errorf("could not find theme %q: %w", mv.DefaultTheme, err) + } + } + + // Validate the metrics view schema. + if res.IsZero() { // All dimensions and measures need to be valid to compute the schema. + err = e.validateSchema(ctx, res) + if err != nil { + res.OtherErrs = append(res.OtherErrs, fmt.Errorf("failed to validate metrics view schema: %w", err)) } } @@ -159,8 +166,8 @@ func (r *Runtime) ValidateMetricsView(ctx context.Context, instanceID string, mv } // validateAllDimensionsAndMeasures validates all dimensions and measures with one query. It returns an error if any of the expressions are invalid. -func validateAllDimensionsAndMeasures(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, mv *runtimev1.MetricsViewSpec) error { - dialect := olap.Dialect() +func (e *Executor) validateAllDimensionsAndMeasures(ctx context.Context, t *drivers.Table, mv *runtimev1.MetricsViewSpec) error { + dialect := e.olap.Dialect() var dimExprs []string var unnestClauses []string var groupIndexes []string @@ -186,13 +193,13 @@ func validateAllDimensionsAndMeasures(ctx context.Context, olap drivers.OLAPStor } if len(dimExprs) == 0 { // Only metrics - query = fmt.Sprintf("SELECT 1, %s FROM %s GROUP BY 1", strings.Join(metricExprs, ","), olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name)) + query = fmt.Sprintf("SELECT 1, %s FROM %s GROUP BY 1", strings.Join(metricExprs, ","), e.olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name)) } else if len(metricExprs) == 0 { // No metrics query = fmt.Sprintf( "SELECT %s FROM %s %s GROUP BY %s", strings.Join(dimExprs, ","), - olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name), + e.olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name), strings.Join(unnestClauses, ""), strings.Join(groupIndexes, ","), ) @@ -201,12 +208,12 @@ func validateAllDimensionsAndMeasures(ctx context.Context, olap drivers.OLAPStor "SELECT %s, %s FROM %s %s GROUP BY %s", strings.Join(dimExprs, ","), strings.Join(metricExprs, ","), - olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name), + e.olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name), strings.Join(unnestClauses, ""), strings.Join(groupIndexes, ","), ) } - err := olap.Exec(ctx, &drivers.Statement{ + err := e.olap.Exec(ctx, &drivers.Statement{ Query: query, DryRun: true, }) @@ -218,7 +225,7 @@ func validateAllDimensionsAndMeasures(ctx context.Context, olap drivers.OLAPStor // validateIndividualDimensionsAndMeasures validates each dimension and measure individually. // It adds validation errors to the provided res. -func validateIndividualDimensionsAndMeasures(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, mv *runtimev1.MetricsViewSpec, cols map[string]*runtimev1.StructType_Field, res *ValidateMetricsViewResult) { +func (e *Executor) validateIndividualDimensionsAndMeasures(ctx context.Context, t *drivers.Table, mv *runtimev1.MetricsViewSpec, cols map[string]*runtimev1.StructType_Field, res *ValidateMetricsViewResult) { // Validate dimensions and measures concurrently with a limit of 10 concurrent validations var mu sync.Mutex var grp errgroup.Group @@ -229,7 +236,7 @@ func validateIndividualDimensionsAndMeasures(ctx context.Context, olap drivers.O idx := idx d := d grp.Go(func() error { - err := validateDimension(ctx, olap, t, d, cols) + err := e.validateDimension(ctx, t, d, cols) if err != nil { mu.Lock() defer mu.Unlock() @@ -252,7 +259,7 @@ func validateIndividualDimensionsAndMeasures(ctx context.Context, olap drivers.O idx := idx m := m grp.Go(func() error { - err := validateMeasure(ctx, olap, t, m) + err := e.validateMeasure(ctx, t, m) if err != nil { mu.Lock() defer mu.Unlock() @@ -275,7 +282,7 @@ func validateIndividualDimensionsAndMeasures(ctx context.Context, olap drivers.O } // validateDimension validates a metrics view dimension. -func validateDimension(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, d *runtimev1.MetricsViewSpec_DimensionV2, fields map[string]*runtimev1.StructType_Field) error { +func (e *Executor) validateDimension(ctx context.Context, t *drivers.Table, d *runtimev1.MetricsViewSpec_DimensionV2, fields map[string]*runtimev1.StructType_Field) error { // Validate with a simple check if it's a column if d.Column != "" { if _, isColumn := fields[strings.ToLower(d.Column)]; !isColumn { @@ -287,11 +294,11 @@ func validateDimension(ctx context.Context, olap drivers.OLAPStore, t *drivers.T } } - dialect := olap.Dialect() + dialect := e.olap.Dialect() expr, unnestClause := dialect.DimensionSelect(t.Database, t.DatabaseSchema, t.Name, d) // Validate with a query if it's an expression - err := olap.Exec(ctx, &drivers.Statement{ + err := e.olap.Exec(ctx, &drivers.Statement{ Query: fmt.Sprintf("SELECT %s FROM %s %s GROUP BY 1", expr, dialect.EscapeTable(t.Database, t.DatabaseSchema, t.Name), unnestClause), DryRun: true, }) @@ -302,10 +309,41 @@ func validateDimension(ctx context.Context, olap drivers.OLAPStore, t *drivers.T } // validateMeasure validates a metrics view measure. -func validateMeasure(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, m *runtimev1.MetricsViewSpec_MeasureV2) error { - err := olap.Exec(ctx, &drivers.Statement{ - Query: fmt.Sprintf("SELECT 1, (%s) FROM %s GROUP BY 1", m.Expression, olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name)), +func (e *Executor) validateMeasure(ctx context.Context, t *drivers.Table, m *runtimev1.MetricsViewSpec_MeasureV2) error { + err := e.olap.Exec(ctx, &drivers.Statement{ + Query: fmt.Sprintf("SELECT 1, (%s) FROM %s GROUP BY 1", m.Expression, e.olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name)), DryRun: true, }) return err } + +// validateSchema validates that the metrics view's measures are numeric. +func (e *Executor) validateSchema(ctx context.Context, res *ValidateMetricsViewResult) error { + // Resolve the schema of the metrics view's dimensions and measures + schema, err := e.Schema(ctx) + if err != nil { + return err + } + types := make(map[string]*runtimev1.Type, len(schema.Fields)) + for _, f := range schema.Fields { + types[f.Name] = f.Type + } + + // Check that the measures are not strings + for i, m := range e.metricsView.Measures { + typ, ok := types[m.Name] + if !ok { + return fmt.Errorf("internal: measure %q not found in schema", m.Name) + } + + switch typ.Code { + case runtimev1.Type_CODE_TIMESTAMP, runtimev1.Type_CODE_DATE, runtimev1.Type_CODE_TIME, runtimev1.Type_CODE_STRING, runtimev1.Type_CODE_BYTES, runtimev1.Type_CODE_ARRAY, runtimev1.Type_CODE_STRUCT, runtimev1.Type_CODE_MAP, runtimev1.Type_CODE_JSON, runtimev1.Type_CODE_UUID: + res.MeasureErrs = append(res.MeasureErrs, IndexErr{ + Idx: i, + Err: fmt.Errorf("measure %q is of type %s, but must be a numeric type", m.Name, typ.Code), + }) + } + } + + return nil +} diff --git a/runtime/validate_test.go b/runtime/metricsview/executor_validate_test.go similarity index 92% rename from runtime/validate_test.go rename to runtime/metricsview/executor_validate_test.go index 84e34a554f3..826f1266d1b 100644 --- a/runtime/validate_test.go +++ b/runtime/metricsview/executor_validate_test.go @@ -1,4 +1,4 @@ -package runtime_test +package metricsview_test import ( "context" @@ -6,26 +6,32 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" + "github.com/rilldata/rill/runtime/metricsview" "github.com/rilldata/rill/runtime/testruntime" "github.com/stretchr/testify/require" ) func TestValidateMetricsView(t *testing.T) { rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") - res, err := rt.ValidateMetricsView(context.Background(), instanceID, &runtimev1.MetricsViewSpec{ + mv := &runtimev1.MetricsViewSpec{ Connector: "duckdb", Table: "ad_bids", DisplayName: "Ad Bids", TimeDimension: "timestamp", Dimensions: []*runtimev1.MetricsViewSpec_DimensionV2{ - {Column: "publisher"}, + {Name: "publisher", Column: "publisher"}, }, Measures: []*runtimev1.MetricsViewSpec_MeasureV2{ {Name: "records", Expression: "count(*)", Type: runtimev1.MetricsViewSpec_MEASURE_TYPE_SIMPLE}, {Name: "invalid_nested_aggregation", Expression: "MAX(COUNT(DISTINCT publisher))", Type: runtimev1.MetricsViewSpec_MEASURE_TYPE_SIMPLE}, {Name: "invalid_partition", Expression: "AVG(bid_price) OVER (PARTITION BY publisher)", Type: runtimev1.MetricsViewSpec_MEASURE_TYPE_SIMPLE}, }, - }) + } + + e, err := metricsview.NewExecutor(context.Background(), rt, instanceID, mv, runtime.ResolvedSecurityOpen, 0) + require.NoError(t, err) + + res, err := e.ValidateMetricsView(context.Background()) require.NoError(t, err) require.Empty(t, res.TimeDimensionErr) require.Empty(t, res.DimensionErrs) diff --git a/runtime/pkg/jsonval/jsonval.go b/runtime/pkg/jsonval/jsonval.go index 384b9d748db..2a7e503ab7f 100644 --- a/runtime/pkg/jsonval/jsonval.go +++ b/runtime/pkg/jsonval/jsonval.go @@ -14,6 +14,7 @@ import ( "github.com/marcboeker/go-duckdb" "github.com/paulmach/orb" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" + "github.com/rilldata/rill/runtime/drivers/clickhouse" ) // ToValue converts a value scanned from a database/sql driver to a Go type that can be marshaled to JSON. @@ -58,12 +59,22 @@ func ToValue(v any, t *runtimev1.Type) (any, error) { } return v, nil case string: - if t != nil && t.Code == runtimev1.Type_CODE_DECIMAL { - // Evil cast to float until frontend can deal with bigs: - v2, ok := new(big.Float).SetString(v) - if ok { - f, _ := v2.Float64() - return f, nil + if t != nil { + switch t.Code { + case runtimev1.Type_CODE_DECIMAL: + // Evil cast to float until frontend can deal with bigs: + v2, ok := new(big.Float).SetString(v) + if ok { + f, _ := v2.Float64() + return f, nil + } + case runtimev1.Type_CODE_INTERVAL: + // ClickHouse currently returns INTERVALs as strings. + // Our current policy is to convert INTERVALs to milliseconds, treating one month as 30 days. + v2, ok := clickhouse.ParseIntervalToMillis(v) + if ok { + return v2, nil + } } } return strings.ToValidUTF8(v, "�"), nil @@ -110,7 +121,11 @@ func ToValue(v any, t *runtimev1.Type) (any, error) { case duckdb.Map: return ToValue(map[any]any(v), t) case duckdb.Interval: - return map[string]any{"months": v.Months, "days": v.Days, "micros": v.Micros}, nil + // Our current policy is to convert INTERVALs to milliseconds, treating one month as 30 days. + ms := v.Micros / 1000 + ms += int64(v.Days) * 24 * 60 * 60 * 1000 + ms += int64(v.Months) * 30 * 24 * 60 * 60 * 1000 + return ms, nil case net.IP: return v.String(), nil case orb.Point: diff --git a/runtime/pkg/pbutil/pbutil.go b/runtime/pkg/pbutil/pbutil.go index 1860759c77a..8d7cc874c32 100644 --- a/runtime/pkg/pbutil/pbutil.go +++ b/runtime/pkg/pbutil/pbutil.go @@ -13,6 +13,7 @@ import ( "github.com/marcboeker/go-duckdb" "github.com/paulmach/orb" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" + "github.com/rilldata/rill/runtime/drivers/clickhouse" "google.golang.org/protobuf/types/known/structpb" ) @@ -103,12 +104,11 @@ func ToValue(v any, t *runtimev1.Type) (*structpb.Value, error) { } return structpb.NewStructValue(v2), nil case duckdb.Interval: - m := map[string]any{"months": v.Months, "days": v.Days, "micros": v.Micros} - v2, err := ToStruct(m, nil) - if err != nil { - return nil, err - } - return structpb.NewStructValue(v2), nil + // Our current policy is to convert INTERVALs to milliseconds, treating one month as 30 days. + ms := v.Micros / 1000 + ms += int64(v.Days) * 24 * 60 * 60 * 1000 + ms += int64(v.Months) * 30 * 24 * 60 * 60 * 1000 + return structpb.NewNumberValue(float64(ms)), nil case []byte: if t != nil && t.Code == runtimev1.Type_CODE_UUID { uid, err := uuid.FromBytes(v) @@ -117,12 +117,22 @@ func ToValue(v any, t *runtimev1.Type) (*structpb.Value, error) { } } case string: - if t != nil && t.Code == runtimev1.Type_CODE_DECIMAL { - // Evil cast to float until frontend can deal with bigs: - v2, ok := new(big.Float).SetString(v) - if ok { - f, _ := v2.Float64() - return structpb.NewNumberValue(f), nil + if t != nil { + switch t.Code { + case runtimev1.Type_CODE_DECIMAL: + // Evil cast to float until frontend can deal with bigs: + v2, ok := new(big.Float).SetString(v) + if ok { + f, _ := v2.Float64() + return structpb.NewNumberValue(f), nil + } + case runtimev1.Type_CODE_INTERVAL: + // ClickHouse currently returns INTERVALs as strings. + // Our current policy is to convert INTERVALs to milliseconds, treating one month as 30 days. + v2, ok := clickhouse.ParseIntervalToMillis(v) + if ok { + return structpb.NewNumberValue(float64(v2)), nil + } } } return structpb.NewStringValue(strings.ToValidUTF8(v, "�")), nil diff --git a/runtime/queries/metricsview.go b/runtime/queries/metricsview.go index 7f96fe86703..f83044107af 100644 --- a/runtime/queries/metricsview.go +++ b/runtime/queries/metricsview.go @@ -648,7 +648,7 @@ func WriteParquet(meta []*runtimev1.MetricsViewColumn, data []*structpb.Struct, arrowField.Type = arrow.PrimitiveTypes.Float32 case runtimev1.Type_CODE_FLOAT64: arrowField.Type = arrow.PrimitiveTypes.Float64 - case runtimev1.Type_CODE_STRUCT, runtimev1.Type_CODE_UUID, runtimev1.Type_CODE_ARRAY, runtimev1.Type_CODE_STRING, runtimev1.Type_CODE_MAP: + case runtimev1.Type_CODE_STRUCT, runtimev1.Type_CODE_UUID, runtimev1.Type_CODE_ARRAY, runtimev1.Type_CODE_STRING, runtimev1.Type_CODE_MAP, runtimev1.Type_CODE_INTERVAL: arrowField.Type = arrow.BinaryTypes.String case runtimev1.Type_CODE_TIMESTAMP, runtimev1.Type_CODE_DATE, runtimev1.Type_CODE_TIME: arrowField.Type = arrow.FixedWidthTypes.Timestamp_us @@ -707,6 +707,15 @@ func WriteParquet(meta []*runtimev1.MetricsViewColumn, data []*structpb.Struct, } recordBuilder.Field(idx).(*array.StringBuilder).Append(string(bts)) + case runtimev1.Type_CODE_INTERVAL: + switch v := v.GetKind().(type) { + case *structpb.Value_NumberValue: + s := fmt.Sprintf("%f", v.NumberValue) + recordBuilder.Field(idx).(*array.StringBuilder).Append(s) + case *structpb.Value_StringValue: + recordBuilder.Field(idx).(*array.StringBuilder).Append(v.StringValue) + default: + } } } } diff --git a/runtime/reconcilers/metrics_view.go b/runtime/reconcilers/metrics_view.go index 3eb5dd3f7e6..0fe95511828 100644 --- a/runtime/reconcilers/metrics_view.go +++ b/runtime/reconcilers/metrics_view.go @@ -7,6 +7,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" + "github.com/rilldata/rill/runtime/metricsview" ) func init() { @@ -85,7 +86,12 @@ func (r *MetricsViewReconciler) Reconcile(ctx context.Context, n *runtimev1.Reso // NOTE: Not checking refs for errors since they may still be valid even if they have errors. Instead, we just validate the metrics view against the table name. // Validate the metrics view and update ValidSpec - validateResult, validateErr := r.C.Runtime.ValidateMetricsView(ctx, r.C.InstanceID, mv.Spec) + e, err := metricsview.NewExecutor(ctx, r.C.Runtime, r.C.InstanceID, mv.Spec, runtime.ResolvedSecurityOpen, 0) + if err != nil { + return runtime.ReconcileResult{Err: fmt.Errorf("failed to create metrics view executor: %w", err)} + } + defer e.Close() + validateResult, validateErr := e.ValidateMetricsView(ctx) if validateErr == nil { validateErr = validateResult.Error() } diff --git a/runtime/resolvers/resolvers_test.go b/runtime/resolvers/resolvers_test.go index d4dd267a1d9..03230fea308 100644 --- a/runtime/resolvers/resolvers_test.go +++ b/runtime/resolvers/resolvers_test.go @@ -6,6 +6,7 @@ import ( "encoding/csv" "encoding/json" "flag" + "fmt" "os" "path/filepath" "strings" @@ -16,7 +17,6 @@ import ( "github.com/rilldata/rill/runtime/pkg/fileutil" "github.com/rilldata/rill/runtime/testruntime" "github.com/stretchr/testify/require" - "golang.org/x/exp/maps" "gopkg.in/yaml.v3" ) @@ -103,8 +103,11 @@ func TestResolvers(t *testing.T) { for _, connector := range tf.Connectors { acquire, ok := testruntime.Connectors[connector] require.True(t, ok, "unknown connector %q", connector) - connectorVars := acquire(t) - maps.Copy(vars, connectorVars) + cfg := acquire(t) + for k, v := range cfg { + k = fmt.Sprintf("connector.%s.%s", connector, k) + vars[k] = v + } } // Create the test runtime instance. diff --git a/runtime/security.go b/runtime/security.go index f84576b346d..39950e32d24 100644 --- a/runtime/security.go +++ b/runtime/security.go @@ -149,16 +149,16 @@ func (r *ResolvedSecurity) QueryFilter() *runtimev1.Expression { // truth is the compass that guides us through the labyrinth of existence. var truth = true -// openAccess allows access to a resource. -var openAccess = &ResolvedSecurity{ +// ResolvedSecurityOpen is a ResolvedSecurity that allows access with no restrictions. +var ResolvedSecurityOpen = &ResolvedSecurity{ access: &truth, fieldAccess: nil, rowFilter: "", queryFilter: nil, } -// closedAccess denies access to a resource. -var closedAccess = &ResolvedSecurity{ +// ResolvedSecurityClosed is a ResolvedSecurity that denies access. +var ResolvedSecurityClosed = &ResolvedSecurity{ access: nil, fieldAccess: nil, rowFilter: "", @@ -194,7 +194,7 @@ func newSecurityEngine(cacheSize int, logger *zap.Logger) *securityEngine { func (p *securityEngine) resolveSecurity(instanceID, environment string, vars map[string]string, claims *SecurityClaims, r *runtimev1.Resource) (*ResolvedSecurity, error) { // If security checks are skipped, return open access if claims.SkipChecks { - return openAccess, nil + return ResolvedSecurityOpen, nil } // Combine rules with any contained in the resource itself @@ -209,7 +209,7 @@ func (p *securityEngine) resolveSecurity(instanceID, environment string, vars ma } } if !validRule { - return closedAccess, nil + return ResolvedSecurityClosed, nil } cacheKey, err := computeCacheKey(instanceID, environment, claims, r) diff --git a/runtime/server/generate_metrics_view.go b/runtime/server/generate_metrics_view.go index c2570be3784..7bb4053d8fd 100644 --- a/runtime/server/generate_metrics_view.go +++ b/runtime/server/generate_metrics_view.go @@ -13,6 +13,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" "github.com/rilldata/rill/runtime/drivers" + "github.com/rilldata/rill/runtime/metricsview" "github.com/rilldata/rill/runtime/pkg/activity" "github.com/rilldata/rill/runtime/pkg/observability" "github.com/rilldata/rill/runtime/server/auth" @@ -252,7 +253,13 @@ func (s *Server) generateMetricsViewYAMLWithAI(ctx context.Context, instanceID, FormatPreset: measure.FormatPreset, }) } - validateResult, err := s.runtime.ValidateMetricsView(ctx, instanceID, spec) + + e, err := metricsview.NewExecutor(ctx, s.runtime, instanceID, spec, runtime.ResolvedSecurityOpen, 0) + if err != nil { + return nil, err + } + defer e.Close() + validateResult, err := e.ValidateMetricsView(ctx) if err != nil { return nil, err } diff --git a/runtime/testruntime/connectors.go b/runtime/testruntime/connectors.go index c9eb6d20ea6..8d907ada28d 100644 --- a/runtime/testruntime/connectors.go +++ b/runtime/testruntime/connectors.go @@ -13,8 +13,21 @@ import ( "github.com/testcontainers/testcontainers-go/modules/clickhouse" ) +// AcquireConnector acquires a test connector by name. +// For a list of available connectors, see the Connectors map below. +func AcquireConnector(t TestingT, name string) map[string]any { + acquire, ok := Connectors[name] + require.True(t, ok, "connector not found") + vars := acquire(t) + cfg := make(map[string]any, len(vars)) + for k, v := range vars { + cfg[k] = v + } + return cfg +} + // ConnectorAcquireFunc is a function that acquires a connector for a test. -// It should return a map of variables to add to the test runtime instance. +// It should return a map of config keys suitable for passing to drivers.Open. type ConnectorAcquireFunc func(t TestingT) (vars map[string]string) // Connectors is a map of available connectors for use in tests. @@ -61,7 +74,7 @@ var Connectors = map[string]ConnectorAcquireFunc{ require.NoError(t, err) dsn := fmt.Sprintf("clickhouse://clickhouse:clickhouse@%v:%v", host, port.Port()) - return map[string]string{"connector.clickhouse.dsn": dsn} + return map[string]string{"dsn": dsn} }, // druid connects to a real Druid cluster using the connection string in RILL_RUNTIME_DRUID_TEST_DSN. @@ -77,6 +90,6 @@ var Connectors = map[string]ConnectorAcquireFunc{ dsn := os.Getenv("RILL_RUNTIME_DRUID_TEST_DSN") require.NotEmpty(t, dsn, "Druid test DSN not configured") - return map[string]string{"connector.druid.dsn": dsn} + return map[string]string{"dsn": dsn} }, } diff --git a/web-common/src/features/dashboards/tab-bar/TabBar.svelte b/web-common/src/features/dashboards/tab-bar/TabBar.svelte index 9853aeb6f1d..be5314c0ba1 100644 --- a/web-common/src/features/dashboards/tab-bar/TabBar.svelte +++ b/web-common/src/features/dashboards/tab-bar/TabBar.svelte @@ -1,90 +1,75 @@