Skip to content

Commit

Permalink
Add http, https and multiple port support
Browse files Browse the repository at this point in the history
  • Loading branch information
Tharsanan1 committed Dec 15, 2023
1 parent 3e6e882 commit b81de35
Show file tree
Hide file tree
Showing 30 changed files with 745 additions and 270 deletions.
91 changes: 91 additions & 0 deletions adapter/internal/data_holder/data-holder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package data_holder

import (
k8types "k8s.io/apimachinery/pkg/types"
gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

// The following variables will be used to store the state of the apk.
// This data should not be utilized by operator thread as its not designed for parallel access.
var (
// This variable in the structure of gateway's namespaced name -> gateway
gatewayMap map[string]gwapiv1b1.Gateway
)

func init() {
gatewayMap = make(map[string]gwapiv1b1.Gateway)
}

// GetGatewayMap returns list of cached gateways
func GetGatewayMap() map[string]gwapiv1b1.Gateway {
return gatewayMap
}

// UpdateGateway caches the gateway
func UpdateGateway(gateway gwapiv1b1.Gateway) {
gatewayMap[k8types.NamespacedName{Name: gateway.Name, Namespace: gateway.Namespace}.String()] = gateway
}

// RemoveGateway removes the gateway from the cache
func RemoveGateway(gateway gwapiv1b1.Gateway) {
delete(gatewayMap, k8types.NamespacedName{Name: gateway.Name, Namespace: gateway.Namespace}.String())
}

// GetAllGatewayListeners return the list of all the listeners that stored in the gateway cache
func GetAllGatewayListeners() []gwapiv1b1.Listener {
listeners := make([]gwapiv1b1.Listener, 0)
for _, gateway := range gatewayMap {
for _, listener := range gateway.Spec.Listeners {
listeners = append(listeners, listener)
}
}
return listeners
}


// GetListenersAsPortalPortMap returns a map that have a structure protocol -> port -> list of listeners for that port and protocol combination
// Data is derived based on the current status of the gatwayMap cache
func GetListenersAsPortalPortMap() map[string]map[uint32][]gwapiv1b1.Listener{
listenersAsPortalPortMap := make(map[string]map[uint32][]gwapiv1b1.Listener)
for _, gateway := range gatewayMap {
for _, listener := range gateway.Spec.Listeners {
protocol := string(listener.Protocol)
port := uint32(listener.Port)
if portMap, portFound := listenersAsPortalPortMap[protocol]; portFound {
if listenersList, listenerListFound := portMap[port]; listenerListFound {
if (listenersList == nil) {
listenersList = []gwapiv1b1.Listener{listener}
} else {
listenersList = append(listenersList, listener)
}
listenersAsPortalPortMap[protocol][port] = listenersList
} else {
listenerList := []gwapiv1b1.Listener{listener}
listenersAsPortalPortMap[protocol][port] = listenerList
}
} else {
listenersAsPortalPortMap[protocol] = make(map[uint32][]gwapiv1b1.Listener)
listenerList := []gwapiv1b1.Listener{listener}
listenersAsPortalPortMap[protocol][port] = listenerList
}
}
}
return listenersAsPortalPortMap
}
41 changes: 40 additions & 1 deletion adapter/internal/discovery/xds/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package common

import (
"sync"

"fmt"
"regexp"
"strings"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)

Expand Down Expand Up @@ -92,3 +94,40 @@ func GetNodeIdentifier(request *discovery.DiscoveryRequest) string {
}
return nodeIdentifier
}

// GetEnvoyListenerName prepares the envoy listener name based on the protocol and port
func GetEnvoyListenerName(protocol string, port uint32) string {
return fmt.Sprintf("%s_%d_listener", protocol, port)
}

// GetEnvoyRouteConfigName prepares Envoy route config name based on Gateway spec's listener name and section name
func GetEnvoyRouteConfigName(listenerName string, sectionName string) string {
return fmt.Sprintf("%s_%s", listenerName, sectionName)
}

// FindElement searches for an element in a slice based on a given predicate.
// It returns the element and true if the element was found.
func FindElement[T any](collection []T, predicate func(item T) bool) (T, bool) {
for _, item := range collection {
if predicate(item) {
return item, true
}
}
var dummy T
return dummy, false
}

