diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index b30803bde8d..8ad5b8c425c 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -388,6 +388,10 @@ func main() { exitOnError("Creating NATS connection", err) } eventBus := bus.NewNATSBus(nc) + if cfg.Trace { + eventBus.TraceEvents() + } + eventsEmitter := event.NewEmitter(eventBus, cfg.TestkubeClusterName, envs) var logsStream logsclient.Stream diff --git a/go.mod b/go.mod index badfa1069a7..53b3ee8231c 100644 --- a/go.mod +++ b/go.mod @@ -37,8 +37,8 @@ require ( github.com/minio/minio-go/v7 v7.0.47 github.com/montanaflynn/stats v0.6.6 github.com/moogar0880/problems v0.1.1 - github.com/nats-io/nats-server/v2 v2.10.4 - github.com/nats-io/nats.go v1.31.0 + github.com/nats-io/nats-server/v2 v2.10.16 + github.com/nats-io/nats.go v1.35.0 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.6-0.20230925090304-df64c4bbad77 github.com/onsi/ginkgo/v2 v2.15.0 @@ -62,7 +62,7 @@ require ( golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/oauth2 v0.13.0 golang.org/x/sync v0.5.0 - golang.org/x/text v0.14.0 + golang.org/x/text v0.15.0 google.golang.org/appengine v1.6.8 google.golang.org/grpc v1.60.0 google.golang.org/protobuf v1.31.0 @@ -127,7 +127,7 @@ require ( github.com/itchyny/timefmt-go v0.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/cpuid/v2 v2.2.3 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/lithammer/fuzzysearch v1.1.8 // indirect @@ -150,8 +150,8 @@ require ( github.com/muesli/reflow v0.3.0 // indirect github.com/muesli/termenv v0.12.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/nats-io/jwt/v2 v2.5.2 // indirect - github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/jwt/v2 v2.5.7 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/package-url/packageurl-go v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -187,12 +187,12 @@ require ( github.com/yuin/goldmark-emoji v1.0.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.16.0 // indirect + golang.org/x/crypto v0.23.0 // indirect golang.org/x/mod v0.14.0 // indirect - golang.org/x/net v0.19.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/term v0.16.0 // indirect - golang.org/x/time v0.3.0 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/term v0.20.0 // indirect + golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.16.1 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect diff --git a/go.sum b/go.sum index 21895ee54b0..64f84b397cd 100644 --- a/go.sum +++ b/go.sum @@ -337,8 +337,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.14.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -424,14 +424,14 @@ github.com/muesli/termenv v0.12.0 h1:KuQRUE3PgxRFWhq4gHvZtPSLCGDqM5q/cYr1pZ39ytc github.com/muesli/termenv v0.12.0/go.mod h1:WCCv32tusQ/EEZ5S8oUIIrC/nIuBcxCVqlN4Xfkv+7A= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= -github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.10.4 h1:uB9xcwon3tPXWAdmTJqqqC6cie3yuPWHJjjTBgaPNus= -github.com/nats-io/nats-server/v2 v2.10.4/go.mod h1:eWm2JmHP9Lqm2oemB6/XGi0/GwsZwtWf8HIPUsh+9ns= -github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= -github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= -github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c= +github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.16 h1:2jXaiydp5oB/nAx/Ytf9fdCi9QN6ItIc9eehX8kwVV0= +github.com/nats-io/nats-server/v2 v2.10.16/go.mod h1:Pksi38H2+6xLe1vQx0/EA4bzetM0NqyIHcIbmgXSkIU= +github.com/nats-io/nats.go v1.35.0 h1:XFNqNM7v5B+MQMKqVGAyHwYhyKb48jrenXNxIU20ULk= +github.com/nats-io/nats.go v1.35.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= @@ -618,8 +618,8 @@ golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= -golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -700,8 +700,8 @@ golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220923203811-8be639271d50/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= -golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -792,16 +792,17 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210503060354-a79de5458b56/go.mod h1:tfny5GFUkzUvx4ps4ajbZsCe5lw1metzhBm9T3x7oIY= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= -golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= +golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -813,13 +814,13 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/internal/app/api/v1/events.go b/internal/app/api/v1/events.go index 644c412e8c4..fb1b5bc59ca 100644 --- a/internal/app/api/v1/events.go +++ b/internal/app/api/v1/events.go @@ -18,18 +18,6 @@ func (s TestkubeAPI) InitEvents() { // run workers s.Events.Listen(context.Background()) - - // handle response logs - go func() { - s.Log.Debug("Listening for workers results") - for resp := range s.Events.Results { - if resp.Error() != "" { - s.Log.Errorw("got error when sending webhooks", "response", resp) - continue - } - s.Log.Debugw("got event response", "response", resp) - } - }() } func (s TestkubeAPI) EventsStreamHandler() fiber.Handler { diff --git a/internal/config/config.go b/internal/config/config.go index 67797fa6b9a..77f0921a0bd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -92,6 +92,7 @@ type Config struct { EnableSecretsEndpoint bool `envconfig:"ENABLE_SECRETS_ENDPOINT" default:"false"` DisableMongoMigrations bool `envconfig:"DISABLE_MONGO_MIGRATIONS" default:"false"` Debug bool `envconfig:"DEBUG" default:"false"` + Trace bool `envconfig:"TRACE" default:"false"` EnableImageDataPersistentCache bool `envconfig:"TESTKUBE_ENABLE_IMAGE_DATA_PERSISTENT_CACHE" default:"false"` ImageDataPersistentCacheKey string `envconfig:"TESTKUBE_IMAGE_DATA_PERSISTENT_CACHE_KEY" default:"testkube-image-cache"` LogServerGrpcAddress string `envconfig:"LOG_SERVER_GRPC_ADDRESS" default:":9090"` diff --git a/pkg/event/bus/nats.go b/pkg/event/bus/nats.go index 484b9b4a14c..6d35c17598f 100644 --- a/pkg/event/bus/nats.go +++ b/pkg/event/bus/nats.go @@ -149,5 +149,18 @@ func (n *NATSBus) Close() error { } func (n *NATSBus) queueName(subscription, queue string) string { - return fmt.Sprintf("%s.%s", SubscriptionName, queue) + return fmt.Sprintf("%s.%s", subscription, queue) +} + +func (n *NATSBus) TraceEvents() { + s, err := n.nc.Subscribe(SubscriptionName+".>", func(event testkube.Event) { + log.Tracew(log.DefaultLogger, "all events.> trace", event.Log()...) + }) + + if err != nil { + log.DefaultLogger.Errorw("error subscribing to all events", "error", err) + return + } + + log.DefaultLogger.Infow("subscribed to all events", "subscription", s.Subject) } diff --git a/pkg/event/emitter.go b/pkg/event/emitter.go index 2a0c36ec9d3..52fd65fd4e7 100644 --- a/pkg/event/emitter.go +++ b/pkg/event/emitter.go @@ -22,7 +22,6 @@ const ( // NewEmitter returns new emitter instance func NewEmitter(eventBus bus.Bus, clusterName string, envs map[string]string) *Emitter { return &Emitter{ - Results: make(chan testkube.EventResult, eventsBuffer), Log: log.DefaultLogger, Loader: NewLoader(), Bus: eventBus, @@ -34,7 +33,6 @@ func NewEmitter(eventBus bus.Bus, clusterName string, envs map[string]string) *E // Emitter handles events emitting for webhooks type Emitter struct { - Results chan testkube.EventResult Listeners common.Listeners Loader *Loader Log *zap.SugaredLogger @@ -57,25 +55,10 @@ func (e *Emitter) UpdateListeners(listeners common.Listeners) { e.mutex.Lock() defer e.mutex.Unlock() - oldMap := make(map[string]map[string]common.Listener, 0) - newMap := make(map[string]map[string]common.Listener, 0) result := make([]common.Listener, 0) - for _, l := range e.Listeners { - if _, ok := oldMap[l.Kind()]; !ok { - oldMap[l.Kind()] = make(map[string]common.Listener, 0) - } - - oldMap[l.Kind()][l.Name()] = l - } - - for _, l := range listeners { - if _, ok := newMap[l.Kind()]; !ok { - newMap[l.Kind()] = make(map[string]common.Listener, 0) - } - - newMap[l.Kind()][l.Name()] = l - } + oldMap := listerersToMap(e.Listeners) + newMap := listerersToMap(listeners) // check for missing listeners for kind, lMap := range oldMap { @@ -126,6 +109,20 @@ func (e *Emitter) UpdateListeners(listeners common.Listeners) { e.Listeners = result } +func listerersToMap(listeners []common.Listener) map[string]map[string]common.Listener { + m := make(map[string]map[string]common.Listener, 0) + + for _, l := range listeners { + if _, ok := m[l.Kind()]; !ok { + m[l.Kind()] = make(map[string]common.Listener, 0) + } + + m[l.Kind()][l.Name()] = l + } + + return m +} + // Notify notifies emitter with webhook func (e *Emitter) Notify(event testkube.Event) { event.ClusterName = e.ClusterName @@ -161,19 +158,19 @@ func (e *Emitter) Listen(ctx context.Context) { } func (e *Emitter) startListener(l common.Listener) { - e.Log.Infow("starting listener", l.Name(), l.Metadata()) err := e.Bus.SubscribeTopic("events.>", l.Name(), e.notifyHandler(l)) if err != nil { - e.Log.Errorw("error subscribing to event", "error", err) + e.Log.Errorw("error while starting listener", "error", err) } + e.Log.Infow("started listener", l.Name(), l.Metadata()) } func (e *Emitter) stopListener(name string) { - e.Log.Infow("stoping listener", name) err := e.Bus.Unsubscribe(name) if err != nil { - e.Log.Errorw("error unsubscribing from event", "error", err) + e.Log.Errorw("error while stopping listener", "error", err) } + e.Log.Infow("stopped listener", name) } func (e *Emitter) notifyHandler(l common.Listener) bus.Handler { @@ -181,7 +178,7 @@ func (e *Emitter) notifyHandler(l common.Listener) bus.Handler { return func(event testkube.Event) error { if event.Valid(l.Selector(), l.Events()) { result := l.Notify(event) - log.Tracew(logger, "listener notified", append(event.Log(), "result", result)) + log.Tracew(logger, "listener notified", append(event.Log(), "result", result)...) } else { log.Tracew(logger, "dropping event not matching selector or type", event.Log()...) } diff --git a/pkg/event/kind/slack/loader.go b/pkg/event/kind/slack/loader.go index 0b34f962939..85c1cbc53b5 100644 --- a/pkg/event/kind/slack/loader.go +++ b/pkg/event/kind/slack/loader.go @@ -44,6 +44,8 @@ func (r *SlackLoader) Load() (listeners common.Listeners, err error) { if r.slackNotifier.Ready { return common.Listeners{NewSlackListener("slack", "", r.events, r.slackNotifier)}, nil } - r.Log.Debugw("Slack notifier is not ready or not configured properly, omiting", "kind", r.Kind()) + + log.Tracew(r.Log, "Slack notifier is not ready or not configured properly, omiting", "kind", r.Kind()) + return common.Listeners{}, nil } diff --git a/pkg/log/trace.go b/pkg/log/trace.go index f5138657613..c9b31738cb4 100644 --- a/pkg/log/trace.go +++ b/pkg/log/trace.go @@ -8,27 +8,29 @@ import ( "go.uber.org/zap" ) +const tracePrefix = "TRACE: " + func Tracew(logger *zap.SugaredLogger, msg string, keysAndValues ...interface{}) { if isTraceEnabled() { - logger.Debugw(msg, keysAndValues...) + logger.Debugw(tracePrefix+msg, keysAndValues...) } } func Tracef(logger *zap.SugaredLogger, msg string, keysAndValues ...interface{}) { if isTraceEnabled() { - logger.Debugf(msg, keysAndValues...) + logger.Debugf(tracePrefix+msg, keysAndValues...) } } func Trace(logger *zap.SugaredLogger, msg string, keysAndValues ...interface{}) { if isTraceEnabled() { - logger.Debug(msg) + logger.Debug(tracePrefix + msg) } } func Traceln(logger *zap.SugaredLogger, msg string, keysAndValues ...interface{}) { if isTraceEnabled() { - logger.Debug(msg) + logger.Debug(tracePrefix + msg) } } diff --git a/pkg/logs/client/stream_test.go b/pkg/logs/client/stream_test.go index 75705f2ddf7..ac31b70e31e 100644 --- a/pkg/logs/client/stream_test.go +++ b/pkg/logs/client/stream_test.go @@ -113,7 +113,10 @@ func TestStream_StartStop(t *testing.T) { } func TestStream_Name(t *testing.T) { - client, err := NewNatsLogStream(nil) + ns, nc := bus.TestServerWithConnection() + defer ns.Shutdown() + + client, err := NewNatsLogStream(nc) assert.NoError(t, err) t.Run("passed one string param", func(t *testing.T) {