diff --git a/management-server/cmd/main.go b/management-server/cmd/main.go index 9b10a9e5b..2637c0464 100644 --- a/management-server/cmd/main.go +++ b/management-server/cmd/main.go @@ -21,7 +21,6 @@ import ( "os" "os/signal" - "github.com/wso2/apk/management-server/internal/database" server "github.com/wso2/apk/management-server/internal/grpc-server" "github.com/wso2/apk/management-server/internal/logger" "github.com/wso2/apk/management-server/internal/notification" @@ -33,9 +32,7 @@ func main() { logger.LoggerServer.Info("Starting Management server ...") sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt) - // connect to the postgres database - database.ConnectToDB() - defer database.CloseDBConn() + go xds.InitAPKMgtServer() go synchronizer.ProcessApplicationEvents() diff --git a/management-server/go.mod b/management-server/go.mod index aa6d910ce..991614566 100644 --- a/management-server/go.mod +++ b/management-server/go.mod @@ -4,11 +4,10 @@ go 1.19 require ( github.com/envoyproxy/go-control-plane v0.11.2-0.20230802074621-eea0b3bd0f81 - github.com/jackc/pgx/v5 v5.3.1 github.com/pelletier/go-toml v1.9.5 github.com/sirupsen/logrus v1.9.0 - github.com/wso2/apk/adapter v0.0.0-20230313062104-25216c8acbc5 - google.golang.org/grpc v1.57.0 + github.com/wso2/apk/adapter v0.0.0-20231214082511-af2c8b8a19f1 + google.golang.org/grpc v1.58.3 google.golang.org/protobuf v1.31.0 ) @@ -17,16 +16,11 @@ replace github.com/wso2/apk/adapter => ../adapter require ( github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 // indirect - github.com/envoyproxy/protoc-gen-validate v1.0.1 // indirect + github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/puddle/v2 v2.2.0 // indirect - golang.org/x/crypto v0.7.0 // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.8.0 // indirect - golang.org/x/text v0.9.0 // indirect + golang.org/x/net v0.19.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/genproto v0.0.0-20230731193218-e0aa005b6bdf // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d // indirect diff --git a/management-server/go.sum b/management-server/go.sum index a441e3a17..9ac948109 100644 --- a/management-server/go.sum +++ b/management-server/go.sum @@ -13,8 +13,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.11.2-0.20230802074621-eea0b3bd0f81 h1:dx15VeDt3L5Z0Wx28jXbwgpeTrLsVvqC/wSvNgYPb/k= github.com/envoyproxy/go-control-plane v0.11.2-0.20230802074621-eea0b3bd0f81/go.mod h1:zV+ml0OfGpQxGvM1qlmhvZzE9ShvBO7CPWzGb3q5cog= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/envoyproxy/protoc-gen-validate v1.0.1 h1:kt9FtLiooDc0vbwTLhdg3dyNX1K9Qwa1EK9LcD4jVUQ= -github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs= +github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= +github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -25,14 +25,6 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= -github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU= -github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8= -github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk= -github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -41,12 +33,9 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= -golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -55,22 +44,20 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= @@ -89,8 +76,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d/go. google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= +google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= diff --git a/management-server/internal/database/app_cache.go b/management-server/internal/database/app_cache.go deleted file mode 100644 index 43c13d4f6..000000000 --- a/management-server/internal/database/app_cache.go +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright (c) 2022, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 LLC. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package database - -import ( - "errors" - "sync" - "time" - - apkmgt "github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/apkmgt" - "github.com/wso2/apk/management-server/internal/config" - "github.com/wso2/apk/management-server/internal/logger" -) - -// CachedApplication is an application with an expiry timestamp -type CachedApplication struct { - application *apkmgt.Application - expireAtTimestamp int64 -} - -// ApplicationLocalCache is data holder for cached applications -type ApplicationLocalCache struct { - stop chan struct{} - - wg sync.WaitGroup - mu sync.RWMutex - apps map[string]CachedApplication -} - -var cleanupInterval time.Duration -var ttl time.Duration - -func init() { - conf := config.ReadConfigs() - cleanupInterval, _ = time.ParseDuration(conf.Database.DbCache.CleanupInterval) - ttl, _ = time.ParseDuration(conf.Database.DbCache.TTL) - DbCache = NewApplicationLocalCache(cleanupInterval) -} - -// NewApplicationLocalCache creates an application local cache -func NewApplicationLocalCache(cleanupInterval time.Duration) *ApplicationLocalCache { - lc := &ApplicationLocalCache{ - apps: make(map[string]CachedApplication), - stop: make(chan struct{}), - } - - lc.wg.Add(1) - go func(cleanupInterval time.Duration) { - defer lc.wg.Done() - lc.cleanupLoop(cleanupInterval) - }(cleanupInterval) - return lc -} - -func (lc *ApplicationLocalCache) cleanupLoop(interval time.Duration) { - t := time.NewTicker(interval) - defer t.Stop() - - for { - select { - case <-lc.stop: - return - case <-t.C: - lc.mu.Lock() - for uid, cu := range lc.apps { - if cu.expireAtTimestamp <= time.Now().Unix() { - delete(lc.apps, uid) - } - } - lc.mu.Unlock() - } - } -} - -func (lc *ApplicationLocalCache) stopCleanup() { - close(lc.stop) - lc.wg.Wait() -} - -// Update updates the expiry timestamp of an existing application in the cache -func (lc *ApplicationLocalCache) Update(u *apkmgt.Application, expireAtTimestamp int64) { - lc.mu.Lock() - defer lc.mu.Unlock() - - lc.apps[u.Uuid] = CachedApplication{ - application: u, - expireAtTimestamp: expireAtTimestamp, - } - logger.LoggerDatabase.Infof("Cache updated successfully.. cache: %v", lc.apps) -} - -// UpdateSubscriptionInApplication updates the subscription of an application in the cache -func (lc *ApplicationLocalCache) UpdateSubscriptionInApplication(appUUID string, s *apkmgt.Subscription) error { - if app, ok := lc.apps[appUUID]; ok { - for i, sub := range app.application.Subscriptions { - if sub.Uuid == s.Uuid { - lc.apps[appUUID].application.Subscriptions[i] = s - return nil - } - } - app.expireAtTimestamp = time.Now().Unix() + ttl.Microseconds() - } else { - return ErrApplicationNotInCache - } - return ErrSubscriptionNotInAppCache -} - -// AddSubscriptionForApplication adds a subscription to an application in the cache -func (lc *ApplicationLocalCache) AddSubscriptionForApplication(appUUID string, s *apkmgt.Subscription) error { - if app, ok := lc.apps[appUUID]; ok { - app.application.Subscriptions = append(app.application.Subscriptions, s) - app.expireAtTimestamp = time.Now().Unix() + ttl.Microseconds() - return nil - } - return ErrApplicationNotInCache -} - -// DeleteSubscriptionFromApplication deletes a subscription from an application in the cache -func (lc *ApplicationLocalCache) DeleteSubscriptionFromApplication(appUUID, subUUID string) error { - if app, ok := lc.apps[appUUID]; ok { - for i, sub := range app.application.Subscriptions { - if sub.Uuid == subUUID { - app.application.Subscriptions[i] = app.application.Subscriptions[len(app.application.Subscriptions)-1] - app.application.Subscriptions = app.application.Subscriptions[:len(app.application.Subscriptions)-1] - return nil - } - } - app.expireAtTimestamp = time.Now().Unix() + ttl.Microseconds() - } else { - return ErrApplicationNotInCache - } - return ErrSubscriptionNotInAppCache -} - -var ( - // ErrApplicationNotInCache is the error returned when application is not present in the cache - ErrApplicationNotInCache = errors.New("unable to find application in cache") - // ErrSubscriptionNotInAppCache is the error returned when subscription is not present in the cache - ErrSubscriptionNotInAppCache = errors.New("unable to find subscription in application cache") -) - -// Read returns an applicaiton found in the in the cache with the given application id -func (lc *ApplicationLocalCache) Read(id string) (*apkmgt.Application, error) { - lc.mu.RLock() - defer lc.mu.RUnlock() - - cu, ok := lc.apps[id] - if !ok { - return &apkmgt.Application{}, ErrApplicationNotInCache - } - - return cu.application, nil -} - -// ReadAll returns all the applications in the cache -func (lc *ApplicationLocalCache) ReadAll() ([]*apkmgt.Application, error) { - var apps []*apkmgt.Application - - for _, app := range lc.apps { - apps = append(apps, app.application) - } - return apps, nil -} - -// Delete deletes an applicaton in the cache using the given id -func (lc *ApplicationLocalCache) Delete(id string) { - lc.mu.Lock() - defer lc.mu.Unlock() - - delete(lc.apps, id) -} diff --git a/management-server/internal/database/dao.go b/management-server/internal/database/dao.go deleted file mode 100644 index b2ac5a0cc..000000000 --- a/management-server/internal/database/dao.go +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Copyright (c) 2022, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package database - -import ( - "encoding/json" - "fmt" - "time" - - apkmgt "github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/apkmgt" - apiProtos "github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/service/apkmgt" - "github.com/wso2/apk/adapter/pkg/logging" - "github.com/wso2/apk/management-server/internal/logger" -) - -// DbCache is a pointer to an ApplicationLocalCache -var DbCache *ApplicationLocalCache - -func init() { - DbCache = NewApplicationLocalCache(cleanupInterval) -} - -type artifact struct { - APIName string `json:"apiName"` - ID string `json:"id"` - Context string `json:"context"` - Version string `json:"version"` - ProviderName string `json:"providerName"` - Status string `json:"status"` -} - -// GetApplicationByUUID retrives an application using uuid and returns it -func GetApplicationByUUID(uuid string) (*apkmgt.Application, error) { - rows, _ := ExecDBQuery(queryGetApplicationByUUID, uuid) - rows.Next() - values, err := rows.Values() - if err != nil { - return nil, err - } - subs, _ := getSubscriptionsForApplication(uuid) - keys, _ := getConsumerKeysForApplication(uuid) - application := &apkmgt.Application{ - Uuid: values[0].(string), - Name: values[1].(string), - Owner: "", //ToDo : Check how to get Owner from db - Attributes: nil, //ToDo : check the values for Attributes - Subscriber: "", - Organization: values[3].(string), - Subscriptions: subs, - ConsumerKeys: keys, - } - DbCache.Update(application, time.Now().Unix()+ttl.Microseconds()) - return application, nil -} - -// GetCachedApplicationByUUID returns the Application details from the cache. -// If the application is not available in the cache, it will fetch the application from DB. -func GetCachedApplicationByUUID(uuid string) (*apkmgt.Application, error) { - if app, ok := DbCache.Read(uuid); ok == nil { - return app, nil - } - return GetApplicationByUUID(uuid) -} - -// getSubscriptionsForApplication returns all subscriptions from DB, for a given application. -func getSubscriptionsForApplication(appUUID string) ([]*apkmgt.Subscription, error) { - rows, _ := ExecDBQuery(queryGetAllSubscriptionsForApplication, appUUID) - var subs []*apkmgt.Subscription - for rows.Next() { - values, err := rows.Values() - if err != nil { - return nil, err - } - subs = append(subs, &apkmgt.Subscription{ - Uuid: values[0].(string), - ApiUuid: values[1].(string), - PolicyId: "", - SubscriptionStatus: values[3].(string), - Organization: values[4].(string), - CreatedBy: values[5].(string), - }) - } - return subs, nil -} - -// getConsumerKeysForApplication returns all Consumer Keys from DB, for a given application. -func getConsumerKeysForApplication(appUUID string) ([]*apkmgt.ConsumerKey, error) { - rows, _ := ExecDBQuery(queryConsumerKeysForApplication, appUUID) - var keys []*apkmgt.ConsumerKey - for rows.Next() { - values, err := rows.Values() - if err != nil { - return nil, err - } - keys = append(keys, &apkmgt.ConsumerKey{ - Key: values[0].(string), - KeyManager: values[1].(string), - }) - } - return keys, nil -} - -// GetSubscriptionByUUID returns the Application details from the DB for a given subscription UUID. -func GetSubscriptionByUUID(subUUID string) (*apkmgt.Subscription, error) { - rows, _ := ExecDBQuery(querySubscriptionByUUID, subUUID) - rows.Next() - values, err := rows.Values() - if err != nil { - return nil, err - } - return &apkmgt.Subscription{ - Uuid: values[0].(string), - ApiUuid: values[1].(string), - PolicyId: "", - SubscriptionStatus: values[2].(string), - Organization: values[3].(string), - CreatedBy: values[4].(string), - }, nil -} - -// CreateAPI creates an API in the DB -func CreateAPI(api *apiProtos.API) error { - _, err := ExecDBQuery(queryCreateAPI, &api.Uuid, &api.Name, &api.Provider, - &api.Version, &api.BasePath, &api.OrganizationId, &api.CreatedBy, time.Now(), &api.Type, marshalArtifact(api), "PUBLISHED") - - if err != nil { - logger.LoggerDatabase.ErrorC(logging.ErrorDetails{ - Message: fmt.Sprintf("Error creating API %q, Error: %v", api.Uuid, err.Error()), - Severity: logging.CRITICAL, - ErrorCode: 1201, - }) - return err - } - return nil -} - -// UpdateAPI updates the given API in the DB -func UpdateAPI(api *apiProtos.API) error { - _, err := ExecDBQuery(queryUpdateAPI, &api.Uuid, &api.Name, &api.Provider, - &api.Version, &api.BasePath, &api.OrganizationId, &api.UpdatedBy, time.Now(), &api.Type, marshalArtifact(api), "PUBLISHED") - if err != nil { - logger.LoggerDatabase.ErrorC(logging.ErrorDetails{ - Message: fmt.Sprintf("Error updating API %q, Error: %v", api.Uuid, err.Error()), - Severity: logging.CRITICAL, - ErrorCode: 1202, - }) - return err - } - return nil -} - -// DeleteAPI deletes the given API in the DB -func DeleteAPI(api *apiProtos.API) error { - _, err := ExecDBQuery(queryDeleteAPI, api.Uuid) - if err != nil { - logger.LoggerDatabase.ErrorC(logging.ErrorDetails{ - Message: fmt.Sprintf("Error deleting API %q, Error: %v", api.Uuid, err.Error()), - Severity: logging.CRITICAL, - ErrorCode: 1203, - }) - return err - } - return nil -} - -func marshalArtifact(api *apiProtos.API) string { - artifact := &artifact{APIName: api.Name, - ID: api.Uuid, - Context: api.BasePath, - Version: api.Version, - ProviderName: api.Provider, - Status: "PUBLISHED", - } - jsonString, err := json.Marshal(artifact) - if err != nil { - return "{}" - } - return string(jsonString) -} diff --git a/management-server/internal/database/database.go b/management-server/internal/database/database.go deleted file mode 100644 index af4a1c98c..000000000 --- a/management-server/internal/database/database.go +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2022, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package database - -import ( - "context" - "fmt" - - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/wso2/apk/adapter/pkg/logging" - "github.com/wso2/apk/management-server/internal/config" - "github.com/wso2/apk/management-server/internal/logger" -) - -var dbPool *pgxpool.Pool - -// ConnectToDB creates the DB connection -func ConnectToDB() { - conf := config.ReadConfigs() - var err error - connString := fmt.Sprintf("postgresql://%s:%s@%s:%d/%s?pool_max_conns=%d&pool_min_conns=%d&"+ - "pool_max_conn_lifetime=%s&pool_max_conn_idle_time=%s&pool_health_check_period=%s&pool_max_conn_lifetime_jitter=%s", - conf.Database.Username, - conf.Database.Password, - conf.Database.Host, - conf.Database.Port, - conf.Database.Name, - conf.Database.PoolOptions.PoolMaxConns, - conf.Database.PoolOptions.PoolMinConns, - conf.Database.PoolOptions.PoolMaxConnLifetime, - conf.Database.PoolOptions.PoolMaxConnIdleTime, - conf.Database.PoolOptions.PoolHealthCheckPeriod, - conf.Database.PoolOptions.PoolMaxConnLifetimeJitter) - dbPool, err = pgxpool.New(context.Background(), connString) - if err != nil { - logger.LoggerDatabase.ErrorC(logging.ErrorDetails{ - Message: fmt.Sprintf("Unable to connect to database: %v", err.Error()), - Severity: logging.CRITICAL, - ErrorCode: 1100, - }) - } -} - -// ExecDBQuery executes a given database query with the arguments provided -func ExecDBQuery(query string, args ...interface{}) (pgx.Rows, error) { - rows, err := dbPool.Query(context.Background(), query, args...) - if err != nil { - return nil, err - } - return rows, nil -} - -// IsAliveConn checks whether the DB connections pool is alive -func IsAliveConn(ctx context.Context) (isAlive bool) { - if err := dbPool.Ping(ctx); err != nil { - return true - } - return isAlive -} - -// CloseDBConn closes the DB connections pool -func CloseDBConn() { - dbPool.Close() -} diff --git a/management-server/internal/database/queries.go b/management-server/internal/database/queries.go deleted file mode 100644 index 3c2b4cbed..000000000 --- a/management-server/internal/database/queries.go +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2022, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package database - -const ( - queryGetApplicationByUUID string = " SELECT " + - " APP.UUID," + - " APP.NAME," + - " APP.SUBSCRIBER_ID," + - " APP.ORGANIZATION ORGANIZATION," + - " SUB.USER_ID " + - " FROM " + - " SUBSCRIBER SUB," + - " APPLICATION APP " + - " WHERE " + - " APP.UUID = $1 " + - " AND APP.SUBSCRIBER_ID = SUB.SUBSCRIBER_ID" - - queryGetAllSubscriptionsForApplication string = "select " + - " SUB.uuid as UUID, " + - " API.api_uuid as API_UUID, " + - " API.api_version as API_VERSION, " + - " SUB.sub_status as SUB_STATUS, " + - " APP.organization as ORGANIZATION, " + - " SUB.created_by as CREATED_BY " + - " FROM " + - " APPLICATION APP, SUBSCRIPTION SUB, API API " + - " where 1 = 1 " + - " AND APP.application_id = SUB.application_id " + - " AND SUB.api_id = API.api_id " + - " AND APP.uuid = $1" - - queryConsumerKeysForApplication string = "select " + - " APPKEY.consumer_key, " + - " APPKEY.key_manager " + - " from " + - " application_key_mapping APPKEY, " + - " application APP " + - " where 1=1 " + - " AND APP.application_id = APPKEY.application_id " + - " AND APP.UUID = $1" - - querySubscriptionByUUID string = "select " + - " SUB.uuid, " + - " API.api_uuid, " + - " SUB.sub_status, " + - " API.organization, " + - " SUB.created_by " + - " from " + - " subscription SUB, " + - " api API " + - " where 1=1 " + - " AND SUB.api_id = API.api_id " + - " AND SUB.uuid = $1" - - queryCreateAPI string = "INSERT INTO API " + - "(API_UUID, API_NAME, API_PROVIDER, API_VERSION," + - "CONTEXT, ORGANIZATION, CREATED_BY, CREATED_TIME, API_TYPE, ARTIFACT, STATUS)" + - " VALUES " + "($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)" - - queryDeleteAPI string = "DELETE FROM API" + - " WHERE " + - "API_UUID = $1" - - queryUpdateAPI string = "UPDATE API SET " + - "API_NAME = $2, " + - "API_PROVIDER = $3, " + - "API_VERSION = $4, " + - "CONTEXT = $5, " + - "ORGANIZATION = $6, " + - "UPDATED_BY = $7, " + - "UPDATED_TIME = $8, " + - "API_TYPE = $9, " + - "ARTIFACT = $10, " + - "STATUS = $11" + - " WHERE " + - "API_UUID = $1" -) diff --git a/management-server/internal/logger/logging.go b/management-server/internal/logger/logging.go index 681110474..bc052b78d 100644 --- a/management-server/internal/logger/logging.go +++ b/management-server/internal/logger/logging.go @@ -37,7 +37,6 @@ const ( pkgXdsServer = "github.com/wso2/apk/management-server/xds/server" pkgMGTServer = "github.com/wso2/apk/management-server/internal/grpc-server" pkgNotificationServer = "github.com/wso2/apk/management-server/internal/notification" - pkgDatabase = "github.com/wso2/apk/management-server/internal/database" ) // logger package references @@ -61,6 +60,5 @@ func UpdateLoggers() { LoggerXdsServer = logging.InitPackageLogger(pkgXdsServer) LoggerMGTServer = logging.InitPackageLogger(pkgMGTServer) LoggerNotificationServer = logging.InitPackageLogger(pkgNotificationServer) - LoggerDatabase = logging.InitPackageLogger(pkgDatabase) logrus.Info("Updated loggers") } diff --git a/management-server/internal/xds/server.go b/management-server/internal/xds/server.go index bf602ea08..e3932dda9 100644 --- a/management-server/internal/xds/server.go +++ b/management-server/internal/xds/server.go @@ -27,22 +27,17 @@ import ( "sync" "time" - "github.com/wso2/apk/management-server/internal/database" - corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - sub_service "github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/service/subscription" internal_application "github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/subscription" wso2_cache "github.com/wso2/apk/adapter/pkg/discovery/protocol/cache/v3" wso2_resource "github.com/wso2/apk/adapter/pkg/discovery/protocol/resource/v3" - wso2_server "github.com/wso2/apk/adapter/pkg/discovery/protocol/server/v3" "github.com/wso2/apk/adapter/pkg/logging" "github.com/wso2/apk/adapter/pkg/utils/tlsutils" "github.com/wso2/apk/management-server/internal/config" "github.com/wso2/apk/management-server/internal/logger" internal_types "github.com/wso2/apk/management-server/internal/types" "github.com/wso2/apk/management-server/internal/utils" - "github.com/wso2/apk/management-server/internal/xds/callbacks" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) @@ -107,13 +102,13 @@ func AddSingleApplication(label string, application internal_types.ApplicationEv // } // } convertedApplication := &internal_application.Application{ - Uuid: application.UUID, - Name: application.Name, + Uuid: application.UUID, + Name: application.Name, // Policy: application.Policy, - Owner: application.Owner, + Owner: application.Owner, // Organization: application.Organization, // Keys: appKeys, - Attributes: application.Attributes, + Attributes: application.Attributes, } logger.LoggerXds.Debugf("Converted Application: %v", convertedApplication) @@ -180,58 +175,6 @@ func RemoveApplication(label, appUUID string) { logger.LoggerXds.Errorf("Application : %s is not found within snapshot for label %s", appUUID, label) } -// AddMultipleApplications adds the applications specified in applicationEventArray to the xds cache -// This will ideally be used to populate all applications in the startup of the mgt server. -func AddMultipleApplications(applicationEventArray []*internal_types.ApplicationEvent) { - snapshotMap := make(map[string]*wso2_cache.Snapshot) - version := rand.Intn(maxRandomInt) - - for _, event := range applicationEventArray { - label := event.Label - appUUID := event.UUID - - application, err := database.GetApplicationByUUID(appUUID) - if err != nil { - logger.LoggerDatabase.ErrorC(logging.ErrorDetails{ - Message: fmt.Sprintf("Error retrieving application for uuid : %s from database error: %v, "+ - "hence skipping add to xdx cache", appUUID, err), - Severity: logging.MINOR, - ErrorCode: 1101, - }) - continue - } - - snapshotEntry, snapshotFound := snapshotMap[label] - var newSnapshot wso2_cache.Snapshot - - if !snapshotFound { - newSnapshot, _ = wso2_cache.NewSnapshot(fmt.Sprint(version), map[wso2_resource.Type][]types.Resource{ - wso2_resource.ApplicationType: {application}, - }) - snapshotEntry = &newSnapshot - snapshotMap[label] = &newSnapshot - } else { - // error occurs if no snapshot is under the provided label - resourceMap := snapshotEntry.GetResourcesAndTTL(wso2_resource.ApplicationType) - resourceMap[appUUID] = types.ResourceWithTTL{ - Resource: application, - } - appResources := convertResourceMapToArray(resourceMap) - newSnapshot, _ = wso2_cache.NewSnapshot(fmt.Sprint(version), map[wso2_resource.Type][]types.Resource{ - wso2_resource.ApplicationType: appResources, - }) - snapshotMap[label] = &newSnapshot - } - } - applicationCacheMutex.Lock() - defer applicationCacheMutex.Unlock() - for label, snapshotEntry := range snapshotMap { - applicationCache.SetSnapshot(context.Background(), label, *snapshotEntry) - introducedLabels[label] = true - logger.LoggerXds.Infof("Application Snaphsot is updated for label %s with the version %d.", label, version) - } -} - func convertResourceMapToArray(resourceMap map[string]types.ResourceWithTTL) []types.Resource { var appResources []types.Resource for _, res := range resourceMap { @@ -275,10 +218,8 @@ func SetEmptySnapshot(label string) error { // InitAPKMgtServer initializes the APK management server func InitAPKMgtServer() { - ctx, cancel := context.WithCancel(context.Background()) + _, cancel := context.WithCancel(context.Background()) defer cancel() - apkMgtAPIDsSrv := wso2_server.NewServer(ctx, applicationCache, &callbacks.Callbacks{}) - subSrv := wso2_server.NewServer(ctx, subscriptionCache, &callbacks.Callbacks{}) publicKeyLocation, privateKeyLocation, truststoreLocation := utils.GetKeyLocations() cert, err := tlsutils.GetServerCertificate(publicKeyLocation, privateKeyLocation) if err != nil { @@ -298,8 +239,6 @@ func InitAPKMgtServer() { }), )) grpcServer := grpc.NewServer(grpcOptions...) - sub_service.RegisterApplicationDiscoveryServiceServer(grpcServer, apkMgtAPIDsSrv) - sub_service.RegisterSubscriptionDiscoveryServiceServer(grpcServer, subSrv) config := config.ReadConfigs() port := config.ManagementServer.XDSPort @@ -326,12 +265,12 @@ func InitAPKMgtServer() { // AddSingleSubscription will update the Subscription specified by the UUID to the xds cache func AddSingleSubscription(label string, subscription internal_types.SubscriptionEvent) { convertedSubscription := &internal_application.Subscription{ - Uuid: subscription.UUID, + Uuid: subscription.UUID, // ApplicationRef: subscription.ApplicationRef, // ApiRef: subscription.APIRef, - SubStatus: subscription.SubStatus, + SubStatus: subscription.SubStatus, // PolicyId: subscription.PolicyID, - Organization: subscription.Organization, + Organization: subscription.Organization, // Subscriber: subscription.Subscriber, // Timetamp: subscription.TimeStamp, }