// MatchesHostname check whether the domain matches the hostname pattern
func MatchesHostname(domain, pattern string) bool {
// Escape special characters in the pattern and replace wildcard with regex pattern
pattern = strings.ReplaceAll(regexp.QuoteMeta(pattern), `\*`, `.*`)
// Append start and end of line anchors
pattern = "^" + pattern + "$"

matched, err := regexp.MatchString(pattern, domain)
if err != nil {
return false
}

return matched
}
89 changes: 69 additions & 20 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (

envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/wso2/apk/adapter/config"
"github.com/wso2/apk/adapter/internal/discovery/xds/common"
logger "github.com/wso2/apk/adapter/internal/loggers"
logging "github.com/wso2/apk/adapter/internal/logging"
oasParser "github.com/wso2/apk/adapter/internal/oasparser"
Expand All @@ -53,6 +54,7 @@ import (
eventhubTypes "github.com/wso2/apk/adapter/pkg/eventhub/types"
"github.com/wso2/apk/adapter/pkg/utils/stringutils"
gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"
"github.com/wso2/apk/adapter/internal/data_holder"
)

// EnvoyInternalAPI struct use to hold envoy resources and adapter internal resources
Expand All @@ -67,8 +69,8 @@ type EnvoyInternalAPI struct {

// EnvoyGatewayConfig struct use to hold envoy gateway resources
type EnvoyGatewayConfig struct {
listener *listenerv3.Listener
routeConfig *routev3.RouteConfiguration
listeners []*listenerv3.Listener
routeConfigs []*routev3.RouteConfiguration
clusters []*clusterv3.Cluster
endpoints []*corev3.Address
customRateLimitPolicies []*model.CustomRateLimitPolicy
Expand Down Expand Up @@ -125,6 +127,10 @@ const (
apiController string = "APIController"
)

type envoyRoutesWithSectionName struct {
routes []*routev3.Route
}

func maxRandomBigInt() *big.Int {
return big.NewInt(int64(maxRandomInt))
}
Expand Down Expand Up @@ -399,23 +405,61 @@ func GenerateEnvoyResoucesForGateway(gatewayName string) ([]types.Resource,
}

envoyGatewayConfig, gwFound := gatewayLabelConfigMap[gatewayName]
listener := envoyGatewayConfig.listener
if !gwFound || listener == nil {
listeners := envoyGatewayConfig.listeners
if !gwFound || listeners == nil || len(listeners) == 0 {
return nil, nil, nil, nil, nil
}
routesFromListener := listenerToRouteArrayMap[listener.Name]
var vhostToRouteArrayFilteredMap = make(map[string][]*routev3.Route)
for vhost, routes := range vhostToRouteArrayMap {
if vhost == systemHost || checkRoutes(routes, routesFromListener) {
vhostToRouteArrayFilteredMap[vhost] = routes
routeConfigs := make([]*routev3.RouteConfiguration, 0)
for _, listener := range listeners {
for vhost, routes := range vhostToRouteArrayMap {
matchedListener, found := common.FindElement(data_holder.GetAllGatewayListeners(), func(listenerLocal gwapiv1b1.Listener) bool {
if (listenerLocal.Hostname != nil && common.MatchesHostname(vhost, string(*listenerLocal.Hostname))) {
if (listener.Name == common.GetEnvoyListenerName(string(listenerLocal.Protocol), uint32(listenerLocal.Port))) {
return true
}
}
return false
})
if found {
// Prepare the route config name based on the gateway listener section name.
routeConfigName := common.GetEnvoyRouteConfigName(listener.Name, string(matchedListener.Name));
routesConfig := oasParser.GetRouteConfigs(map[string][]*routev3.Route{vhost:routes}, routeConfigName, envoyGatewayConfig.customRateLimitPolicies)

routeConfigMatched, alreadyExistsInRouteConfigList := common.FindElement(routeConfigs, func(routeConf *routev3.RouteConfiguration) bool {
if (routeConf.Name == routesConfig.Name) {
return true
}
return false
})
if alreadyExistsInRouteConfigList {
logger.LoggerAPKOperator.Debugf("Route already exists. %+v", routesConfig.Name)
routeConfigMatched.VirtualHosts = append(routeConfigMatched.VirtualHosts, routesConfig.VirtualHosts...)
} else {
routeConfigs = append(routeConfigs, routesConfig)
}
} else {
logger.LoggerAPKOperator.Errorf("Failed to find a matching gateway listener for this vhost: %s", vhost)
}
}
}

// Find gateway listeners that has $systemHost as its hostname and add the system routeConfig referencing those listeners
gatewayListeners := data_holder.GetAllGatewayListeners()
for _, listener := range gatewayListeners {
if (systemHost == string(*listener.Hostname)) {
var vhostToRouteArrayFilteredMapForSystemEndpoints = make(map[string][]*routev3.Route)
vhostToRouteArrayFilteredMapForSystemEndpoints[systemHost] = vhostToRouteArrayMap[systemHost]
routeConfigName := common.GetEnvoyRouteConfigName(common.GetEnvoyListenerName(string(listener.Protocol), uint32(listener.Port)), string(listener.Name))
systemRoutesConfig := oasParser.GetRouteConfigs(vhostToRouteArrayFilteredMapForSystemEndpoints, routeConfigName, envoyGatewayConfig.customRateLimitPolicies)
routeConfigs = append(routeConfigs, systemRoutesConfig)
}
}
routesConfig := oasParser.GetRouteConfigs(vhostToRouteArrayFilteredMap, listener.Name, envoyGatewayConfig.customRateLimitPolicies)
envoyGatewayConfig.routeConfig = routesConfig

envoyGatewayConfig.routeConfigs = routeConfigs
clusterArray = append(clusterArray, envoyGatewayConfig.clusters...)
endpointArray = append(endpointArray, envoyGatewayConfig.endpoints...)
endpoints, clusters, listeners, routeConfigs := oasParser.GetCacheResources(endpointArray, clusterArray, listener, routesConfig)
return endpoints, clusters, listeners, routeConfigs, apis
listeners_, clusters, routeConfigs_, endpoints := oasParser.GetCacheResources(endpointArray, clusterArray, listeners, routeConfigs)
return listeners_, clusters, routeConfigs_, endpoints, apis
}

// function to check routes []*routev3.Route equlas routes []*routev3.Route
Expand Down Expand Up @@ -488,7 +532,6 @@ func updateXdsCache(label string, endpoints []types.Resource, clusters []types.R
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1414, logging.MAJOR, "Error while setting the snapshot : %v", errSetSnap.Error()))
return false
}
logger.LoggerXds.Infof("New Router cache updated for the label: " + label + " version: " + fmt.Sprint(version))
return true
}

Expand Down Expand Up @@ -641,7 +684,7 @@ func RemoveAPIFromOrgAPIMap(uuid string, orgID string) {
}

// UpdateAPICache updates the xDS cache related to the API Lifecycle event.
func UpdateAPICache(vHosts []string, newLabels []string, newlistenersForRoutes []string, adapterInternalAPI model.AdapterInternalAPI) error {
func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string, adapterInternalAPI model.AdapterInternalAPI) error {
mutexForInternalMapUpdate.Lock()
defer mutexForInternalMapUpdate.Unlock()

Expand Down Expand Up @@ -671,6 +714,7 @@ func UpdateAPICache(vHosts []string, newLabels []string, newlistenersForRoutes [

// Create internal mappigs for new vHosts
for _, vHost := range vHosts {
logger.LoggerAPKOperator.Debugf("Creating internal mapping for vhost: %s", vHost)
apiUUID := adapterInternalAPI.UUID
apiIdentifier := GenerateIdentifierForAPIWithUUID(vHost, apiUUID)
var oldLabels []string
Expand Down Expand Up @@ -700,10 +744,15 @@ func UpdateAPICache(vHosts []string, newLabels []string, newlistenersForRoutes [
endpointAddresses: endpoints,
enforcerAPI: oasParser.GetEnforcerAPI(adapterInternalAPI, vHost),
}
if _, ok := listenerToRouteArrayMap[newlistenersForRoutes[0]]; ok {
listenerToRouteArrayMap[newlistenersForRoutes[0]] = append(listenerToRouteArrayMap[newlistenersForRoutes[0]], routes...)
if _, ok := listenerToRouteArrayMap[listener]; ok {
routesList := listenerToRouteArrayMap[listener]
if (routesList == nil) {
routesList = make([]*routev3.Route, 0)
}
routesList = append(routesList, routes...)
listenerToRouteArrayMap[listener] = routesList
} else {
listenerToRouteArrayMap[newlistenersForRoutes[0]] = routes
listenerToRouteArrayMap[listener] = routes
}

revisionStatus := updateXdsCacheOnAPIChange(oldLabels, newLabels)
Expand All @@ -715,8 +764,8 @@ func UpdateAPICache(vHosts []string, newLabels []string, newlistenersForRoutes [
// UpdateGatewayCache updates the xDS cache related to the Gateway Lifecycle event.
func UpdateGatewayCache(gateway *gwapiv1b1.Gateway, resolvedListenerCerts map[string]map[string][]byte,
gwLuaScript string, customRateLimitPolicies []*model.CustomRateLimitPolicy) error {
listener := oasParser.GetProductionListener(gateway, resolvedListenerCerts, gwLuaScript)
gatewayLabelConfigMap[gateway.Name].listener = listener
listeners := oasParser.GetProductionListener(gateway, resolvedListenerCerts, gwLuaScript)
gatewayLabelConfigMap[gateway.Name].listeners = listeners
conf := config.ReadConfigs()
if conf.Envoy.RateLimit.Enabled {
gatewayLabelConfigMap[gateway.Name].customRateLimitPolicies = customRateLimitPolicies
Expand Down
2 changes: 1 addition & 1 deletion adapter/internal/discovery/xds/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestUpdateAPICache(t *testing.T) {
for _, label := range test.labels {
SanitizeGateway(label, true)
}
UpdateAPICache(test.vHosts, test.labels, test.listeners, test.adapterInternalAPI)
UpdateAPICache(test.vHosts, test.labels, test.listeners[0], "httpslistener", test.adapterInternalAPI)
identifier := GetvHostsIdentifier(test.adapterInternalAPI.UUID, "prod")
actualvHosts, ok := orgIDAPIvHostsMap[test.adapterInternalAPI.OrganizationID][identifier]
if !ok {
Expand Down
18 changes: 12 additions & 6 deletions adapter/internal/oasparser/config_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func GetGlobalClusters() ([]*clusterv3.Cluster, []*corev3.Address) {
// The provided set of envoy routes will be assigned under the virtual host
//
// The RouteConfiguration is named as "default"
func GetProductionListener(gateway *gwapiv1b1.Gateway, resolvedListenerCerts map[string]map[string][]byte, gwLuaScript string) *listenerv3.Listener {
func GetProductionListener(gateway *gwapiv1b1.Gateway, resolvedListenerCerts map[string]map[string][]byte, gwLuaScript string) []*listenerv3.Listener {
listeners := envoy.CreateListenerByGateway(gateway, resolvedListenerCerts, gwLuaScript)
return listeners
}
Expand All @@ -98,10 +98,10 @@ func GetProductionListener(gateway *gwapiv1b1.Gateway, resolvedListenerCerts map
// The provided set of envoy routes will be assigned under the virtual host
//
// The RouteConfiguration is named as "default"
func GetRouteConfigs(vhostToRouteArrayMap map[string][]*routev3.Route, httpListener string,
func GetRouteConfigs(vhostToRouteArrayMap map[string][]*routev3.Route, routeConfigName string,
customRateLimitPolicies []*model.CustomRateLimitPolicy) *routev3.RouteConfiguration {
vHosts := envoy.CreateVirtualHosts(vhostToRouteArrayMap, customRateLimitPolicies)
routeConfig := envoy.CreateRoutesConfigForRds(vHosts, httpListener)
routeConfig := envoy.CreateRoutesConfigForRds(vHosts, routeConfigName)
return routeConfig
}

Expand All @@ -110,7 +110,7 @@ func GetRouteConfigs(vhostToRouteArrayMap map[string][]*routev3.Route, httpListe
//
// The returned resources are listeners, clusters, routeConfigurations, endpoints
func GetCacheResources(endpoints []*corev3.Address, clusters []*clusterv3.Cluster,
listeners *listenerv3.Listener, routeConfig *routev3.RouteConfiguration) (
listeners []*listenerv3.Listener, routeConfigs []*routev3.RouteConfiguration) (
listenerRes []types.Resource, clusterRes []types.Resource, routeConfigRes []types.Resource,
endpointRes []types.Resource) {

Expand All @@ -122,8 +122,14 @@ func GetCacheResources(endpoints []*corev3.Address, clusters []*clusterv3.Cluste
for _, endpoint := range endpoints {
endpointRes = append(endpointRes, endpoint)
}
listenerRes = []types.Resource{listeners}
routeConfigRes = []types.Resource{routeConfig}
listenerRes = []types.Resource{}
for _, listener := range listeners {
listenerRes = append(listenerRes, listener)
}
routeConfigRes = []types.Resource{}
for _, routeConfig := range routeConfigs {
routeConfigRes = append(routeConfigRes, routeConfig)
}
return listenerRes, clusterRes, routeConfigRes, endpointRes
}

Expand Down
4 changes: 0 additions & 4 deletions adapter/internal/oasparser/envoyconf/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ const (
compressorFilterName string = "envoy.filters.http.compressor"
)

const (
defaultHTTPSListenerName string = "httpslistener"
)

// cluster prefixes
const (
requestInterceptClustersNamePrefix string = "reqInterceptor"
Expand Down
Loading

0 comments on commit b81de35

Please sign in to comment.