Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Prometheus-scrapable ports to all pods in the cluster #434

Merged
merged 3 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions pkg/analyzer/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,32 @@ func discoverConnections(resources []*Resource, links []*Service, logger Logger)
logger.Debugf("services matched to %v: %v", destRes.Resource.Name, deploymentServices)
for _, svc := range deploymentServices {
srcRes := findSource(resources, svc)
if len(srcRes) > 0 {
for _, r := range srcRes {
if !r.equals(destRes) {
logger.Debugf("source: %s target: %s link: %s", r.Resource.Name, destRes.Resource.Name, svc.Resource.Name)
connections = append(connections, &Connections{Source: r, Target: destRes, Link: svc})
}
for _, r := range srcRes {
if !r.equals(destRes) {
logger.Debugf("source: %s target: %s link: %s", r.Resource.Name, destRes.Resource.Name, svc.Resource.Name)
connections = append(connections, &Connections{Source: r, Target: destRes, Link: svc})
}
} else {
}
if len(srcRes) == 0 || svcHasExposedPorts(svc) { // found no sources, but some ports need to be exposed
connections = append(connections, &Connections{Target: destRes, Link: svc}) // indicates a source-less service
}
}
}
return connections
}

func svcHasExposedPorts(svc *Service) bool {
if svc.Resource.ExposeExternally {
return true
}
for _, port := range svc.Resource.Network {
if port.exposeToCluster {
return true
}
}
return false
}

// areSelectorsContained returns true if selectors2 is contained in selectors1
func areSelectorsContained(selectors1 map[string]string, selectors2 []string) bool {
elementMap := make(map[string]string)
Expand Down
51 changes: 43 additions & 8 deletions pkg/analyzer/info_to_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/cli-runtime/pkg/resource"
)
Expand Down Expand Up @@ -104,16 +105,40 @@ func k8sServiceFromInfo(info *resource.Info) (*Service, error) {
serviceCtx.Resource.Type = svcObj.Spec.Type
serviceCtx.Resource.Selectors = matchLabelSelectorToStrLabels(svcObj.Spec.Selector)
serviceCtx.Resource.ExposeExternally = (svcObj.Spec.Type == v1.ServiceTypeLoadBalancer || svcObj.Spec.Type == v1.ServiceTypeNodePort)
serviceCtx.Resource.ExposeToCluster = false

prometheusPort, prometheusPortValid := exposedPrometheusScrapePort(svcObj.Annotations)
for _, p := range svcObj.Spec.Ports {
n := SvcNetworkAttr{Port: int(p.Port), TargetPort: p.TargetPort, Protocol: p.Protocol}
n := SvcNetworkAttr{Port: int(p.Port), TargetPort: p.TargetPort, Protocol: p.Protocol, name: p.Name}
n.exposeToCluster = prometheusPortValid && n.equals(prometheusPort)
serviceCtx.Resource.Network = append(serviceCtx.Resource.Network, n)
}

return &serviceCtx, nil
}

const defaultPrometheusScrapePort = 9090

func exposedPrometheusScrapePort(annotations map[string]string) (*intstr.IntOrString, bool) {
scrapeOn := false
scrapePort := intstr.FromInt(defaultPrometheusScrapePort)
for k, v := range annotations {
if strings.Contains(k, "prometheus") {
if strings.HasSuffix(k, "/scrape") && v == "true" {
scrapeOn = true
} else if strings.HasSuffix(k, "/port") {
scrapePort = intstr.Parse(v)
}
}
}

return &scrapePort, scrapeOn
}

func (port *SvcNetworkAttr) equals(intStrPort *intstr.IntOrString) bool {
return (port.name != "" && port.name == intStrPort.StrVal) ||
(port.Port > 0 && port.Port == int(intStrPort.IntVal))
}

