From c7081e266a1a1f7debdd100dc13684deb91baa12 Mon Sep 17 00:00:00 2001 From: Aristotle <11413392+tuxranger@users.noreply.github.com> Date: Wed, 17 Mar 2021 16:18:35 -0400 Subject: [PATCH] motiv-labs/janus#463 filled out logic for cassandra oauth plugin. fixed broken queries. got cassandra generally working it seems --- Dockerfile | 2 + Dockerfile.dev | 10 + cassandra/schema.sql | 5 + cassandra/session.go | 12 +- .../front-proxy-cassandra/apis/example.json | 14 ++ .../front-proxy-cassandra/docker-compose.yml | 34 +++ examples/front-proxy-cassandra/janus.toml | 22 ++ examples/front-proxy-mongo/apis/example.json | 33 +-- examples/front-proxy-mongo/docker-compose.yml | 24 +- go.mod | 1 + pkg/api/cassandra_repository.go | 154 +++++++++--- pkg/api/repository.go | 4 +- pkg/plugin/oauth2/cassandra_repository.go | 229 ++++++++++++++++++ pkg/plugin/oauth2/setup.go | 9 +- 14 files changed, 472 insertions(+), 81 deletions(-) create mode 100644 examples/front-proxy-cassandra/apis/example.json create mode 100644 examples/front-proxy-cassandra/docker-compose.yml create mode 100644 examples/front-proxy-cassandra/janus.toml create mode 100644 pkg/plugin/oauth2/cassandra_repository.go diff --git a/Dockerfile b/Dockerfile index 3905a52cf..05c60a15d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,7 @@ FROM ubuntu:20.04 +COPY cassandra/schema.sql /usr/local/bin + ADD janus /bin/janus RUN chmod a+x /bin/janus && \ mkdir -p /etc/janus/apis && \ diff --git a/Dockerfile.dev b/Dockerfile.dev index aef85f06d..99b8acd37 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -6,6 +6,15 @@ WORKDIR /janus COPY . ./ +######## Set Up github credentials to we can access private modules ######## +RUN apk add git +ARG GITHUB_USER +ENV GITHUB_USER=$GITHUB_USER +RUN echo $GITHUB_USER +ARG GITHUB_TOKEN +ENV GITHUB_TOKEN=$GITHUB_TOKEN +RUN git config --global url."https://$GITHUB_USER:$GITHUB_TOKEN@github.com".insteadOf "https://github.com" + # Add tooling to install GCC RUN apk add build-base # Add cqlsh to the image. @@ -38,6 +47,7 @@ HEALTHCHECK --interval=5s --timeout=5s --retries=3 CMD curl -f http://localhost: FROM build-debug-common as dev EXPOSE 8080 8081 8443 8444 40000 COPY entry-dev.sh /usr/local/bin +COPY cassandra/schema.sql /usr/local/bin RUN chmod 755 /usr/local/bin/entry-dev.sh ENTRYPOINT ["/usr/local/bin/entry-dev.sh"] #ENTRYPOINT ["/janus", "start"] diff --git a/cassandra/schema.sql b/cassandra/schema.sql index 2f481f3d4..09bab51a2 100644 --- a/cassandra/schema.sql +++ b/cassandra/schema.sql @@ -10,3 +10,8 @@ CREATE TABLE IF NOT EXISTS janus.api_definition ( name text, definition text, PRIMARY KEY (name)); + +CREATE TABLE IF NOT EXISTS janus.oauth ( + name text, + oauth text, + PRIMARY KEY (name)); diff --git a/cassandra/session.go b/cassandra/session.go index c3c6512b0..6579b7cfd 100644 --- a/cassandra/session.go +++ b/cassandra/session.go @@ -2,6 +2,7 @@ package cassandra import ( "github.com/motiv-labs/cassandra" + "github.com/opentracing/opentracing-go" ) const ( @@ -19,15 +20,12 @@ const ( var sessionHolder cassandra.Holder func GetSession() cassandra.SessionInterface { - //span := opentracing.StartSpan("GetSession", opentracing.ChildOf(parentSpan.Context())) - //defer span.Finish() - //span.SetTag("Package", "cassandra") - return sessionHolder.GetSession(nil) + span := opentracing.StartSpan("GetSession") + defer span.Finish() + span.SetTag("Package", "cassandra") + return sessionHolder.GetSession(span) } func SetSessionHolder(holder cassandra.Holder) { - //span := opentracing.StartSpan("SetSessionHolder", opentracing.ChildOf(parentSpan.Context())) - //defer span.Finish() - //span.SetTag("Package", "cassandra") sessionHolder = holder } diff --git a/examples/front-proxy-cassandra/apis/example.json b/examples/front-proxy-cassandra/apis/example.json new file mode 100644 index 000000000..603a6ac8c --- /dev/null +++ b/examples/front-proxy-cassandra/apis/example.json @@ -0,0 +1,14 @@ +{ + "name" : "my-endpoint", + "active" : true, + "proxy" : { + "listen_path" : "/example/*", + "upstreams" : { + "balancing": "roundrobin", + "targets": [ + {"target": "http://www.mocky.io/v2/595625d22900008702cd71e8"} + ] + }, + "methods" : ["GET"] + } +} diff --git a/examples/front-proxy-cassandra/docker-compose.yml b/examples/front-proxy-cassandra/docker-compose.yml new file mode 100644 index 000000000..b178bba3f --- /dev/null +++ b/examples/front-proxy-cassandra/docker-compose.yml @@ -0,0 +1,34 @@ +version: '3' +services: + + janus: + image: motivlabs/janus:debug + ports: + - "8080:8080" + - "8081:8081" + - "40000:40000" + environment: + - debug=1 + depends_on: + - service1 + - janus-database + volumes: + - ./janus.toml:/etc/janus/janus.toml + - ~/dev/motiv/janus:/janus + + janus-database: + image: cassandra:latest + container_name: db + ports: + - "9042:9042" + environment: + - MAX_HEAP_SIZE=1G + - HEAP_NEWSIZE=250M + - JAVA_OPTS="-Dcassandra.skip_wait_for_gossip_to_settle=0 -Dcassandra.num_tokens=1 -Dcassandra.initial_token=1" + + service1: + image: rodolpheche/wiremock + ports: + - '9089:8080' + volumes: + - ../front-proxy/stubs:/home/wiremock/mappings diff --git a/examples/front-proxy-cassandra/janus.toml b/examples/front-proxy-cassandra/janus.toml new file mode 100644 index 000000000..aea3794b5 --- /dev/null +++ b/examples/front-proxy-cassandra/janus.toml @@ -0,0 +1,22 @@ +################################################################ +# Global configuration +################################################################ +port = 8080 + +[log] + level = "debug" + +################################################################ +# API configuration backend +################################################################ +[web] + port = 8081 + + [web.credentials] + secret = "secret" + + [web.credentials.basic] + users = {admin = "admin"} + +[database] + dsn = "cassandra:///db/system/janus/300" diff --git a/examples/front-proxy-mongo/apis/example.json b/examples/front-proxy-mongo/apis/example.json index 433984a22..603a6ac8c 100644 --- a/examples/front-proxy-mongo/apis/example.json +++ b/examples/front-proxy-mongo/apis/example.json @@ -1,23 +1,14 @@ -[ - { - "name" : "example", - "active" : true, - "proxy" : { - "preserve_host" : false, - "listen_path" : "/example/*", - "upstreams" : { - "balancing": "roundrobin", - "targets": [ - {"target": "http://service1:8080/"} - ] - }, - "strip_path" : false, - "append_path" : false, - "methods" : ["GET"] +{ + "name" : "my-endpoint", + "active" : true, + "proxy" : { + "listen_path" : "/example/*", + "upstreams" : { + "balancing": "roundrobin", + "targets": [ + {"target": "http://www.mocky.io/v2/595625d22900008702cd71e8"} + ] }, - "health_check": { - "url": "http://service1:8080/status", - "timeout": 3 - } + "methods" : ["GET"] } -] +} diff --git a/examples/front-proxy-mongo/docker-compose.yml b/examples/front-proxy-mongo/docker-compose.yml index 36d97c536..97073b804 100644 --- a/examples/front-proxy-mongo/docker-compose.yml +++ b/examples/front-proxy-mongo/docker-compose.yml @@ -2,15 +2,19 @@ version: '3' services: janus: - image: hellofreshtech/janus + image: motivlabs/janus:debug ports: - - "8080:8080" - - "8081:8081" + - "8082:8080" + - "8083:8081" + - "40001:40000" depends_on: - service1 - janus-database + environment: + - debug=1 volumes: - ./janus.toml:/etc/janus/janus.toml + - ~/dev/motiv/janus:/janus janus-database: image: mongo @@ -18,16 +22,16 @@ services: - "27017:27017" # This container is just a helper to seed the database - mongo-seed: - build: - context: . - dockerfile: seed.Dockerfile - depends_on: - - janus-database +# mongo-seed: +# build: +# context: . +# dockerfile: seed.Dockerfile +# depends_on: +# - janus-database service1: image: rodolpheche/wiremock ports: - - '9089:8080' + - '9090:8080' volumes: - ../front-proxy/stubs:/home/wiremock/mappings diff --git a/go.mod b/go.mod index 05193e248..c526ae1a1 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/mapstructure v1.1.2 github.com/motiv-labs/cassandra v0.0.0-20210126221137-4ac871dd211e + github.com/opentracing/opentracing-go v1.2.0 github.com/rafaeljesus/retry-go v0.0.0-20171214204623-5981a380a879 github.com/rs/cors v1.4.0 github.com/sirupsen/logrus v1.7.0 diff --git a/pkg/api/cassandra_repository.go b/pkg/api/cassandra_repository.go index dd31227b7..e63362b7a 100644 --- a/pkg/api/cassandra_repository.go +++ b/pkg/api/cassandra_repository.go @@ -1,8 +1,11 @@ package api import ( + "context" + "encoding/json" cass "github.com/hellofresh/janus/cassandra" cassmod "github.com/motiv-labs/cassandra" + "github.com/opentracing/opentracing-go" log "github.com/sirupsen/logrus" "strconv" "strings" @@ -18,12 +21,13 @@ type CassandraRepository struct { } func NewCassandraRepository(dsn string, refreshTime time.Duration) (*CassandraRepository, error) { - log.Debugf("getting new cassandra repo") - //span := opentracing.StartSpan("NewCassandraRepository") - //defer span.Finish() - //span.SetTag("Interface", "CassandraRepository") + log.Debugf("getting new api cassandra repo") + span := opentracing.StartSpan("NewCassandraRepository") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") // parse the dsn string for the cluster host, system key space, app key space and connection timeout. + log.Infof("dsn is %s", dsn) clusterHost, systemKeyspace, appKeyspace, connectionTimeout := parseDSN(dsn) if clusterHost == "" { clusterHost = cass.ClusterHostName @@ -39,17 +43,17 @@ func NewCassandraRepository(dsn string, refreshTime time.Duration) (*CassandraRe } // Wait for Cassandra to start, setup Cassandra keyspace if required - cassmod.Initialize(cass.ClusterHostName, cass.SystemKeyspace, cass.AppKeyspace, cass.Timeout*time.Second, nil) + cassmod.Initialize(cass.ClusterHostName, cass.SystemKeyspace, cass.AppKeyspace, cass.Timeout*time.Second, span) // Getting a cassandra connection initializer - initializer := cassmod.New(cass.ClusterHostName, cass.AppKeyspace, nil) + initializer := cassmod.New(cass.ClusterHostName, cass.AppKeyspace, span) // Starting a new cassandra session - sessionHolder, err := initializer.NewSession(nil) + sessionHolder, err := initializer.NewSession(span) if err != nil { panic(err) } - // Global session for Janus + // api cassandra repo session cass.SetSessionHolder(sessionHolder) return &CassandraRepository{ @@ -60,30 +64,88 @@ func NewCassandraRepository(dsn string, refreshTime time.Duration) (*CassandraRe } func (r *CassandraRepository) Close() error { - //span := opentracing.StartSpan("Close") - //defer span.Finish() - //span.SetTag("Interface", "CassandraRepository") + span := opentracing.StartSpan("Close") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") // Close the session - r.session.CloseSession(nil) + r.session.CloseSession(span) return nil } +// Listen watches for changes on the configuration +func (r *CassandraRepository) Listen(ctx context.Context, cfgChan <-chan ConfigurationMessage) { + go func() { + log.Debug("Listening for changes on the provider...") + for { + select { + case cfg := <-cfgChan: + switch cfg.Operation { + case AddedOperation: + err := r.add(cfg.Configuration) + if err != nil { + log.WithError(err).Error("Could not add the configuration on the provider") + } + case UpdatedOperation: + err := r.add(cfg.Configuration) + if err != nil { + log.WithError(err).Error("Could not update the configuration on the provider") + } + case RemovedOperation: + err := r.remove(cfg.Configuration.Name) + if err != nil { + log.WithError(err).Error("Could not remove the configuration from the provider") + } + } + case <-ctx.Done(): + return + } + } + }() +} + +// Watch watches for changes on the database +func (r *CassandraRepository) Watch(ctx context.Context, cfgChan chan<- ConfigurationChanged) { + t := time.NewTicker(r.refreshTime) + go func(refreshTicker *time.Ticker) { + defer refreshTicker.Stop() + log.Debug("Watching Provider...") + for { + select { + case <-refreshTicker.C: + defs, err := r.FindAll() + if err != nil { + log.WithError(err).Error("Failed to get configurations on watch") + continue + } + + cfgChan <- ConfigurationChanged{ + Configurations: &Configuration{Definitions: defs}, + } + case <-ctx.Done(): + return + } + } + }(t) +} + // FindAll fetches all the API definitions available func (r *CassandraRepository) FindAll() ([]*Definition, error) { - //span := opentracing.StartSpan("FindAll") - //defer span.Finish() - //span.SetTag("Interface", "CassandraRepository") + span := opentracing.StartSpan("FindAll") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") + + log.Infof("finding all definitions") var results []*Definition - // todo fill in the select with the actual column names - iter := r.session.GetSession(nil).Query(nil, - "SELECT name, definition FROM api_definition").Iter(nil) + + iter := r.session.GetSession(span).Query(span, + "SELECT definition FROM api_definition").Iter(span) var savedDef string var definition *Definition - for iter.Scan(nil, definition) { - err := definition.UnmarshalJSON([]byte(savedDef)) + for iter.Scan(span, &savedDef) { + err := json.Unmarshal([]byte(savedDef), &definition) if err != nil { log.Errorf("error trying to unmarshal definition json: %v", err) return nil, err @@ -91,7 +153,7 @@ func (r *CassandraRepository) FindAll() ([]*Definition, error) { results = append(results, definition) } - err := iter.Close(nil) + err := iter.Close(span) if err != nil { log.Errorf("error getting all definitions: %v", err) } @@ -100,9 +162,13 @@ func (r *CassandraRepository) FindAll() ([]*Definition, error) { // Add adds an API definition to the repository func (r *CassandraRepository) add(definition *Definition) error { - //span := opentracing.StartSpan("add") - //defer span.Finish() - //span.SetTag("Interface", "CassandraRepository") + span := opentracing.StartSpan("add") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") + + log.Infof("adding: %s", definition.Name) + + log.Infof("definition is: %v", *definition) isValid, err := definition.Validate() if false == isValid && err != nil { @@ -110,13 +176,17 @@ func (r *CassandraRepository) add(definition *Definition) error { return err } - // todo I might need to marshal the definition before saving it. - err = r.session.GetSession(nil).Query(nil, + saveDef, err := json.Marshal(definition) + if err != nil { + log.Errorf("error marshaling oauth: %v", err) + return err + } + + err = r.session.GetSession(span).Query(span, "UPDATE api_definition " + - "SET name = ?, " + - "definition = ? " + + "SET definition = ? " + "WHERE name = ?", - definition.Name, definition).Exec(nil) + saveDef, definition.Name).Exec(span) if err != nil { log.Errorf("error saving definition %s: %v", definition.Name, err) @@ -129,13 +199,14 @@ func (r *CassandraRepository) add(definition *Definition) error { // Remove removes an API definition from the repository func (r *CassandraRepository) remove(name string) error { - //span := opentracing.StartSpan("remove") - //defer span.Finish() - //span.SetTag("Interface", "CassandraRepository") + span := opentracing.StartSpan("remove") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") + + log.Infof("removing: %s", name) - // todo I might need to marshal the definition before saving it. - err := r.session.GetSession(nil).Query(nil, - "DELETE FROM api_definition WHERE name = ?", name).Exec(nil) + err := r.session.GetSession(span).Query(span, + "DELETE FROM api_definition WHERE name = ?", name).Exec(span) if err != nil { log.Errorf("error saving definition %s: %v", name, err) @@ -148,21 +219,24 @@ func (r *CassandraRepository) remove(name string) error { func parseDSN(dsn string) (clusterHost string, systemKeyspace string, appKeyspace string, connectionTimeout int) { trimDSN := strings.TrimSpace(dsn) + log.Infof("trimDSN: %s", trimDSN) if len(trimDSN) == 0 { return "", "", "", 0 } // split each `:` - splitDSN := strings.Split(trimDSN, ":") + splitDSN := strings.Split(trimDSN, "/") // list of info for i, v := range splitDSN { + log.Infof("splitDSN i is %d and v is %s", i, v) + // start at 1 because the dsn path comes in with a leading / switch i { - case 0: - clusterHost = v case 1: - systemKeyspace = v + clusterHost = v case 2: - appKeyspace = v + systemKeyspace = v case 3: + appKeyspace = v + case 4: timeout, err := strconv.Atoi(v) if err != nil { log.Error("timeout is not an int") diff --git a/pkg/api/repository.go b/pkg/api/repository.go index bd06b9dec..6faee61fb 100644 --- a/pkg/api/repository.go +++ b/pkg/api/repository.go @@ -45,8 +45,8 @@ func BuildRepository(dsn string, refreshTime time.Duration) (Repository, error) log.Debug("MongoDB configuration chosen") return NewMongoAppRepository(dsn, refreshTime) case cassandra: - log.Debugf("Casssandra configuration chosen") - return NewCassandraRepository(dsn, refreshTime) + log.Debugf("Casssandra configuration chosen: dsn is %s, dsnURL is %s", dsnURL.Path, dsnURL) + return NewCassandraRepository(dsnURL.Path, refreshTime) case file: log.Debug("File system based configuration chosen") apiPath := fmt.Sprintf("%s/apis", dsnURL.Path) diff --git a/pkg/plugin/oauth2/cassandra_repository.go b/pkg/plugin/oauth2/cassandra_repository.go new file mode 100644 index 000000000..43da2ff95 --- /dev/null +++ b/pkg/plugin/oauth2/cassandra_repository.go @@ -0,0 +1,229 @@ +package oauth2 + +import ( + "encoding/json" + cass "github.com/hellofresh/janus/cassandra" + cassmod "github.com/motiv-labs/cassandra" + "github.com/opentracing/opentracing-go" + log "github.com/sirupsen/logrus" + "strconv" + "strings" + "time" +) + +// CassandraRepository represents a cassandra repository +type CassandraRepository struct { + session cassmod.Holder +} + +func NewCassandraRepository(dsn string) (*CassandraRepository, error) { + log.Debugf("getting new oauth cassandra repo") + span := opentracing.StartSpan("NewCassandraRepository") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") + + // parse the dsn string for the cluster host, system key space, app key space and connection timeout. + log.Infof("dsn is %s", dsn) + clusterHost, systemKeyspace, appKeyspace, connectionTimeout := parseDSN(dsn) + if clusterHost == "" { + clusterHost = cass.ClusterHostName + } + if systemKeyspace == "" { + systemKeyspace = cass.SystemKeyspace + } + if appKeyspace == "" { + appKeyspace = cass.AppKeyspace + } + if connectionTimeout == 0 { + connectionTimeout = cass.Timeout + } + + // Wait for Cassandra to start, setup Cassandra keyspace if required + cassmod.Initialize(cass.ClusterHostName, cass.SystemKeyspace, cass.AppKeyspace, cass.Timeout*time.Second, span) + + // Getting a cassandra connection initializer + initializer := cassmod.New(cass.ClusterHostName, cass.AppKeyspace, span) + + // Starting a new cassandra session + sessionHolder, err := initializer.NewSession(span) + if err != nil { + panic(err) + } + // set oauth cassandra repo session + cass.SetSessionHolder(sessionHolder) + + return &CassandraRepository{ + session: sessionHolder, + }, nil + +} + +// FindAll fetches all the OAuth Servers available +func (r *CassandraRepository) FindAll() ([]*OAuth, error) { + span := opentracing.StartSpan("FindAll") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") + + log.Infof("finding all oauth servers") + + var results []*OAuth + + iter := r.session.GetSession(span).Query(span, + "SELECT name, oauth FROM oauth").Iter(span) + + var savedDef string + var oauth *OAuth + + for iter.Scan(span, &savedDef) { + err := json.Unmarshal([]byte(savedDef), &oauth) + if err != nil { + log.Errorf("error trying to unmarshal oauth json: %v", err) + return nil, err + } + results = append(results, oauth) + } + + err := iter.Close(span) + if err != nil { + log.Errorf("error getting all oauths: %v", err) + } + return results, err +} + +// FindByName find an OAuth Server by name +func (r *CassandraRepository) FindByName(name string) (*OAuth, error) { + span := opentracing.StartSpan("add") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") + + log.Infof("finding: %s", name) + + var oauth *OAuth + + err := r.session.GetSession(span).Query(span, + "SELECT oauth = ? " + + "FROM oauth" + + "WHERE name = ?", + oauth, name).Exec(span) + + if err != nil { + log.Errorf("error selecting oauth %s: %v", name, err) + } else { + log.Debugf("successfully found oauth %s", name) + } + + return oauth, err +} + +// Add add a new OAuth Server to the repository +// Add is the same as Save because Cassandra only upserts and I didn't want to write an existence checker +func (r *CassandraRepository) Add(oauth *OAuth) error { + span := opentracing.StartSpan("add") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") + + log.Infof("adding: %s", oauth.Name) + + log.Infof("oauth is: %v", *oauth) + + saveOauth, err := json.Marshal(oauth) + if err != nil { + log.Errorf("error marshaling oauth: %v", err) + return err + } + err = r.session.GetSession(span).Query(span, + "UPDATE oauth " + + "SET oauth = ? " + + "WHERE name = ?", + saveOauth, oauth.Name).Exec(span) + + if err != nil { + log.Errorf("error saving oauth %s: %v", oauth.Name, err) + } else { + log.Debugf("successfully saved oauth %s", oauth.Name) + } + + return err +} + +// Save saves OAuth Server to the repository +func (r *CassandraRepository) Save(oauth *OAuth) error { + span := opentracing.StartSpan("add") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") + + log.Infof("adding: %s", oauth.Name) + + log.Infof("oauth is: %v", *oauth) + + saveOauth, err := json.Marshal(oauth) + if err != nil { + log.Errorf("error marshaling oauth: %v", err) + return err + } + err = r.session.GetSession(span).Query(span, + "UPDATE oauth " + + "SET oauth = ? " + + "WHERE name = ?", + saveOauth, oauth.Name).Exec(span) + + if err != nil { + log.Errorf("error saving oauth %s: %v", oauth.Name, err) + } else { + log.Debugf("successfully saved oauth %s", oauth.Name) + } + + return err +} + +// Remove removes an OAuth Server from the repository +func (r *CassandraRepository) Remove(name string) error { + span := opentracing.StartSpan("remove") + defer span.Finish() + span.SetTag("Interface", "CassandraRepository") + + log.Infof("removing: %s", name) + + err := r.session.GetSession(span).Query(span, + "DELETE FROM oauth WHERE name = ?", name).Exec(span) + + if err != nil { + log.Errorf("error removing oauth %s: %v", name, err) + } else { + log.Debugf("successfully removed oauth %s", name) + } + + return err +} + +func parseDSN(dsn string) (clusterHost string, systemKeyspace string, appKeyspace string, connectionTimeout int) { + trimDSN := strings.TrimSpace(dsn) + log.Infof("trimDSN: %s", trimDSN) + if len(trimDSN) == 0 { + return "", "", "", 0 + } + // split each `:` + splitDSN := strings.Split(trimDSN, "/") + // list of info + for i, v := range splitDSN { + log.Infof("splitDSN i is %d and v is %s", i, v) + // start at 1 because the dsn path comes in with a leading / + switch i { + case 1: + clusterHost = v + case 2: + systemKeyspace = v + case 3: + appKeyspace = v + case 4: + timeout, err := strconv.Atoi(v) + if err != nil { + log.Error("timeout is not an int") + timeout = 0 + } + connectionTimeout = timeout + } + } + return clusterHost, systemKeyspace, appKeyspace, connectionTimeout +} + diff --git a/pkg/plugin/oauth2/setup.go b/pkg/plugin/oauth2/setup.go index fb7a12444..6f29ef5d8 100644 --- a/pkg/plugin/oauth2/setup.go +++ b/pkg/plugin/oauth2/setup.go @@ -23,6 +23,7 @@ import ( const ( mongodb = "mongodb" file = "file" + cassandra = "cassandra" mongoIdxTimeout = 10 * time.Second ) @@ -102,7 +103,13 @@ func onStartup(event interface{}) error { ); err != nil { return fmt.Errorf("failed to create indexes for oauth servers repository: %w", err) } - + case cassandra: + repo, err = NewCassandraRepository(dsnURL.Path) + if err != nil { + log.Errorf("error creating new cassandra repo") + return err + } + // todo more setup may be required but I don't think so case file: authPath := fmt.Sprintf("%s/auth", dsnURL.Path) log.WithField("path", authPath).Debug("Trying to load Auth configuration files")