From 9076c163df53137fd64f9b0c33bfc8f5d771e029 Mon Sep 17 00:00:00 2001 From: hechao Date: Tue, 3 Dec 2024 10:56:04 +0800 Subject: [PATCH] Expose arrow flight sql port in disaggregated service --- pkg/common/utils/resource/pod.go | 2 +- pkg/common/utils/resource/service.go | 26 +++++++++---------- .../resource/service_disaggregated_ms.go | 4 +-- .../computegroups/service.go | 17 +++++++++--- .../disaggregated_fe/service.go | 17 ++++++++---- .../metaservice/service.go | 2 +- 6 files changed, 42 insertions(+), 26 deletions(-) diff --git a/pkg/common/utils/resource/pod.go b/pkg/common/utils/resource/pod.go index ea6388a..ab9525b 100644 --- a/pkg/common/utils/resource/pod.go +++ b/pkg/common/utils/resource/pod.go @@ -824,7 +824,7 @@ func getProbe(port int32, path string, commands []string, pt ProbeType) corev1.P func getTcpSocket(port int32) corev1.ProbeHandler { return corev1.ProbeHandler{ TCPSocket: &corev1.TCPSocketAction{ - Port: intstr.FromInt(int(port)), + Port: intstr.FromInt32(port), }, } } diff --git a/pkg/common/utils/resource/service.go b/pkg/common/utils/resource/service.go index 6e307d6..bc78ed5 100644 --- a/pkg/common/utils/resource/service.go +++ b/pkg/common/utils/resource/service.go @@ -73,19 +73,19 @@ func getInternalServicePort(config map[string]interface{}, componentType v1.Comp return corev1.ServicePort{ Name: GetPortKey(QUERY_PORT), Port: GetPort(config, QUERY_PORT), - TargetPort: intstr.FromInt(int(GetPort(config, QUERY_PORT))), + TargetPort: intstr.FromInt32(GetPort(config, QUERY_PORT)), } case v1.Component_BE, v1.Component_CN: return corev1.ServicePort{ Name: GetPortKey(HEARTBEAT_SERVICE_PORT), Port: GetPort(config, HEARTBEAT_SERVICE_PORT), - TargetPort: intstr.FromInt(int(GetPort(config, HEARTBEAT_SERVICE_PORT))), + TargetPort: intstr.FromInt32(GetPort(config, HEARTBEAT_SERVICE_PORT)), } case v1.Component_Broker: return corev1.ServicePort{ Name: GetPortKey(BROKER_IPC_PORT), Port: GetPort(config, BROKER_IPC_PORT), - TargetPort: intstr.FromInt(int(GetPort(config, BROKER_IPC_PORT))), + TargetPort: intstr.FromInt32(GetPort(config, BROKER_IPC_PORT)), } default: klog.Infof("getInternalServicePort not supported the type %s", componentType) @@ -173,18 +173,18 @@ func getFeServicePorts(config map[string]interface{}) (ports []corev1.ServicePor editPort := GetPort(config, EDIT_LOG_PORT) arrowFlightPort := GetPort(config, ARROW_FLIGHT_SQL_PORT) ports = append(ports, corev1.ServicePort{ - Port: httpPort, TargetPort: intstr.FromInt(int(httpPort)), Name: GetPortKey(HTTP_PORT), + Port: httpPort, TargetPort: intstr.FromInt32(httpPort), Name: GetPortKey(HTTP_PORT), }, corev1.ServicePort{ - Port: rpcPort, TargetPort: intstr.FromInt(int(rpcPort)), Name: GetPortKey(RPC_PORT), + Port: rpcPort, TargetPort: intstr.FromInt32(rpcPort), Name: GetPortKey(RPC_PORT), }, corev1.ServicePort{ - Port: queryPort, TargetPort: intstr.FromInt(int(queryPort)), Name: GetPortKey(QUERY_PORT), + Port: queryPort, TargetPort: intstr.FromInt32(queryPort), Name: GetPortKey(QUERY_PORT), }, corev1.ServicePort{ - Port: editPort, TargetPort: intstr.FromInt(int(editPort)), Name: GetPortKey(EDIT_LOG_PORT), + Port: editPort, TargetPort: intstr.FromInt32(editPort), Name: GetPortKey(EDIT_LOG_PORT), }) if arrowFlightPort != -1 { ports = append(ports, corev1.ServicePort{ - Port: arrowFlightPort, TargetPort: intstr.FromInt(int(arrowFlightPort)), Name: GetPortKey(ARROW_FLIGHT_SQL_PORT), + Port: arrowFlightPort, TargetPort: intstr.FromInt32(arrowFlightPort), Name: GetPortKey(ARROW_FLIGHT_SQL_PORT), }) } @@ -199,18 +199,18 @@ func getBeServicePorts(config map[string]interface{}) (ports []corev1.ServicePor arrowFlightPort := GetPort(config, ARROW_FLIGHT_SQL_PORT) ports = append(ports, corev1.ServicePort{ - Port: bePort, TargetPort: intstr.FromInt(int(bePort)), Name: GetPortKey(BE_PORT), + Port: bePort, TargetPort: intstr.FromInt32(bePort), Name: GetPortKey(BE_PORT), }, corev1.ServicePort{ - Port: webseverPort, TargetPort: intstr.FromInt(int(webseverPort)), Name: GetPortKey(WEBSERVER_PORT), + Port: webseverPort, TargetPort: intstr.FromInt32(webseverPort), Name: GetPortKey(WEBSERVER_PORT), }, corev1.ServicePort{ - Port: heartPort, TargetPort: intstr.FromInt(int(heartPort)), Name: GetPortKey(HEARTBEAT_SERVICE_PORT), + Port: heartPort, TargetPort: intstr.FromInt32(heartPort), Name: GetPortKey(HEARTBEAT_SERVICE_PORT), }, corev1.ServicePort{ - Port: brpcPort, TargetPort: intstr.FromInt(int(brpcPort)), Name: GetPortKey(BRPC_PORT), + Port: brpcPort, TargetPort: intstr.FromInt32(brpcPort), Name: GetPortKey(BRPC_PORT), }) if arrowFlightPort != -1 { ports = append(ports, corev1.ServicePort{ - Port: arrowFlightPort, TargetPort: intstr.FromInt(int(arrowFlightPort)), Name: GetPortKey(ARROW_FLIGHT_SQL_PORT), + Port: arrowFlightPort, TargetPort: intstr.FromInt32(arrowFlightPort), Name: GetPortKey(ARROW_FLIGHT_SQL_PORT), }) } diff --git a/pkg/common/utils/resource/service_disaggregated_ms.go b/pkg/common/utils/resource/service_disaggregated_ms.go index fed31a1..c92663b 100644 --- a/pkg/common/utils/resource/service_disaggregated_ms.go +++ b/pkg/common/utils/resource/service_disaggregated_ms.go @@ -65,13 +65,13 @@ func getDMSServicePort(brpcPort int32, componentType mv1.ComponentType) corev1.S return corev1.ServicePort{ Name: GetPortKey(BRPC_LISTEN_PORT), Port: brpcPort, - TargetPort: intstr.FromInt(int(brpcPort)), + TargetPort: intstr.FromInt32(brpcPort), } case mv1.Component_RC: return corev1.ServicePort{ Name: GetPortKey(BRPC_LISTEN_PORT), Port: brpcPort, - TargetPort: intstr.FromInt(int(brpcPort)), + TargetPort: intstr.FromInt32(brpcPort), } default: klog.Infof("getDMSInternalServicePort not supported the type %s", componentType) diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service.go index a68a54f..7d0218e 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service.go @@ -59,24 +59,33 @@ func newComputeServicePorts(cvs map[string]interface{}, svcConf *dv1.ExportServi webserverPort := resource.GetPort(cvs, resource.WEBSERVER_PORT) heartbeatPort := resource.GetPort(cvs, resource.HEARTBEAT_SERVICE_PORT) brpcPort := resource.GetPort(cvs, resource.BRPC_PORT) + arrowFlightPort := resource.GetPort(cvs, resource.ARROW_FLIGHT_SQL_PORT) sps := []corev1.ServicePort{{ Name: resource.GetPortKey(resource.BE_PORT), - TargetPort: intstr.FromInt(int(bePort)), + TargetPort: intstr.FromInt32(bePort), Port: bePort, }, { Name: resource.GetPortKey(resource.WEBSERVER_PORT), - TargetPort: intstr.FromInt(int(webserverPort)), + TargetPort: intstr.FromInt32(webserverPort), Port: webserverPort, }, { Name: resource.GetPortKey(resource.HEARTBEAT_SERVICE_PORT), - TargetPort: intstr.FromInt(int(heartbeatPort)), + TargetPort: intstr.FromInt32(heartbeatPort), Port: heartbeatPort, }, { Name: resource.GetPortKey(resource.BRPC_PORT), - TargetPort: intstr.FromInt(int(brpcPort)), + TargetPort: intstr.FromInt32(brpcPort), Port: brpcPort, }} + if arrowFlightPort != -1 { + sps = append(sps, corev1.ServicePort{ + Name: resource.GetPortKey(resource.ARROW_FLIGHT_SQL_PORT), + TargetPort: intstr.FromInt32(arrowFlightPort), + Port: arrowFlightPort, + }) + } + if svcConf == nil || svcConf.Type != corev1.ServiceTypeNodePort { return sps } diff --git a/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/service.go b/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/service.go index 39d6f8c..e2f2815 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/service.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/service.go @@ -58,17 +58,24 @@ func newFEServicePorts(config map[string]interface{}, svcConf *dv1.ExportService rpcPort := resource.GetPort(config, resource.RPC_PORT) queryPort := resource.GetPort(config, resource.QUERY_PORT) editPort := resource.GetPort(config, resource.EDIT_LOG_PORT) + arrowFlightPort := resource.GetPort(config, resource.ARROW_FLIGHT_SQL_PORT) ports := []corev1.ServicePort{ { - Port: httpPort, TargetPort: intstr.FromInt(int(httpPort)), Name: resource.GetPortKey(resource.HTTP_PORT), + Port: httpPort, TargetPort: intstr.FromInt32(httpPort), Name: resource.GetPortKey(resource.HTTP_PORT), }, { - Port: rpcPort, TargetPort: intstr.FromInt(int(rpcPort)), Name: resource.GetPortKey(resource.RPC_PORT), + Port: rpcPort, TargetPort: intstr.FromInt32(rpcPort), Name: resource.GetPortKey(resource.RPC_PORT), }, { - Port: queryPort, TargetPort: intstr.FromInt(int(queryPort)), Name: resource.GetPortKey(resource.QUERY_PORT), + Port: queryPort, TargetPort: intstr.FromInt32(queryPort), Name: resource.GetPortKey(resource.QUERY_PORT), }, { - Port: editPort, TargetPort: intstr.FromInt(int(editPort)), Name: resource.GetPortKey(resource.EDIT_LOG_PORT), + Port: editPort, TargetPort: intstr.FromInt32(editPort), Name: resource.GetPortKey(resource.EDIT_LOG_PORT), }} + if arrowFlightPort != -1 { + ports = append(ports, corev1.ServicePort{ + Port: arrowFlightPort, TargetPort: intstr.FromInt32(arrowFlightPort), Name: resource.GetPortKey(resource.ARROW_FLIGHT_SQL_PORT), + }) + } + if svcConf == nil || svcConf.Type != corev1.ServiceTypeNodePort { return ports } @@ -119,7 +126,7 @@ func (dfc *DisaggregatedFEController) newInternalService(ddc *dv1.DorisDisaggreg func getInternalServicePort(config map[string]interface{}) corev1.ServicePort { httpPort := resource.GetPort(config, resource.QUERY_PORT) return corev1.ServicePort{ - Port: httpPort, TargetPort: intstr.FromInt(int(httpPort)), Name: resource.GetPortKey(resource.QUERY_PORT), + Port: httpPort, TargetPort: intstr.FromInt32(httpPort), Name: resource.GetPortKey(resource.QUERY_PORT), } } diff --git a/pkg/controller/sub_controller/disaggregated_cluster/metaservice/service.go b/pkg/controller/sub_controller/disaggregated_cluster/metaservice/service.go index e01feea..97b0c11 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/metaservice/service.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/metaservice/service.go @@ -63,7 +63,7 @@ func (dms *DisaggregatedMSController) newMSServicePorts(config map[string]interf { Name: resource.GetPortKey(resource.BRPC_LISTEN_PORT), Port: brpcPort, - TargetPort: intstr.FromInt(int(brpcPort)), + TargetPort: intstr.FromInt32(brpcPort), }, }