// ocRouteFromInfo updates servicesToExpose based on an OpenShift Route object
func ocRouteFromInfo(info *resource.Info, toExpose servicesToExpose) error {
routeObj := parseResourceFromInfo[ocroutev1.Route](info)
Expand All @@ -123,12 +148,12 @@ func ocRouteFromInfo(info *resource.Info, toExpose servicesToExpose) error {

exposedServicesInNamespace, ok := toExpose[routeObj.Namespace]
if !ok {
toExpose[routeObj.Namespace] = map[string]bool{}
toExpose[routeObj.Namespace] = map[string][]*intstr.IntOrString{}
exposedServicesInNamespace = toExpose[routeObj.Namespace]
}
exposedServicesInNamespace[routeObj.Spec.To.Name] = false
appendToSliceInMap(exposedServicesInNamespace, routeObj.Spec.To.Name, &routeObj.Spec.Port.TargetPort)
for _, backend := range routeObj.Spec.AlternateBackends {
exposedServicesInNamespace[backend.Name] = false
appendToSliceInMap(exposedServicesInNamespace, backend.Name, &routeObj.Spec.Port.TargetPort)
}

return nil
Expand All @@ -143,13 +168,14 @@ func k8sIngressFromInfo(info *resource.Info, toExpose servicesToExpose) error {

exposedServicesInNamespace, ok := toExpose[ingressObj.Namespace]
if !ok {
toExpose[ingressObj.Namespace] = map[string]bool{}
toExpose[ingressObj.Namespace] = map[string][]*intstr.IntOrString{}
exposedServicesInNamespace = toExpose[ingressObj.Namespace]
}

defaultBackend := ingressObj.Spec.DefaultBackend
if defaultBackend != nil && defaultBackend.Service != nil {
exposedServicesInNamespace[defaultBackend.Service.Name] = false
portToAppend := portFromServiceBackendPort(&defaultBackend.Service.Port)
appendToSliceInMap(exposedServicesInNamespace, defaultBackend.Service.Name, portToAppend)
}

for ruleIdx := range ingressObj.Spec.Rules {
Expand All @@ -158,7 +184,8 @@ func k8sIngressFromInfo(info *resource.Info, toExpose servicesToExpose) error {
for pathIdx := range rule.HTTP.Paths {
svc := rule.HTTP.Paths[pathIdx].Backend.Service
if svc != nil {
exposedServicesInNamespace[svc.Name] = false
portToAppend := portFromServiceBackendPort(&svc.Port)
appendToSliceInMap(exposedServicesInNamespace, svc.Name, portToAppend)
}
}
}
Expand All @@ -167,6 +194,14 @@ func k8sIngressFromInfo(info *resource.Info, toExpose servicesToExpose) error {
return nil
}

func portFromServiceBackendPort(sbp *networkv1.ServiceBackendPort) *intstr.IntOrString {
res := intstr.FromInt32(sbp.Number)
if sbp.Number == 0 {
res = intstr.FromString(sbp.Name)
}
return &res
}

func parseDeployResource(podSpec *v1.PodTemplateSpec, obj metaV1.Object, resourceCtx *Resource) {
resourceCtx.Resource.Name = obj.GetName()
resourceCtx.Resource.Namespace = obj.GetNamespace()
Expand Down
2 changes: 1 addition & 1 deletion pkg/analyzer/policies_synthesizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestExtractConnectionsCustomWalk2(t *testing.T) {
synthesizer := NewPoliciesSynthesizer(WithWalkFn(filepath.WalkDir))
resources, conns, errs := synthesizer.extractConnectionsFromFolderPaths([]string{dirPath})
require.Len(t, errs, 0)
require.Len(t, conns, 14)
require.Len(t, conns, 15)
require.Len(t, resources, 14)
}

Expand Down
15 changes: 10 additions & 5 deletions pkg/analyzer/resource_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,16 @@ func (ra *resourceAccumulator) exposeServices() {
if !ok {
continue
}
if exposeExternally, ok := exposedServicesInNamespace[svc.Resource.Name]; ok {
if exposeExternally {
svc.Resource.ExposeExternally = true
} else {
svc.Resource.ExposeToCluster = true
portsToExpose, ok := exposedServicesInNamespace[svc.Resource.Name]
if !ok {
continue
}
for i := range svc.Resource.Network {
port := &svc.Resource.Network[i]
for _, portToExpose := range portsToExpose {
if port.equals(portToExpose) {
port.exposeToCluster = true
}
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/analyzer/synth_netpols.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,12 @@ func determineConnectivityPerDeployment(connections []*Connections) []*deploymen
for _, conn := range connections {
srcDeploy := findOrAddDeploymentConn(conn.Source, deploysConnectivity)
dstDeploy := findOrAddDeploymentConn(conn.Target, deploysConnectivity)
targetPorts := toNetpolPorts(conn.Link.Resource.Network)
targetPorts := toNetpolPorts(conn.Link.Resource.Network, srcDeploy == nil && !conn.Link.Resource.ExposeExternally)
if conn.Source != nil && len(conn.Source.Resource.UsedPorts) > 0 {
targetPorts = toNetpolPorts(conn.Source.Resource.UsedPorts)
targetPorts = toNetpolPorts(conn.Source.Resource.UsedPorts, false)
}
if len(targetPorts) == 0 {
continue
}

if srcDeploy != nil {
Expand All @@ -112,10 +115,10 @@ func determineConnectivityPerDeployment(connections []*Connections) []*deploymen
switch {
case conn.Link.Resource.ExposeExternally:
dstDeploy.addIngressRule([]network.NetworkPolicyPeer{}, targetPorts) // allowing traffic from all sources
case conn.Link.Resource.ExposeToCluster:
case srcDeploy == nil:
peer := network.NetworkPolicyPeer{NamespaceSelector: &metaV1.LabelSelector{}}
dstDeploy.addIngressRule([]network.NetworkPolicyPeer{peer}, targetPorts) // allowing traffic from all cluster sources
case conn.Source != nil:
default:
netpolPeer := getNetpolPeer(dstDeploy, srcDeploy)
dstDeploy.addIngressRule([]network.NetworkPolicyPeer{netpolPeer}, targetPorts) // allow traffic only from this specific source
}
Expand Down Expand Up @@ -161,9 +164,12 @@ func getDeployConnSelector(deployConn *deploymentConnectivity) *metaV1.LabelSele
return &metaV1.LabelSelector{MatchLabels: deployConn.Resource.Resource.Labels}
}

func toNetpolPorts(ports []SvcNetworkAttr) []network.NetworkPolicyPort {
func toNetpolPorts(ports []SvcNetworkAttr, exposedOnly bool) []network.NetworkPolicyPort {
netpolPorts := make([]network.NetworkPolicyPort, 0, len(ports))
for _, port := range ports {
if exposedOnly && !port.exposeToCluster {
continue
}
protocol := port.Protocol
if protocol == "" {
protocol = core.ProtocolTCP
Expand Down
15 changes: 8 additions & 7 deletions pkg/analyzer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ func (r1 *Resource) equals(r2 *Resource) bool {

// SvcNetworkAttr is used to store port information
type SvcNetworkAttr struct {
Port int `json:"port,omitempty"`
TargetPort intstr.IntOrString `json:"target_port,omitempty"`
Protocol corev1.Protocol `json:"protocol,omitempty"`
name string
Port int `json:"port,omitempty"`
TargetPort intstr.IntOrString `json:"target_port,omitempty"`
Protocol corev1.Protocol `json:"protocol,omitempty"`
exposeToCluster bool
}

// Service is used to store information about a K8s Service
Expand All @@ -64,7 +66,6 @@ type Service struct {
FilePath string `json:"filepath,omitempty"`
Kind string `json:"kind,omitempty"`
Network []SvcNetworkAttr `json:"network,omitempty"`
ExposeToCluster bool `json:"-"`
ExposeExternally bool `json:"-"`
} `json:"resource,omitempty"`
}
Expand All @@ -76,6 +77,6 @@ type Connections struct {
Link *Service `json:"link"`
}

// A map from namespaces to a map of service names in each namespaces.
// For each service we also hold whether they should be exposed externally (true) or just globally inside the cluster (false)
type servicesToExpose map[string]map[string]bool
// A map from namespaces to a map of service names in each namespaces, which we want to expose within the cluster.
// For each service we hold the ports that should be exposed
type servicesToExpose map[string]map[string][]*intstr.IntOrString
4 changes: 4 additions & 0 deletions pkg/analyzer/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ func appendAndLogNewError(errs []FileProcessingError, newErr *FileProcessingErro
errs = append(errs, *newErr)
return errs
}

func appendToSliceInMap[K comparable, V any](m map[K][]V, key K, newVal V) {
m[key] = append(m[key], newVal)
}
51 changes: 51 additions & 0 deletions tests/sockshop/expected_netpol_output.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
{
"protocol": "TCP",
"port": 80
},
{
"protocol": "TCP",
"port": 9090
}
],
"from": [
Expand All @@ -34,6 +38,19 @@
}
}
]
},
{
"ports": [
{
"protocol": "TCP",
"port": 9090
}
],
"from": [
{
"namespaceSelector": {}
}
]
}
],
"egress": [
Expand Down Expand Up @@ -203,6 +220,10 @@
{
"protocol": "TCP",
"port": 80
},
{
"protocol": "TCP",
"port": 9090
}
],
"to": [
Expand Down Expand Up @@ -309,6 +330,21 @@
"name": "rabbitmq"
}
},
"ingress": [
{
"ports": [
{
"protocol": "TCP",
adisos marked this conversation as resolved.
Show resolved Hide resolved
"port": "exporter"
}
],
"from": [
{
"namespaceSelector": {}
}
]
}
],
"policyTypes": [
"Ingress",
"Egress"
Expand Down Expand Up @@ -369,6 +405,21 @@
"name": "user"
}
},
"ingress": [
{
"ports": [
{
"protocol": "TCP",
"port": 7070
}
],
"from": [
{
"namespaceSelector": {}
}
]
}
],
"egress": [
{
"ports": [
Expand Down
3 changes: 3 additions & 0 deletions tests/sockshop/manifests/02-carts-svc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,8 @@ spec:
# the port that this service should serve on
- port: 80
targetPort: 80
- port: 9090
name: exporter
protocol: TCP
selector:
name: carts
5 changes: 5 additions & 0 deletions tests/sockshop/manifests/26-user-svc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@ metadata:
name: user
annotations:
prometheus.io/scrape: 'true'
prometheus.io/port: '7070'
labels:
name: user
namespace: sock-shop
spec:
ports:
# the port that this service should serve on
- port: 80
name: http
targetPort: 80
- port: 7070
adisos marked this conversation as resolved.
Show resolved Hide resolved
name: metrics
targetPort: 7070
selector:
name: user