From 8e475cd4c4df1bc6fb73ebf0be6f72730cfe3f19 Mon Sep 17 00:00:00 2001 From: AmaliMatharaarachchi Date: Thu, 29 Feb 2024 10:59:02 +0530 Subject: [PATCH] remove consul --- adapter/config/default_config.go | 11 - adapter/config/types.go | 23 - adapter/internal/discovery/xds/consul.go | 202 ------- adapter/internal/logging/logging_constant.go | 5 - .../envoyconf/routes_with_clusters.go | 11 - .../oasparser/model/adapter_internal_api.go | 9 +- adapter/internal/svcdiscovery/client.go | 505 ------------------ adapter/internal/svcdiscovery/utils.go | 114 ---- adapter/internal/svcdiscovery/utils_test.go | 230 -------- adapter/internal/svcdiscovery/watcher.go | 248 --------- 10 files changed, 2 insertions(+), 1356 deletions(-) delete mode 100644 adapter/internal/discovery/xds/consul.go delete mode 100644 adapter/internal/svcdiscovery/client.go delete mode 100644 adapter/internal/svcdiscovery/utils.go delete mode 100644 adapter/internal/svcdiscovery/utils_test.go delete mode 100644 adapter/internal/svcdiscovery/watcher.go diff --git a/adapter/config/default_config.go b/adapter/config/default_config.go index c16823a2c..86630ad57 100644 --- a/adapter/config/default_config.go +++ b/adapter/config/default_config.go @@ -20,17 +20,6 @@ package config // Configuration object which is populated with default values. var defaultConfig = &Config{ Adapter: adapter{ - Consul: consul{ - Enabled: false, - URL: "https://169.254.1.1:8501", - PollInterval: 5, - ACLToken: "d3a2a719-4221-8c65-5212-58d4727427ac", - ApkServiceName: "wso2", - ServiceMeshEnabled: false, - CaCertFile: "/home/wso2/security/truststore/consul/consul-agent-ca.pem", - CertFile: "/home/wso2/security/truststore/consul/local-dc-client-consul-0.pem", - KeyFile: "/home/wso2/security/truststore/consul/local-dc-client-consul-0-key.pem", - }, Keystore: keystore{ KeyPath: "/home/wso2/security/keystore/adapter.key", CertPath: "/home/wso2/security/keystore/adapter.crt", diff --git a/adapter/config/types.go b/adapter/config/types.go index b4a4f77ee..f51f2577f 100644 --- a/adapter/config/types.go +++ b/adapter/config/types.go @@ -83,8 +83,6 @@ type Config struct { // Adapter related Configurations type adapter struct { - // Consul represents the configuration required to connect to consul service discovery - Consul consul // Keystore contains the keyFile and Cert File of the adapter Keystore keystore // Trusted Certificates @@ -164,27 +162,6 @@ type enforcer struct { Client httpClient } -type consul struct { - // Enabled whether consul service discovery should be enabled - Enabled bool - // URL url of the consul client in format: http(s)://host:port - URL string - // PollInterval how frequently consul API should be polled to get updates (in seconds) - PollInterval int - // ACLToken Access Control Token required to invoke HTTP API - ACLToken string - // ApkServiceName service name that Microgateway registered in Consul Service Mesh - ApkServiceName string - // ServiceMeshEnabled whether Consul service mesh is enabled - ServiceMeshEnabled bool - // CaCertFile path to the CA cert file(PEM encoded) required for tls connection between adapter and a consul client - CaCertFile string - // CertFile path to the cert file(PEM encoded) required for tls connection between adapter and a consul client - CertFile string - // KeyFile path to the key file(PEM encoded) required for tls connection between adapter and a consul client - KeyFile string -} - // Router to enforcer request body passing configurations type payloadPassingToEnforcer struct { MaxRequestBytes uint32 diff --git a/adapter/internal/discovery/xds/consul.go b/adapter/internal/discovery/xds/consul.go deleted file mode 100644 index 6a55ebfaa..000000000 --- a/adapter/internal/discovery/xds/consul.go +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2022, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "reflect" - "sync" - - corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" - logger "github.com/wso2/apk/adapter/internal/loggers" - logging "github.com/wso2/apk/adapter/internal/logging" - "github.com/wso2/apk/adapter/internal/svcdiscovery" - "google.golang.org/protobuf/types/known/anypb" -) - -var ( - onceUpdateMeshCerts sync.Once -) - -const ( - transportSocketName = "envoy.transport_sockets.tls" -) - -func startConsulServiceDiscovery(organizationID string) { - for apiKey, envoyInternalAPI := range orgAPIMap[organizationID] { - for _, cluster := range envoyInternalAPI.clusters { - if consulSyntax, ok := svcdiscovery.ClusterConsulKeyMap[cluster.Name]; ok { - svcdiscovery.InitConsul() //initialize consul client and load certs - onceUpdateMeshCerts.Do(func() { - if svcdiscovery.MeshEnabled { - go listenForMeshCertUpdates(organizationID) //runs in background - } - }) - query, errConSyn := svcdiscovery.ParseQueryString(consulSyntax) - if errConSyn != nil { - logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1402, logging.CRITICAL, "Consul syntax parse, error: %v", errConSyn.Error())) - return - } - logger.LoggerXds.Debugln("consul query values: ", query) - go getServiceDiscoveryData(query, cluster.Name, apiKey, organizationID) - } - } - } - -} - -func listenForMeshCertUpdates(organizationID string) { - for { - select { - case <-svcdiscovery.MeshUpdateSignal: - updateCertsForServiceMesh(organizationID) - } - } -} - -func updateCertsForServiceMesh(organizationID string) { - //update each cluster with new certs - for _, envoyInternalAPI := range orgAPIMap[organizationID] { - for _, cluster := range envoyInternalAPI.clusters { //iterate through all clusters - - if svcdiscovery.MeshCACert == "" || svcdiscovery.MeshServiceKey == "" || svcdiscovery.MeshServiceCert == "" { - logger.LoggerXds.Warn("Mesh certs are empty") - return - } - upstreamTLSContext := svcdiscovery.CreateUpstreamTLSContext(svcdiscovery.MeshCACert, - svcdiscovery.MeshServiceKey, svcdiscovery.MeshServiceCert) - - marshalledTLSContext, err := anypb.New(upstreamTLSContext) - if err != nil { - logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1403, logging.CRITICAL, "Internal Error while marshalling the upstream TLS Context, error: %v", err.Error())) - } else { - //envoy config - upstreamTransportSocket := &corev3.TransportSocket{ - Name: transportSocketName, - ConfigType: &corev3.TransportSocket_TypedConfig{ - TypedConfig: marshalledTLSContext, - }, - } - cluster.TransportSocket = upstreamTransportSocket - } - - } - } - - //send the update to Router - for apiKey := range orgAPIMap[organizationID] { - updateXDSClusterCache(apiKey, organizationID) - } -} - -func getServiceDiscoveryData(query svcdiscovery.Query, clusterName string, apiKey string, organizationID string) { - doneChan := make(chan bool) - svcdiscovery.ClusterConsulDoneChanMap[clusterName] = doneChan - resultChan := svcdiscovery.ConsulClientInstance.Poll(query, doneChan) - for { - select { - case queryResultsList, ok := <-resultChan: - if !ok { //ok==false --> result chan is closed - logger.LoggerXds.Debugln("closed the result channel for cluster name: ", clusterName) - return - } - //stop the process when API is deleted - if _, apiExists := orgAPIMap[organizationID][apiKey]; !apiExists { - logger.LoggerXds.Debugln("Consul service discovery stopped for cluster ", clusterName, " in API ", - apiKey, " upon API removal") - stopConsulDiscoveryFor(clusterName) - return - } - val := svcdiscovery.GetClusterConsulResultMap(clusterName) - if val != nil { - if !reflect.DeepEqual(val, queryResultsList) { - svcdiscovery.SetClusterConsulResultMap(clusterName, queryResultsList) - //update the envoy cluster - updateCluster(apiKey, clusterName, organizationID, queryResultsList) - } - } else { - logger.LoggerXds.Debugln("updating cluster from the consul service registry, removed the default host") - svcdiscovery.SetClusterConsulResultMap(clusterName, queryResultsList) - updateCluster(apiKey, clusterName, organizationID, queryResultsList) - } - - if svcdiscovery.MeshEnabled { - updateCertsForServiceMesh(organizationID) - } - } - } -} - -func updateCluster(apiKey string, clusterName string, organizationID string, queryResultsList []svcdiscovery.Upstream) { - if envoyInternalAPI, available := orgAPIMap[organizationID][apiKey]; available { - for _, cluster := range envoyInternalAPI.clusters { - if cluster.Name == clusterName { - var lbEndpointList []*endpointv3.LbEndpoint - for _, result := range queryResultsList { - address := &corev3.Address{Address: &corev3.Address_SocketAddress{ - SocketAddress: &corev3.SocketAddress{ - Address: result.Address, - Protocol: corev3.SocketAddress_TCP, - PortSpecifier: &corev3.SocketAddress_PortValue{ - PortValue: uint32(result.ServicePort), - }, - }, - }} - - lbEndPoint := &endpointv3.LbEndpoint{ - HostIdentifier: &endpointv3.LbEndpoint_Endpoint{ - Endpoint: &endpointv3.Endpoint{ - Address: address, - }, - }, - } - lbEndpointList = append(lbEndpointList, lbEndPoint) - } - cluster.LoadAssignment = &endpointv3.ClusterLoadAssignment{ - ClusterName: clusterName, - Endpoints: []*endpointv3.LocalityLbEndpoints{ - { - LbEndpoints: lbEndpointList, - }, - }, - } - updateXDSClusterCache(apiKey, organizationID) - } - } - } -} - -func updateXDSClusterCache(apiKey string, organizationID string) { - for key, envoyInternalAPI := range orgAPIMap[organizationID] { - if key == apiKey { - for _, label := range envoyInternalAPI.envoyLabels { - listeners, clusters, routes, endpoints, _ := GenerateEnvoyResoucesForGateway(label) - UpdateXdsCacheWithLock(label, endpoints, clusters, routes, listeners) - logger.LoggerXds.Info("Updated XDS cache by consul service discovery for API: ", apiKey) - } - } - } -} - -func stopConsulDiscoveryFor(clusterName string) { - if doneChan, available := svcdiscovery.ClusterConsulDoneChanMap[clusterName]; available { - close(doneChan) - } - delete(svcdiscovery.ClusterConsulResultMap, clusterName) - delete(svcdiscovery.ClusterConsulKeyMap, clusterName) -} diff --git a/adapter/internal/logging/logging_constant.go b/adapter/internal/logging/logging_constant.go index ec56bf714..bc451aa1b 100644 --- a/adapter/internal/logging/logging_constant.go +++ b/adapter/internal/logging/logging_constant.go @@ -47,7 +47,6 @@ const ( const ( Error1400 = 1400 Error1401 = 1401 - Error1402 = 1402 Error1403 = 1403 Error1410 = 1410 Error1411 = 1411 @@ -169,10 +168,6 @@ var Mapper = map[int]logging.ErrorDetails{ ErrorCode: Error1401, Message: "Error in Stream request type.", }, - Error1402: { - ErrorCode: Error1402, - Message: "Consul syntax parse error.", - }, Error1403: { ErrorCode: Error1403, Message: "Internal Error while marshalling the upstream TLS Context.", diff --git a/adapter/internal/oasparser/envoyconf/routes_with_clusters.go b/adapter/internal/oasparser/envoyconf/routes_with_clusters.go index 2addbd7a2..fb5e5f4d3 100644 --- a/adapter/internal/oasparser/envoyconf/routes_with_clusters.go +++ b/adapter/internal/oasparser/envoyconf/routes_with_clusters.go @@ -52,7 +52,6 @@ import ( logging "github.com/wso2/apk/adapter/internal/logging" "github.com/wso2/apk/adapter/internal/oasparser/constants" "github.com/wso2/apk/adapter/internal/oasparser/model" - "github.com/wso2/apk/adapter/internal/svcdiscovery" dpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2" "google.golang.org/protobuf/proto" gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -563,16 +562,6 @@ func processEndpoints(clusterName string, clusterDetails *model.EndpointCluster, } } - // service discovery itself will be handling loadbancing etc. - // Therefore mutiple endpoint support is not needed, hence consider only. - serviceDiscoveryString := clusterDetails.Endpoints[0].ServiceDiscoveryString - if serviceDiscoveryString != "" { - //add the api level cluster name to the ClusterConsulKeyMap - svcdiscovery.ClusterConsulKeyMap[clusterName] = serviceDiscoveryString - logger.LoggerOasparser.Debugln("Consul cluster added for x-wso2-endpoints: ", clusterName, " ", - serviceDiscoveryString) - } - return &cluster, addresses, nil } diff --git a/adapter/internal/oasparser/model/adapter_internal_api.go b/adapter/internal/oasparser/model/adapter_internal_api.go index 882226d31..5e5bddddc 100644 --- a/adapter/internal/oasparser/model/adapter_internal_api.go +++ b/adapter/internal/oasparser/model/adapter_internal_api.go @@ -138,10 +138,8 @@ type Endpoint struct { // Port of the endpoint. // If the port is not specified, 80 is assigned if URLType is http // 443 is assigned if URLType is https - Port uint32 - //ServiceDiscoveryQuery consul query for service discovery - ServiceDiscoveryString string - RawURL string + Port uint32 + RawURL string // Trusted CA Cerificate for the endpoint Certificate []byte // Subject Alternative Names to verify in the public certificate @@ -953,9 +951,6 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoGQLRouteCR(gqlRoute *dpv1al } func (endpoint *Endpoint) validateEndpoint() error { - if len(endpoint.ServiceDiscoveryString) > 0 { - return nil - } if endpoint.Port == 0 || endpoint.Port > 65535 { return errors.New("endpoint port value should be between 0 and 65535") } diff --git a/adapter/internal/svcdiscovery/client.go b/adapter/internal/svcdiscovery/client.go deleted file mode 100644 index 3a48753d3..000000000 --- a/adapter/internal/svcdiscovery/client.go +++ /dev/null @@ -1,505 +0,0 @@ -/* - * Copyright (c) 2021, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package svcdiscovery - -import ( - "crypto/tls" - "crypto/x509" - "encoding/json" - "io/ioutil" - "net/http" - "strconv" - "time" - - logger "github.com/wso2/apk/adapter/internal/loggers" -) - -const ( - apiVersion = "v1" - apiCatalogPath = "/health/service/" - apiMeshPath = "/health/connect/" - apiRootCertPath = "/agent/connect/ca/roots" - apiLeafCertPath = "/agent/connect/ca/leaf/" - consulTokenHeader = "X-Consul-Token" - consulIndexHeader = "X-Consul-Index" - datacenter = "dc" - passing = "passing" - passingVal = "1" - namespace = "nc" - indexQueryParam = "index" - waitQueryParam = "wait" - get = "GET" - longPollInterval = 60 //in seconds, default = 300s -) - -var ( - caReqLastIndex = 0 - leafReqLastIndex = 0 -) - -type node struct { - Address string - Datacenter string - TaggedAddresses map[string]string -} - -type service struct { - Tags []string - Address string - TaggedAddresses interface{} - Port int - ID string - Proxy Proxy -} - -// result is used to unmarshal the required components from the consul server's response -type result struct { - Node node - Service service -} - -// Upstream Data for a service instance -type Upstream struct { - Address string - ServicePort int - ID string - InMesh bool -} - -// Proxy side car proxy information -type Proxy struct { - DestinationServiceID string - LocalServiceAddress string - LocalServicePort int -} - -// Query query structure for a consul string syntax -type Query struct { - Datacenters []string - ServiceName string - Namespace string - Tags []string -} - -// Root contains root certificates -type Root struct { - RootCert string - IntermediateCerts []string - Active bool -} - -// RootCertResp structure of a response to a request to get the root certificates -type RootCertResp struct { - Roots []Root -} - -// ServiceCertResp structure of a response to get a service's certificate and private key -type ServiceCertResp struct { - CertPEM string - PrivateKeyPEM string -} - -// newHTTPClient is a golang http client with request timeout -func newHTTPClient(transport *http.Transport, timeout time.Duration) http.Client { - client := http.Client{ - Transport: transport, - Timeout: timeout, - } - return client -} - -func newTLSConfig(rootCAs *x509.CertPool, certs []tls.Certificate, insecureSkipVerify bool) tls.Config { - return tls.Config{ - RootCAs: rootCAs, - Certificates: certs, - InsecureSkipVerify: insecureSkipVerify, - } -} - -func newHTTPSTransport(config *tls.Config) http.Transport { - return http.Transport{ - TLSClientConfig: config, - } -} -func newHTTPTransport() http.Transport { - return http.Transport{} -} - -// ConsulClient wraps the HTTP API -type ConsulClient struct { - client http.Client //for upstreams - longPollClient http.Client //for certs - scheme string - host string - aclToken string - pollInterval time.Duration -} - -// NewConsulClient constructor for ConsulClient -func NewConsulClient(api http.Client, longPollClient http.Client, scheme string, host string, aclToken string) ConsulClient { - return ConsulClient{ - client: api, - longPollClient: longPollClient, - scheme: scheme, - host: host, - pollInterval: api.Timeout, - aclToken: aclToken, - } -} - -// whether source list contains one of elements list -func contains(source []string, elements []string) bool { - for _, a := range source { - for _, b := range elements { - if a == b { - return true - } - } - } - return false -} - -// construct a URL from it's elements -func constructURL(scheme, host, apiV string, otherPaths ...string) string { - url := scheme + "://" + host - if host[len(host)-1:] != "/" { - url += "/" - } - url += apiV - for _, path := range otherPaths { - url += path - } - return url -} - -// sends a get request to a consul-client -// parses the response into []Upstream -func (c ConsulClient) get(path string, dc string, nc string, tags []string) ([]Upstream, error) { - url := constructURL(c.scheme, c.host, apiVersion, apiCatalogPath, path) - req, _ := http.NewRequest(get, url, nil) - - //add query parameters - q := req.URL.Query() - if dc != "" { - q.Add(datacenter, dc) //datacenter - } - - if nc != "" { //namespace, an enterprise feature - q.Add(namespace, nc) - } - req.URL.RawQuery = q.Encode() - //set headers - req.Header.Set(consulTokenHeader, c.aclToken) - - response, errHTTP := c.client.Do(req) - if errHTTP != nil { - return []Upstream{}, errHTTP - } - - var results []result - body, errRead := ioutil.ReadAll(response.Body) - if errRead != nil { - return []Upstream{}, errRead - } - errUnmarshal := json.Unmarshal(body, &results) - var out []Upstream - for _, r := range results { - address := r.Service.Address - if address == "" { - address = r.Node.Address - } - res := Upstream{ - Address: address, - ServicePort: r.Service.Port, - ID: r.Service.ID, - InMesh: false, - } - if contains(r.Service.Tags, tags) { - out = append(out, res) - //tags haven't been defined - } else if len(tags) == 1 && tags[0] == "" { - out = append(out, res) - } - } - return out, errUnmarshal -} - -// sends a get request to a consul-client -// parses the response into []Upstream -// gets upstreams that only belongs to service mesh -func (c ConsulClient) getMeshUpstreams(path string) ([]Upstream, error) { - url := constructURL(c.scheme, c.host, apiVersion, apiMeshPath, path) - req, _ := http.NewRequest(get, url, nil) - - //add query parameters - q := req.URL.Query() - //q.Add(passing, passingVal) // health checks passing only - req.URL.RawQuery = q.Encode() - //set headers - req.Header.Set(consulTokenHeader, c.aclToken) - response, errHTTP := c.client.Do(req) - if errHTTP != nil { - return []Upstream{}, errHTTP - } - var result []result - body, errRead := ioutil.ReadAll(response.Body) - if errRead != nil { - return []Upstream{}, errRead - } - errUnmarshal := json.Unmarshal(body, &result) - var out []Upstream - for _, r := range result { - address := r.Service.Address - if address == "" { - address = r.Service.Proxy.LocalServiceAddress - } - res := Upstream{ - Address: address, - ServicePort: r.Service.Port, - ID: r.Service.Proxy.DestinationServiceID, - InMesh: true, - } - out = append(out, res) - } - return out, errUnmarshal -} - -// gets the upstreams for single query(ex: [dc1,dc2].namespace.serviceA.[tag1,tag2]) -// there can be many upstreams per query -// sends the respective upstreams through resultChan -func (c ConsulClient) getUpstreams(query Query, resultChan chan []Upstream) { - defer func() { - if r := recover(); r != nil { - logger.LoggerSvcDiscovery.Error("Recovered from a panic: ", r) - //panic: if the resultChan is closed - } - }() - - var result []Upstream - for _, dc := range query.Datacenters { - res, errGet := c.get(query.ServiceName, dc, query.Namespace, query.Tags) - if errGet == nil { - result = append(result, res...) - } else { - logger.LoggerSvcDiscovery.Error("Service registry unreachable ", errGet) - } - } - - if len(result) == 0 { - logger.LoggerSvcDiscovery.Debugln("Consul service registry query came up with empty result") - } else { - if MeshEnabled { //replace the actual address and port with proxy's address and port - resMesh, errGet := c.getMeshUpstreams(query.ServiceName) - if errGet == nil { - for i := range resMesh { - for j := range result { - if resMesh[i].ID == result[j].ID { - result[j].Address = resMesh[i].Address - result[j].ServicePort = resMesh[i].ServicePort - } - } - } - } else { - logger.LoggerSvcDiscovery.Error("Service registry unreachable ", errGet) - } - } - resultChan <- result - } -} - -// Poll periodically poll consul for updates using getUpstreams() func. -// doneChan is there to release resources -// closing the doneChan will stop polling -func (c ConsulClient) Poll(query Query, doneChan <-chan bool) <-chan []Upstream { - resultChan := make(chan []Upstream) - - //do not start polling consul if there are config errors - if errConfLoad != nil { - logger.LoggerSvcDiscovery.Error("Config errors found in consul config. ", errConfLoad, " query: ", query, " wil not be polled") - return resultChan - } - - //this routine will live until doneChan is closed - go func() { - ticker := time.NewTicker(pollInterval) - intervalChan := ticker.C //emits a signal every pollInterval(5 seconds) - - //handle panics - defer func() { - if r := recover(); r != nil { - logger.LoggerSvcDiscovery.Info("Recovered from panic: ", r) - } - }() - // release resources when this go routine exits - defer close(resultChan) - defer ticker.Stop() - - for { - select { - case <-doneChan: - //sending a signal through doneChan will cause this go routine to exit - logger.LoggerSvcDiscovery.Info("Consul stopped polling for query :", query) - return - case <-intervalChan: - c.getUpstreams(query, resultChan) - } - } - }() - - return resultChan -} - -func updateCAIndex(currentIndex int) bool { - if caReqLastIndex > currentIndex { - caReqLastIndex = 0 //Reset in case Consul's indexing messes up - return true - } else if caReqLastIndex < currentIndex { - caReqLastIndex = currentIndex - return true - } - return false -} - -func updateLeafIndex(currentIndex int) bool { - if leafReqLastIndex > currentIndex { - leafReqLastIndex = 0 //Reset - return true - } else if leafReqLastIndex < currentIndex { - leafReqLastIndex = currentIndex - return true - } - return false -} - -func (c ConsulClient) getCertRequest(url string, lastIndex int) (*http.Response, error) { - - request, errReq := http.NewRequest(get, url, nil) - if errReq != nil { - return nil, errReq - } - request.Header.Set(consulTokenHeader, aclToken) - //send the last index to activate long polling from serverside - query := request.URL.Query() - query.Add(indexQueryParam, strconv.Itoa(lastIndex)) - query.Add(waitQueryParam, strconv.Itoa(longPollInterval/60)+"s") - request.URL.RawQuery = query.Encode() - response, errClient := c.longPollClient.Do(request) - if errClient != nil { - return nil, errClient - } - return response, nil -} - -func (c ConsulClient) getRootCert(signal chan bool) { - url := constructURL(c.scheme, c.host, apiVersion, apiRootCertPath) - result := RootCertResp{} - response, errReq := c.getCertRequest(url, caReqLastIndex) - if errReq != nil { - logger.LoggerSvcDiscovery.Error("Error getting root cert: ", errReq) - return - } - body, errRead := ioutil.ReadAll(response.Body) - if errRead != nil { - logger.LoggerSvcDiscovery.Error("Error reading root cert request: ", errRead) - return - } - errUnmarshal := json.Unmarshal(body, &result) - if errUnmarshal != nil { - logger.LoggerSvcDiscovery.Error("Malformed response: ", errUnmarshal) - return - } - index, errStrConv := strconv.Atoi(response.Header.Get(consulIndexHeader)) - if errStrConv != nil { - logger.LoggerSvcDiscovery.Error("Index header not sent") - return - } - shouldUpdateRouter := updateCAIndex(index) - if shouldUpdateRouter { - //there is only one root CA cert except while rotation in progress - for _, root := range result.Roots { - if root.Active && root.RootCert != "" { //only select the active root - MeshCACert = root.RootCert - } - } - signal <- true - } -} - -func (c ConsulClient) getServiceCertAndKey(signal chan bool) { - url := constructURL(c.scheme, c.host, apiVersion, apiLeafCertPath, apkServiceName) - result := ServiceCertResp{} - response, errReq := c.getCertRequest(url, leafReqLastIndex) - if errReq != nil { - logger.LoggerSvcDiscovery.Error("Error getting leaf cert and key: ", errReq) - return - } - body, errRead := ioutil.ReadAll(response.Body) - if errRead != nil { - logger.LoggerSvcDiscovery.Error("Error reading leaf cert and key: ", errRead) - return - } - errUnmarshal := json.Unmarshal(body, &result) - if errUnmarshal != nil { - logger.LoggerSvcDiscovery.Error("Malformed response: ", errUnmarshal) - return - } - - index, errStrConv := strconv.Atoi(response.Header.Get(consulIndexHeader)) - if errStrConv != nil { - logger.LoggerSvcDiscovery.Error("Index header not sent") - return - } - shouldUpdateRouter := updateLeafIndex(index) - if shouldUpdateRouter { - MeshServiceCert = result.CertPEM - MeshServiceKey = result.PrivateKeyPEM - signal <- true - } -} - -// LongPollRootCert starts long polling root certificate -func (c ConsulClient) LongPollRootCert(signal chan bool) { - go func(signal chan bool) { - c.getRootCert(signal) - ticker := time.NewTicker(longPollInterval * time.Second) - intervalChan := ticker.C //emits a signal every longPollInterval - defer ticker.Stop() - for { - select { - case <-intervalChan: - c.getRootCert(signal) - } - } - }(signal) -} - -// LongPollServiceCertAndKey starts long polling for service cert and key -func (c ConsulClient) LongPollServiceCertAndKey(signal chan bool) { - go func(signal chan bool) { - c.getServiceCertAndKey(signal) - ticker := time.NewTicker(longPollInterval * time.Second) - intervalChan := ticker.C //emits a signal every longPollInterval - defer ticker.Stop() - for { - select { - case <-intervalChan: - c.getServiceCertAndKey(signal) - } - } - }(signal) -} diff --git a/adapter/internal/svcdiscovery/utils.go b/adapter/internal/svcdiscovery/utils.go deleted file mode 100644 index f44edc918..000000000 --- a/adapter/internal/svcdiscovery/utils.go +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright (c) 2021, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package svcdiscovery - -import ( - "errors" - "regexp" - "strings" -) - -const ( - // consulBegin - consulBegin string = "consul" -) - -// DefaultHost host and port of the default host -// Clusters are initialized with default host at the time of initialization of an api project -type DefaultHost struct { - Host string - Port string -} - -//IsDiscoveryServiceEndpoint checks whether an endpoint string is a consul syntax string -func IsDiscoveryServiceEndpoint(str string) bool { - str = strings.TrimSpace(str) - re, _ := regexp.Compile(`^consul(\s*)\(.*,.*\)$`) - return re.MatchString(str) -} - -//parse a list of datacenters or tags -func parseList(str string) []string { - parsedString := strings.Split(str, ",") - for i := range parsedString { - parsedString[i] = strings.TrimSpace(strings.ReplaceAll(parsedString[i], "[", "")) - parsedString[i] = strings.TrimSpace(strings.ReplaceAll(parsedString[i], "]", "")) - if strings.TrimSpace(parsedString[i]) == "*" { - parsedString[i] = "" - } - } - return parsedString -} - -//ParseConsulSyntax breaks the syntax string into query string and default host string -func ParseConsulSyntax(str string) (string, string, error) { - list := strings.Split(str, ",") - length := len(list) - if length < 2 { - return "", "", errors.New("default host not provided") - } - defaultHost := list[length-1] - defaultHost = strings.Replace(defaultHost, ")", "", 1) - defaultHost = strings.TrimSpace(defaultHost) - - for i := 0; i < length-1; i++ { - str += list[i] - str = strings.Join(list[0:length-1], ",") - } - str = strings.Replace(str, "(", "", 1) - str = strings.Replace(str, consulBegin, "", 1) - str = strings.TrimSpace(str) - return str, defaultHost, nil -} - -//ParseQueryString parses the string into a Query struct -func ParseQueryString(query string) (Query, error) { - //examples--> - //[dc1,dc2].namespace.serviceA.[tag1,tag2] - //dc1.serviceA.tag1 - //serviceA - // - str := strings.Split(query, ".") - qCategory := len(str) - if qCategory == 1 { //service name only - queryString := Query{ - Datacenters: parseList("*"), - ServiceName: strings.TrimSpace(str[0]), - Namespace: "", - Tags: parseList("*"), - } - return queryString, nil - } else if qCategory == 3 { //datacenters, service name, tags - queryString := Query{ - Datacenters: parseList(str[0]), - ServiceName: strings.TrimSpace(str[1]), - Namespace: "", - Tags: parseList(str[2]), - } - return queryString, nil - } else if qCategory == 4 { //datacenters, namespace, service name, tags - queryString := Query{ - Datacenters: parseList(str[0]), - ServiceName: strings.TrimSpace(str[2]), - Namespace: strings.TrimSpace(str[1]), - Tags: parseList(str[3]), - } - return queryString, nil - } - return Query{}, errors.New("bad consul query syntax") -} diff --git a/adapter/internal/svcdiscovery/utils_test.go b/adapter/internal/svcdiscovery/utils_test.go deleted file mode 100644 index aa8982976..000000000 --- a/adapter/internal/svcdiscovery/utils_test.go +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Copyright (c) 2021, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package svcdiscovery - -import ( - "errors" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestIsDiscoveryServiceEndpoint(t *testing.T) { - type isDiscoveryServiceEndpointList struct { - input string - output bool - message string - } - dataItems := []isDiscoveryServiceEndpointList{ - { - input: " consul (dc1.dev.serviceA.tag1 , http://localhost:4000 ) ", - output: true, - message: "with spaces", - }, - { - input: " consul", - output: false, - message: "invalid", - }, - { - input: "consul([dc1,dc2].dev.serviceA.[tag1,tag2],http://localhost:4000)", - output: true, - message: "valid", - }, - { - input: "", - output: false, - message: "empty", - }, - { - input: "consul([dc1,dc2].dev.serviceA.[tag1,tag2],http://localhost:4000", - output: false, - message: "missing parenthesis", - }, - { - input: "consul([dc1,dc2].dev.serviceA.[tag1,tag2] http://localhost:4000", - output: false, - message: "missing middle comma", - }, - } - - for i, item := range dataItems { - result := IsDiscoveryServiceEndpoint(item.input) - assert.Equal(t, item.output, result, item.message, i) - } -} - -func TestParseQueryString(t *testing.T) { - type parseQueryStringItem struct { - input string - result Query - err error - message string - } - dataItems := []parseQueryStringItem{ - { - input: "[dc1,dc2].dev.serviceA.[tag1,tag2]", - result: Query{ - Datacenters: []string{"dc1", "dc2"}, - ServiceName: "serviceA", - Namespace: "dev", - Tags: []string{"tag1", "tag2"}, - }, - err: nil, - message: "simple scenario with namespace", - }, - { - input: "[dc 1,dc 2].service A.[tag1,tag2]", - result: Query{ - Datacenters: []string{"dc 1", "dc 2"}, - ServiceName: "service A", - Namespace: "", - Tags: []string{"tag1", "tag2"}, - }, - err: nil, - message: "simple scenario without namespace", - }, - { - input: "[].prod.serviceA.[*]", - result: Query{ - Datacenters: []string{""}, - ServiceName: "serviceA", - Namespace: "prod", - Tags: []string{""}, - }, - err: nil, - message: "empty dcs and tags", - }, - { - input: "[].fake.another.prod.serviceA.[*]", - err: errors.New("bad consul query syntax"), - message: "5 pieces in syntax", - }, - { - input: "[dc , dc1 ]. prod . serviceA . [ * ] ", - result: Query{ - Datacenters: []string{"dc", "dc1"}, - ServiceName: "serviceA", - Namespace: "prod", - Tags: []string{""}, - }, - err: nil, - message: "spaces should be trimmed", - }, { - input: " serviceA ", - result: Query{ - Datacenters: []string{""}, - ServiceName: "serviceA", - Namespace: "", - Tags: []string{""}, - }, - err: nil, - message: "service name only", - }, - { - input: " dc1.serviceA.tagA", - result: Query{ - Datacenters: []string{"dc1"}, - ServiceName: "serviceA", - Namespace: "", - Tags: []string{"tagA"}, - }, - err: nil, - message: "without brackets", - }, - } - for i, item := range dataItems { - result, err := ParseQueryString(item.input) - assert.Equal(t, item.result, result, item.message, i) - assert.Equal(t, item.err, err, item.message) - } -} - -func TestParseList(t *testing.T) { - type parseListTestItem struct { - inputString string - resultList []string - message string - } - dataItems := []parseListTestItem{ - { - inputString: "[dc1,dc2,aws-us-central-1]", - resultList: []string{"dc1", "dc2", "aws-us-central-1"}, - message: "Simple scenario with 3dcs", - }, - { - inputString: "[]", - resultList: []string{""}, - message: "Empty list :(all)", - }, - { - inputString: "[*]", - resultList: []string{""}, - message: "Empty list with * :(all)", - }, - { - inputString: "[abc]", - resultList: []string{"abc"}, - message: "List with one dc", - }, - } - for _, item := range dataItems { - result := parseList(item.inputString) - assert.Equal(t, item.resultList, result, item.message) - } -} - -func TestParseConsulSyntax(t *testing.T) { - type parseConsulSyntaxTestItem struct { - inputString string - result1 string - result2 string - err error - message string - } - - dataItems := []parseConsulSyntaxTestItem{ - { - inputString: "consul([dc 1,dc 2].service A.[tag1,tag2],http://192.168.0.1:80)", - result1: "[dc 1,dc 2].service A.[tag1,tag2]", - result2: "http://192.168.0.1:80", - err: nil, - message: "valid case", - }, - { - inputString: " consul ( [dc 1,dc 2].service A.[tag1,tag2] , http://192.168.0.1:80 ) ", - result1: "[dc 1,dc 2].service A.[tag1,tag2]", - result2: "http://192.168.0.1:80", - err: nil, - message: "valid case with extra spaces", - }, - { - inputString: "", - result1: "", - result2: "", - err: errors.New("default host not provided"), - message: "empty string", - }, - } - - for _, item := range dataItems { - result1, result2, err := ParseConsulSyntax(item.inputString) - assert.Equal(t, item.result1, result1, item.message) - assert.Equal(t, item.result2, result2, item.message) - assert.Equal(t, item.err, err, item.message) - } - -} diff --git a/adapter/internal/svcdiscovery/watcher.go b/adapter/internal/svcdiscovery/watcher.go deleted file mode 100644 index 8101c4cc2..000000000 --- a/adapter/internal/svcdiscovery/watcher.go +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Copyright (c) 2021, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package svcdiscovery - -import ( - "crypto/tls" - "crypto/x509" - "io/ioutil" - "net/url" - "strings" - "sync" - "time" - - corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - tlsv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" - - "github.com/wso2/apk/adapter/config" - logger "github.com/wso2/apk/adapter/internal/loggers" -) - -var ( - //IsServiceDiscoveryEnabled whether Consul service discovery should be enabled - IsServiceDiscoveryEnabled bool - onceConfigLoad sync.Once - mutexForResultMap sync.RWMutex - conf *config.Config - pollInterval time.Duration - errConfLoad error - //MeshEnabled whether Consul service mesh is enabled or not - MeshEnabled bool - apkServiceName string - //MeshCACert a CA cert of Consul Mesh - MeshCACert string - //MeshServiceCert public cert of Router - MeshServiceCert string - //MeshServiceKey private key of Router - MeshServiceKey string - //MeshUpdateSignal send a signal to notify that a cert update should be propagated to envoy - MeshUpdateSignal chan bool - //ssl certs - caCert []byte - cert []byte - key []byte - aclToken string - - //ConsulClientInstance instance for consul client - ConsulClientInstance ConsulClient - //ClusterConsulKeyMap Cluster Name -> consul syntax key - ClusterConsulKeyMap map[string]string - //ClusterConsulResultMap Cluster Name -> Upstream - //saves the last result with respected to a cluster - ClusterConsulResultMap map[string][]Upstream - //ClusterConsulDoneChanMap Cluster Name -> doneChan for respective go routine - //when the cluster is removed we can stop the respective go routine to stop polling and release resources - ClusterConsulDoneChanMap map[string]chan bool - //ServiceConsulMeshMap Service -> whether in Mesh - ServiceConsulMeshMap map[string]bool -) - -func init() { - ClusterConsulKeyMap = make(map[string]string) - ClusterConsulResultMap = make(map[string][]Upstream) - ClusterConsulDoneChanMap = make(map[string]chan bool) - ServiceConsulMeshMap = make(map[string]bool) - //Read config - conf = config.ReadConfigs() - IsServiceDiscoveryEnabled = conf.Adapter.Consul.Enabled - aclToken = strings.TrimSpace(conf.Adapter.Consul.ACLToken) - apkServiceName = conf.Adapter.Consul.ApkServiceName - MeshEnabled = conf.Adapter.Consul.ServiceMeshEnabled - MeshUpdateSignal = make(chan bool) -} - -// SetClusterConsulResultMap avoid concurrent map writes when writing to ClusterConsulResultMap -func SetClusterConsulResultMap(key string, value []Upstream) { - mutexForResultMap.Lock() - defer mutexForResultMap.Unlock() - ClusterConsulResultMap[key] = value -} - -// GetClusterConsulResultMap avoid concurrent map reads/writes when reading from ClusterConsulResultMap -func GetClusterConsulResultMap(key string) []Upstream { - mutexForResultMap.RLock() - defer mutexForResultMap.RUnlock() - return ClusterConsulResultMap[key] -} - -// read the certs and access token required for tls into respective global variables -func readCerts() error { - // TODO: (VirajSalaka) Replace with common CA cert pool - caFileContent, readErr := ioutil.ReadFile(conf.Adapter.Consul.CaCertFile) - if readErr != nil { - return readErr - } - - certFileContent, readErr := ioutil.ReadFile(conf.Adapter.Consul.CertFile) - if readErr != nil { - return readErr - } - - keyFileContent, readErr := ioutil.ReadFile(conf.Adapter.Consul.KeyFile) - if readErr != nil { - return readErr - } - - caCert = caFileContent - cert = certFileContent - key = keyFileContent - - return nil -} - -// InitConsul loads certs and initialize a ConsulClient -// lazy loading -func InitConsul() { - onceConfigLoad.Do(func() { - conf = config.ReadConfigs() - pollInterval = time.Duration(conf.Adapter.Consul.PollInterval) * time.Second - urlStructure, errURLParse := url.Parse(conf.Adapter.Consul.URL) - if errURLParse != nil { - errConfLoad = errURLParse - logger.LoggerSvcDiscovery.Error("Invalid URL to Consul Client ", errURLParse) - return - } - if urlStructure.Scheme == "https" { //communicate to consul through https - errCertRead := readCerts() - if errCertRead != nil { - errConfLoad = errCertRead - logger.LoggerSvcDiscovery.Error("Consul Certs read error ", errCertRead) - return - } - pool := x509.NewCertPool() - pool.AppendCertsFromPEM(caCert) - clientCert, errKeyPairLoad := tls.X509KeyPair(cert, key) - if errKeyPairLoad != nil { - errConfLoad = errKeyPairLoad - logger.LoggerSvcDiscovery.Error("Key pair error", errKeyPairLoad) - return - } - tlsConfig := newTLSConfig(pool, []tls.Certificate{clientCert}, false) - transport := newHTTPSTransport(&tlsConfig) - client := newHTTPClient(&transport, pollInterval) - longPollClient := newHTTPClient(&transport, time.Duration(longPollInterval*2)*time.Second) - ConsulClientInstance = NewConsulClient(client, longPollClient, urlStructure.Scheme, urlStructure.Host, aclToken) - ConsulClientInstance.LongPollRootCert(MeshUpdateSignal) - ConsulClientInstance.LongPollServiceCertAndKey(MeshUpdateSignal) - } else { - //communicate to consul through http - transport := newHTTPTransport() - client := newHTTPClient(&transport, pollInterval) - longPollClient := newHTTPClient(&transport, time.Duration(longPollInterval*2)*time.Second) - ConsulClientInstance = NewConsulClient(client, longPollClient, urlStructure.Scheme, urlStructure.Host, aclToken) - - if conf.Adapter.Consul.ServiceMeshEnabled { - ConsulClientInstance.LongPollRootCert(MeshUpdateSignal) - ConsulClientInstance.LongPollServiceCertAndKey(MeshUpdateSignal) - } - - } - }) -} - -// generate TLS certs as inline strings -func generateTLSCertWithStr(privateKey string, publicKey string) *tlsv3.TlsCertificate { - var tlsCert tlsv3.TlsCertificate - tlsCert = tlsv3.TlsCertificate{ - PrivateKey: &corev3.DataSource{ - Specifier: &corev3.DataSource_InlineString{ - InlineString: privateKey, - }, - }, - CertificateChain: &corev3.DataSource{ - Specifier: &corev3.DataSource_InlineString{ - InlineString: publicKey, - }, - }, - } - return &tlsCert -} - -// CreateUpstreamTLSContext create a new TLS context using CA, private key and public key -func CreateUpstreamTLSContext(upstreamCACert, privateKey, publicKey string) *tlsv3.UpstreamTlsContext { - tlsCert := generateTLSCertWithStr(privateKey, publicKey) - // Convert the cipher string to a string array - ciphersArray := strings.Split(conf.Envoy.Upstream.TLS.Ciphers, ",") - for i := range ciphersArray { - ciphersArray[i] = strings.TrimSpace(ciphersArray[i]) - } - upstreamTLSContext := &tlsv3.UpstreamTlsContext{ - CommonTlsContext: &tlsv3.CommonTlsContext{ - TlsParams: &tlsv3.TlsParameters{ - TlsMinimumProtocolVersion: createTLSProtocolVersion(conf.Envoy.Upstream.TLS.MinimumProtocolVersion), - TlsMaximumProtocolVersion: createTLSProtocolVersion(conf.Envoy.Upstream.TLS.MaximumProtocolVersion), - CipherSuites: ciphersArray, - }, - TlsCertificates: []*tlsv3.TlsCertificate{tlsCert}, - }, - } - if !conf.Envoy.Upstream.TLS.DisableSslVerification { - var trustedCASrc *corev3.DataSource - trustedCASrc = &corev3.DataSource{ - Specifier: &corev3.DataSource_InlineString{ - InlineString: upstreamCACert, - }, - } - upstreamTLSContext.CommonTlsContext.ValidationContextType = &tlsv3.CommonTlsContext_ValidationContext{ - ValidationContext: &tlsv3.CertificateValidationContext{ - TrustedCa: trustedCASrc, - }, - } - } - //Note: Cert verification done through CA root - //A CN is available in Consul generated certs but no SNI - //Therefore hostname verification is ignored - return upstreamTLSContext -} - -// copied from oasparser/envoyconf/routes_with_clusters.go -// reason: to avoid cyclic import -func createTLSProtocolVersion(tlsVersion string) tlsv3.TlsParameters_TlsProtocol { - switch tlsVersion { - case "TLS1_0": - return tlsv3.TlsParameters_TLSv1_0 - case "TLS1_1": - return tlsv3.TlsParameters_TLSv1_1 - case "TLS1_2": - return tlsv3.TlsParameters_TLSv1_2 - case "TLS1_3": - return tlsv3.TlsParameters_TLSv1_3 - default: - return tlsv3.TlsParameters_TLS_AUTO - } -}