Skip to content

Commit

Permalink
Expose arrow flight sql port in disaggregated service
Browse files Browse the repository at this point in the history
  • Loading branch information
hechao committed Dec 3, 2024
1 parent 37f4926 commit 9076c16
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/common/utils/resource/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ func getProbe(port int32, path string, commands []string, pt ProbeType) corev1.P
func getTcpSocket(port int32) corev1.ProbeHandler {
return corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.FromInt(int(port)),
Port: intstr.FromInt32(port),
},
}
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/common/utils/resource/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,19 @@ func getInternalServicePort(config map[string]interface{}, componentType v1.Comp
return corev1.ServicePort{
Name: GetPortKey(QUERY_PORT),
Port: GetPort(config, QUERY_PORT),
TargetPort: intstr.FromInt(int(GetPort(config, QUERY_PORT))),
TargetPort: intstr.FromInt32(GetPort(config, QUERY_PORT)),
}
case v1.Component_BE, v1.Component_CN:
return corev1.ServicePort{
Name: GetPortKey(HEARTBEAT_SERVICE_PORT),
Port: GetPort(config, HEARTBEAT_SERVICE_PORT),
TargetPort: intstr.FromInt(int(GetPort(config, HEARTBEAT_SERVICE_PORT))),
TargetPort: intstr.FromInt32(GetPort(config, HEARTBEAT_SERVICE_PORT)),
}
case v1.Component_Broker:
return corev1.ServicePort{
Name: GetPortKey(BROKER_IPC_PORT),
Port: GetPort(config, BROKER_IPC_PORT),
TargetPort: intstr.FromInt(int(GetPort(config, BROKER_IPC_PORT))),
TargetPort: intstr.FromInt32(GetPort(config, BROKER_IPC_PORT)),
}
default:
klog.Infof("getInternalServicePort not supported the type %s", componentType)
Expand Down Expand Up @@ -173,18 +173,18 @@ func getFeServicePorts(config map[string]interface{}) (ports []corev1.ServicePor
editPort := GetPort(config, EDIT_LOG_PORT)
arrowFlightPort := GetPort(config, ARROW_FLIGHT_SQL_PORT)
ports = append(ports, corev1.ServicePort{
Port: httpPort, TargetPort: intstr.FromInt(int(httpPort)), Name: GetPortKey(HTTP_PORT),
Port: httpPort, TargetPort: intstr.FromInt32(httpPort), Name: GetPortKey(HTTP_PORT),
}, corev1.ServicePort{
Port: rpcPort, TargetPort: intstr.FromInt(int(rpcPort)), Name: GetPortKey(RPC_PORT),
Port: rpcPort, TargetPort: intstr.FromInt32(rpcPort), Name: GetPortKey(RPC_PORT),
}, corev1.ServicePort{
Port: queryPort, TargetPort: intstr.FromInt(int(queryPort)), Name: GetPortKey(QUERY_PORT),
Port: queryPort, TargetPort: intstr.FromInt32(queryPort), Name: GetPortKey(QUERY_PORT),
}, corev1.ServicePort{
Port: editPort, TargetPort: intstr.FromInt(int(editPort)), Name: GetPortKey(EDIT_LOG_PORT),
Port: editPort, TargetPort: intstr.FromInt32(editPort), Name: GetPortKey(EDIT_LOG_PORT),
})

if arrowFlightPort != -1 {
ports = append(ports, corev1.ServicePort{
Port: arrowFlightPort, TargetPort: intstr.FromInt(int(arrowFlightPort)), Name: GetPortKey(ARROW_FLIGHT_SQL_PORT),
Port: arrowFlightPort, TargetPort: intstr.FromInt32(arrowFlightPort), Name: GetPortKey(ARROW_FLIGHT_SQL_PORT),
})
}

Expand All @@ -199,18 +199,18 @@ func getBeServicePorts(config map[string]interface{}) (ports []corev1.ServicePor
arrowFlightPort := GetPort(config, ARROW_FLIGHT_SQL_PORT)

ports = append(ports, corev1.ServicePort{
Port: bePort, TargetPort: intstr.FromInt(int(bePort)), Name: GetPortKey(BE_PORT),
Port: bePort, TargetPort: intstr.FromInt32(bePort), Name: GetPortKey(BE_PORT),
}, corev1.ServicePort{
Port: webseverPort, TargetPort: intstr.FromInt(int(webseverPort)), Name: GetPortKey(WEBSERVER_PORT),
Port: webseverPort, TargetPort: intstr.FromInt32(webseverPort), Name: GetPortKey(WEBSERVER_PORT),
}, corev1.ServicePort{
Port: heartPort, TargetPort: intstr.FromInt(int(heartPort)), Name: GetPortKey(HEARTBEAT_SERVICE_PORT),
Port: heartPort, TargetPort: intstr.FromInt32(heartPort), Name: GetPortKey(HEARTBEAT_SERVICE_PORT),
}, corev1.ServicePort{
Port: brpcPort, TargetPort: intstr.FromInt(int(brpcPort)), Name: GetPortKey(BRPC_PORT),
Port: brpcPort, TargetPort: intstr.FromInt32(brpcPort), Name: GetPortKey(BRPC_PORT),
})

if arrowFlightPort != -1 {
ports = append(ports, corev1.ServicePort{
Port: arrowFlightPort, TargetPort: intstr.FromInt(int(arrowFlightPort)), Name: GetPortKey(ARROW_FLIGHT_SQL_PORT),
Port: arrowFlightPort, TargetPort: intstr.FromInt32(arrowFlightPort), Name: GetPortKey(ARROW_FLIGHT_SQL_PORT),
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/common/utils/resource/service_disaggregated_ms.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func getDMSServicePort(brpcPort int32, componentType mv1.ComponentType) corev1.S
return corev1.ServicePort{
Name: GetPortKey(BRPC_LISTEN_PORT),
Port: brpcPort,
TargetPort: intstr.FromInt(int(brpcPort)),
TargetPort: intstr.FromInt32(brpcPort),
}
case mv1.Component_RC:
return corev1.ServicePort{
Name: GetPortKey(BRPC_LISTEN_PORT),
Port: brpcPort,
TargetPort: intstr.FromInt(int(brpcPort)),
TargetPort: intstr.FromInt32(brpcPort),
}
default:
klog.Infof("getDMSInternalServicePort not supported the type %s", componentType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,33 @@ func newComputeServicePorts(cvs map[string]interface{}, svcConf *dv1.ExportServi
webserverPort := resource.GetPort(cvs, resource.WEBSERVER_PORT)
heartbeatPort := resource.GetPort(cvs, resource.HEARTBEAT_SERVICE_PORT)
brpcPort := resource.GetPort(cvs, resource.BRPC_PORT)
arrowFlightPort := resource.GetPort(cvs, resource.ARROW_FLIGHT_SQL_PORT)
sps := []corev1.ServicePort{{
Name: resource.GetPortKey(resource.BE_PORT),
TargetPort: intstr.FromInt(int(bePort)),
TargetPort: intstr.FromInt32(bePort),
Port: bePort,
}, {
Name: resource.GetPortKey(resource.WEBSERVER_PORT),
TargetPort: intstr.FromInt(int(webserverPort)),
TargetPort: intstr.FromInt32(webserverPort),
Port: webserverPort,
}, {
Name: resource.GetPortKey(resource.HEARTBEAT_SERVICE_PORT),
TargetPort: intstr.FromInt(int(heartbeatPort)),
TargetPort: intstr.FromInt32(heartbeatPort),
Port: heartbeatPort,
}, {
Name: resource.GetPortKey(resource.BRPC_PORT),
TargetPort: intstr.FromInt(int(brpcPort)),
TargetPort: intstr.FromInt32(brpcPort),
Port: brpcPort,
}}

if arrowFlightPort != -1 {
sps = append(sps, corev1.ServicePort{
Name: resource.GetPortKey(resource.ARROW_FLIGHT_SQL_PORT),
TargetPort: intstr.FromInt32(arrowFlightPort),
Port: arrowFlightPort,
})
}

if svcConf == nil || svcConf.Type != corev1.ServiceTypeNodePort {
return sps
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,24 @@ func newFEServicePorts(config map[string]interface{}, svcConf *dv1.ExportService
rpcPort := resource.GetPort(config, resource.RPC_PORT)
queryPort := resource.GetPort(config, resource.QUERY_PORT)
editPort := resource.GetPort(config, resource.EDIT_LOG_PORT)
arrowFlightPort := resource.GetPort(config, resource.ARROW_FLIGHT_SQL_PORT)
ports := []corev1.ServicePort{
{
Port: httpPort, TargetPort: intstr.FromInt(int(httpPort)), Name: resource.GetPortKey(resource.HTTP_PORT),
Port: httpPort, TargetPort: intstr.FromInt32(httpPort), Name: resource.GetPortKey(resource.HTTP_PORT),
}, {
Port: rpcPort, TargetPort: intstr.FromInt(int(rpcPort)), Name: resource.GetPortKey(resource.RPC_PORT),
Port: rpcPort, TargetPort: intstr.FromInt32(rpcPort), Name: resource.GetPortKey(resource.RPC_PORT),
}, {
Port: queryPort, TargetPort: intstr.FromInt(int(queryPort)), Name: resource.GetPortKey(resource.QUERY_PORT),
Port: queryPort, TargetPort: intstr.FromInt32(queryPort), Name: resource.GetPortKey(resource.QUERY_PORT),
}, {
Port: editPort, TargetPort: intstr.FromInt(int(editPort)), Name: resource.GetPortKey(resource.EDIT_LOG_PORT),
Port: editPort, TargetPort: intstr.FromInt32(editPort), Name: resource.GetPortKey(resource.EDIT_LOG_PORT),
}}

if arrowFlightPort != -1 {
ports = append(ports, corev1.ServicePort{
Port: arrowFlightPort, TargetPort: intstr.FromInt32(arrowFlightPort), Name: resource.GetPortKey(resource.ARROW_FLIGHT_SQL_PORT),
})
}

if svcConf == nil || svcConf.Type != corev1.ServiceTypeNodePort {
return ports
}
Expand Down Expand Up @@ -119,7 +126,7 @@ func (dfc *DisaggregatedFEController) newInternalService(ddc *dv1.DorisDisaggreg
func getInternalServicePort(config map[string]interface{}) corev1.ServicePort {
httpPort := resource.GetPort(config, resource.QUERY_PORT)
return corev1.ServicePort{
Port: httpPort, TargetPort: intstr.FromInt(int(httpPort)), Name: resource.GetPortKey(resource.QUERY_PORT),
Port: httpPort, TargetPort: intstr.FromInt32(httpPort), Name: resource.GetPortKey(resource.QUERY_PORT),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (dms *DisaggregatedMSController) newMSServicePorts(config map[string]interf
{
Name: resource.GetPortKey(resource.BRPC_LISTEN_PORT),
Port: brpcPort,
TargetPort: intstr.FromInt(int(brpcPort)),
TargetPort: intstr.FromInt32(brpcPort),
},
}

Expand Down

0 comments on commit 9076c16

Please sign in to comment.