forked from openware/rango
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
834 additions
and
829 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package app | ||
|
||
import ( | ||
"net/http" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/prometheus/client_golang/prometheus/promhttp" | ||
"github.com/twmb/franz-go/pkg/kgo" | ||
"github.com/zsmartex/pkg/v2/infrastructure/kafka_fx" | ||
"github.com/zsmartex/pkg/v2/log" | ||
"go.uber.org/fx" | ||
|
||
"github.com/zsmartex/rango/config" | ||
"github.com/zsmartex/rango/pkg/routing" | ||
) | ||
|
||
var Topic kafka_fx.Topic = "rango.events" | ||
var Ticker *time.Ticker = time.NewTicker(20 * time.Millisecond) | ||
|
||
var ( | ||
Module = fx.Module("rango_service", | ||
kafka_fx.ConsumerModule, | ||
rangoProviders, | ||
) | ||
|
||
rangoProviders = fx.Options( | ||
fx.Supply(Topic), | ||
fx.Supply(Ticker), | ||
fx.Supply(fx.Annotate(true, fx.ParamTags(`name:"at_end"`))), | ||
fx.Provide( | ||
New, | ||
func(app *App) kafka_fx.ConsumerSubscriber { | ||
return app | ||
}, | ||
), | ||
fx.Invoke(registerHooks), | ||
) | ||
) | ||
|
||
var _ kafka_fx.ConsumerSubscriber = (*App)(nil) | ||
|
||
type App struct { | ||
config *config.Config | ||
hub *routing.Hub | ||
} | ||
|
||
func New(config *config.Config, hub *routing.Hub) *App { | ||
app := &App{ | ||
config: config, | ||
hub: hub, | ||
} | ||
|
||
return app | ||
} | ||
|
||
func (s *App) OnMessage(record *kgo.Record) error { | ||
s.hub.ReceiveMsg(record) | ||
|
||
return nil | ||
} | ||
|
||
func (a *App) Run() error { | ||
pub, err := getPublicKey(a.config) | ||
if err != nil { | ||
time.Sleep(2 * time.Second) | ||
return errors.Wrap(err, "Loading public key failed") | ||
} | ||
|
||
log.Info("Starting rango...") | ||
|
||
go a.hub.ListenWebsocketEvents() | ||
|
||
wsHandler := func(w http.ResponseWriter, r *http.Request) { | ||
routing.NewClient(a.hub, w, r) | ||
} | ||
|
||
http.HandleFunc("/private", authHandler(wsHandler, pub, true)) | ||
http.HandleFunc("/public", authHandler(wsHandler, pub, false)) | ||
http.HandleFunc("/", authHandler(wsHandler, pub, false)) | ||
|
||
go func() { | ||
err := http.ListenAndServe(":4242", promhttp.Handler()) | ||
if err != nil { | ||
log.Fatalf("ListenAndServe failed: %v", err) | ||
} | ||
}() | ||
|
||
log.Infof("Listenning on %s", a.config.HTTP.Address()) | ||
err = http.ListenAndServe(a.config.HTTP.Address(), nil) | ||
if err != nil { | ||
log.Errorf("ListenAndServe failed: %v", err) | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func registerHooks(app *App) { | ||
go app.Run() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package app | ||
|
||
import ( | ||
"crypto/rsa" | ||
"fmt" | ||
"net/http" | ||
"strings" | ||
|
||
"github.com/zsmartex/rango/config" | ||
"github.com/zsmartex/rango/pkg/auth" | ||
) | ||
|
||
const prefix = "Bearer " | ||
|
||
type httpHanlder func(w http.ResponseWriter, r *http.Request) | ||
|
||
func token(r *http.Request) string { | ||
authHeader := r.Header.Get("Authorization") | ||
if !strings.HasPrefix(string(authHeader), prefix) { | ||
return "" | ||
} | ||
|
||
return authHeader[len(prefix):] | ||
} | ||
|
||
func authHandler(h httpHanlder, key *rsa.PublicKey, mustAuth bool) httpHanlder { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
auth, err := auth.ParseAndValidate(token(r), key) | ||
|
||
if err != nil && mustAuth { | ||
w.WriteHeader(http.StatusUnauthorized) | ||
return | ||
} | ||
|
||
if err == nil { | ||
r.Header.Set("JwtUID", auth.UID) | ||
r.Header.Set("JwtRole", auth.Role) | ||
} else { | ||
r.Header.Del("JwtUID") | ||
r.Header.Del("JwtRole") | ||
} | ||
h(w, r) | ||
} | ||
} | ||
|
||
func getPublicKey(config *config.Config) (pub *rsa.PublicKey, err error) { | ||
ks := auth.KeyStore{} | ||
encPem := config.JWTPublicKey | ||
ks.LoadPublicKeyFromString(encPem) | ||
|
||
if err != nil { | ||
return nil, err | ||
} | ||
if ks.PublicKey == nil { | ||
return nil, fmt.Errorf("failed") | ||
} | ||
return ks.PublicKey, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,199 +1,21 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"crypto/rsa" | ||
"flag" | ||
"fmt" | ||
"net/http" | ||
"os" | ||
"strings" | ||
"time" | ||
"go.uber.org/fx" | ||
|
||
"github.com/prometheus/client_golang/prometheus/promhttp" | ||
"github.com/twmb/franz-go/pkg/kgo" | ||
|
||
"github.com/zsmartex/pkg/v2/log" | ||
"github.com/zsmartex/pkg/v2/utils" | ||
"github.com/zsmartex/rango/pkg/auth" | ||
"github.com/zsmartex/rango/cmd/rango/app.go" | ||
"github.com/zsmartex/rango/config" | ||
"github.com/zsmartex/rango/pkg/metrics" | ||
"github.com/zsmartex/rango/pkg/routing" | ||
) | ||
|
||
var ( | ||
wsAddr = flag.String("ws-addr", "", "http service address") | ||
pubKey = flag.String("pubKey", "config/rsa-key.pub", "Path to public key") | ||
exName = flag.String("exchange", "rango.events", "Exchange name of upstream messages") | ||
) | ||
|
||
const prefix = "Bearer " | ||
|
||
type httpHanlder func(w http.ResponseWriter, r *http.Request) | ||
|
||
func token(r *http.Request) string { | ||
authHeader := r.Header.Get("Authorization") | ||
if !strings.HasPrefix(string(authHeader), prefix) { | ||
return "" | ||
} | ||
|
||
return authHeader[len(prefix):] | ||
} | ||
|
||
func authHandler(h httpHanlder, key *rsa.PublicKey, mustAuth bool) httpHanlder { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
auth, err := auth.ParseAndValidate(token(r), key) | ||
|
||
if err != nil && mustAuth { | ||
w.WriteHeader(http.StatusUnauthorized) | ||
return | ||
} | ||
|
||
if err == nil { | ||
r.Header.Set("JwtUID", auth.UID) | ||
r.Header.Set("JwtRole", auth.Role) | ||
} else { | ||
r.Header.Del("JwtUID") | ||
r.Header.Del("JwtRole") | ||
} | ||
h(w, r) | ||
} | ||
} | ||
|
||
func getPublicKey() (pub *rsa.PublicKey, err error) { | ||
ks := auth.KeyStore{} | ||
encPem := os.Getenv("JWT_PUBLIC_KEY") | ||
|
||
if encPem != "" { | ||
ks.LoadPublicKeyFromString(encPem) | ||
} else { | ||
ks.LoadPublicKeyFromFile(*pubKey) | ||
} | ||
if err != nil { | ||
return nil, err | ||
} | ||
if ks.PublicKey == nil { | ||
return nil, fmt.Errorf("failed") | ||
} | ||
return ks.PublicKey, nil | ||
} | ||
|
||
func getEnv(name, value string) string { | ||
v := os.Getenv(name) | ||
if v == "" { | ||
return value | ||
} | ||
return v | ||
} | ||
|
||
func getServerAddress() string { | ||
if *wsAddr != "" { | ||
return *wsAddr | ||
} | ||
host := getEnv("RANGER_HOST", "0.0.0.0") | ||
port := getEnv("RANGER_PORT", "8080") | ||
return fmt.Sprintf("%s:%s", host, port) | ||
} | ||
|
||
func getRBACConfig() map[string][]string { | ||
envs := os.Environ() | ||
|
||
rbacEnv := filterPrefixed("RANGO_RBAC_", envs) | ||
|
||
return envToMatrix(rbacEnv, "RANGO_RBAC_") | ||
} | ||
|
||
func envToMatrix(env []string, trimPrefix string) map[string][]string { | ||
matr := make(map[string][]string) | ||
|
||
for _, rec := range env { | ||
kv := strings.Split(rec, "=") | ||
key := strings.ToLower(strings.TrimPrefix(kv[0], trimPrefix)) | ||
value := strings.Split(kv[1], ",") | ||
|
||
matr[key] = value | ||
} | ||
|
||
return matr | ||
} | ||
|
||
func filterPrefixed(prefix string, arr []string) []string { | ||
var res []string | ||
|
||
for _, rec := range arr { | ||
if strings.HasPrefix(rec, prefix) { | ||
res = append(res, rec) | ||
} | ||
} | ||
|
||
return res | ||
} | ||
|
||
func main() { | ||
log.New(os.Getenv("APP_NAME")) | ||
flag.Parse() | ||
|
||
metrics.Enable() | ||
ctx := context.Background() | ||
rbac := getRBACConfig() | ||
hub := routing.NewHub(rbac) | ||
pub, err := getPublicKey() | ||
if err != nil { | ||
log.Errorf("Loading public key failed: %s", err.Error()) | ||
time.Sleep(2 * time.Second) | ||
return | ||
} | ||
|
||
kafkaBrokers := strings.Split(os.Getenv("KAFKA_BROKERS"), ",") | ||
seeds := kgo.SeedBrokers(kafkaBrokers...) | ||
client, err := kgo.NewClient( | ||
seeds, | ||
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()), | ||
kgo.AllowAutoTopicCreation(), | ||
kgo.ConsumerGroup(fmt.Sprintf("rango-%s", utils.RandomString(10))), | ||
kgo.ConsumeTopics(*exName), | ||
kgo.DisableAutoCommit(), | ||
app := fx.New( | ||
config.Module, | ||
metrics.Module, | ||
routing.Module, | ||
app.Module, | ||
) | ||
if err != nil { | ||
log.Errorf("Failed to create consumer: %s", err.Error()) | ||
return | ||
} | ||
|
||
log.Info("Starting rango...") | ||
|
||
go func() { | ||
for { | ||
fetches := client.PollRecords(ctx, -1) | ||
if err := fetches.Err(); err != nil { | ||
log.Fatalf("Failed to poll consumer %v", err) | ||
continue | ||
} | ||
|
||
records := fetches.Records() | ||
for _, r := range records { | ||
hub.ReceiveMsg(r) | ||
|
||
client.CommitRecords(ctx, r) | ||
} | ||
} | ||
}() | ||
|
||
defer client.Close() | ||
|
||
go hub.ListenWebsocketEvents() | ||
|
||
wsHandler := func(w http.ResponseWriter, r *http.Request) { | ||
routing.NewClient(hub, w, r) | ||
} | ||
|
||
http.HandleFunc("/private", authHandler(wsHandler, pub, true)) | ||
http.HandleFunc("/public", authHandler(wsHandler, pub, false)) | ||
http.HandleFunc("/", authHandler(wsHandler, pub, false)) | ||
|
||
go http.ListenAndServe(":4242", promhttp.Handler()) | ||
|
||
log.Infof("Listenning on %s", getServerAddress()) | ||
err = http.ListenAndServe(getServerAddress(), nil) | ||
if err != nil { | ||
log.Fatalf("ListenAndServe failed: " + err.Error()) | ||
} | ||
app.Run() | ||
} |
Oops, something went wrong.