Skip to content

Commit

Permalink
Cherry pick commit 69ac7dc from other branch; now works without any r…
Browse files Browse the repository at this point in the history
…estrictions; original commit message: (#45)

Apply patch from freudl#1 manually

This results in a version that works, but only
when run with GOLANG_PROTOBUF_REGISTRATION_CONFLICT=warn
or compiled with -ldflags "-X google.golang.org/protobuf/reflect/protoregistry.conflictPolicy=warn"

This has to be adressed with an update to the
cf dependencies to at least version 9
  • Loading branch information
bmo-at-a9s authored Oct 11, 2023
1 parent bce7ac3 commit 21ac64f
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 38 deletions.
5 changes: 4 additions & 1 deletion src/jetstream/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ replace (
github.com/cloudfoundry-incubator/stratos/src/jetstream/crypto => ./crypto
github.com/cloudfoundry-incubator/stratos/src/jetstream/docs => ./docs
github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/cfapppush => ./plugins/cfapppush
github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/cloudfoundry => ./plugins/cloudfoundry
github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes => ./plugins/kubernetes
github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/auth => ./plugins/kubernetes/auth
github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/terminal => ./plugins/kubernetes/terminal
Expand All @@ -23,6 +24,8 @@ replace (
)

require (
code.cloudfoundry.org/go-log-cache v1.0.1-0.20230224210401-5e305670b626
code.cloudfoundry.org/go-loggregator v7.4.0+incompatible
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/antonlindstrom/pgstore v0.0.0-20220421113606-e3a6e3fed12a
github.com/cf-stratos/mysqlstore v0.0.0-20170822100912-304308519d13
Expand Down Expand Up @@ -64,10 +67,10 @@ require (
code.cloudfoundry.org/cli v0.0.0-20230912192837-efd1d03e7292 // indirect
code.cloudfoundry.org/cli-plugin-repo v0.0.0-20230912184324-f005268561a6 // indirect
code.cloudfoundry.org/clock v1.1.0 // indirect
code.cloudfoundry.org/go-log-cache v1.0.1-0.20230224210401-5e305670b626 // indirect
code.cloudfoundry.org/go-loggregator/v8 v8.0.5 // indirect
code.cloudfoundry.org/gofileutils v0.0.0-20170111115228-4d0c80011a0f // indirect
code.cloudfoundry.org/jsonry v1.1.4 // indirect
code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78 // indirect
code.cloudfoundry.org/tlsconfig v0.0.0-20230929201433-6cd2b78aba25 // indirect
code.cloudfoundry.org/ykk v0.0.0-20170424192843-e4df4ce2fd4d // indirect
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230106234847-43070de90fa1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions src/jetstream/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ code.cloudfoundry.org/go-diodes v0.0.0-20180905200951-72629b5276e3/go.mod h1:Jzi
code.cloudfoundry.org/go-envstruct v1.5.0/go.mod h1:E2S/gzRZpZ51PZnIv7Bo7QvcgH18yio19upkrRk0xLU=
code.cloudfoundry.org/go-log-cache v1.0.1-0.20211011162012-ede82a99d3cc h1:8gj5Z08i9ZvoIGi1A/E2CEQTbvJjogYQgBQUI2/DyNE=
code.cloudfoundry.org/go-log-cache v1.0.1-0.20211011162012-ede82a99d3cc/go.mod h1:8thG6lrstlbeI44hc7QgSnX8eau68+mNt9Pp33/TEcg=
code.cloudfoundry.org/go-loggregator v7.4.0+incompatible h1:KqZYloMQWM5Zg/BQKunOIA4OODh7djZbk48qqbowNFI=
code.cloudfoundry.org/go-loggregator v7.4.0+incompatible/go.mod h1:KPBTRqj+y738Nhf1+g4JHFaBU8j7dedirR5ETNHvMXU=
code.cloudfoundry.org/go-loggregator/v8 v8.0.2-0.20200722201844-b5130958b65d/go.mod h1:Or3cWTXwK6d3caPRBTUJv/suT+47jOltB7hYC/3ECCo=
code.cloudfoundry.org/go-loggregator/v8 v8.0.5 h1:p1rrGxTwUqLjlUVtbjTAvKOSGNmPuBja8LeQOQgRrBc=
code.cloudfoundry.org/go-loggregator/v8 v8.0.5/go.mod h1:mLlJ1ZyG6gVvBEtYypvbztRvFeCtBsTxE9tt+85tS6Y=
Expand All @@ -65,6 +67,8 @@ code.cloudfoundry.org/lager v2.0.0+incompatible/go.mod h1:O2sS7gKP3HM2iemG+Enwvy
code.cloudfoundry.org/lager/v3 v3.0.2 h1:H0dcQY+814G1Ea0e5K/AMaMpcr+Pe5Iv+AALJEwrP9U=
code.cloudfoundry.org/lager/v3 v3.0.2/go.mod h1:zA6tOIWhr5uZUez+PGpdfBHDWQOfhOrr0cgKDagZPwk=
code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a/go.mod h1:tkZo8GtzBjySJ7USvxm4E36lNQw1D3xM6oKHGqdaAJ4=
code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78 h1:mrZQaZmuDIPhSp6b96b+CRKC2uH44ifa5cjDV2epKis=
code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78/go.mod h1:tkZo8GtzBjySJ7USvxm4E36lNQw1D3xM6oKHGqdaAJ4=
code.cloudfoundry.org/tlsconfig v0.0.0-20200131000646-bbe0f8da39b3/go.mod h1:eTbFJpyXRGuFVyg5+oaj9B2eIbIc+0/kZjH8ftbtdew=
code.cloudfoundry.org/tlsconfig v0.0.0-20230929201433-6cd2b78aba25 h1:vfCOuqnZi86Jg1EXMWtdCb9ieko4hna/CLzI6ECTgFA=
code.cloudfoundry.org/tlsconfig v0.0.0-20230929201433-6cd2b78aba25/go.mod h1:C8SxvGRSutmgzV2FxH8Zwqz2Q8HsaAITQRQFKhlDzPw=
Expand Down
122 changes: 85 additions & 37 deletions src/jetstream/plugins/cloudfoundry/cf_websocket_streams.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package cloudfoundry

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"

logcache "code.cloudfoundry.org/go-log-cache"
"code.cloudfoundry.org/go-log-cache/rpc/logcache_v1"
"code.cloudfoundry.org/go-loggregator/v8/rpc/loggregator_v2"
"github.com/cloudfoundry-incubator/stratos/src/jetstream/api"
"github.com/cloudfoundry/noaa"
"github.com/cloudfoundry/noaa/consumer"
noaa_errors "github.com/cloudfoundry/noaa/errors"
"github.com/cloudfoundry/sonde-go/events"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
Expand All @@ -31,19 +34,19 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

func (c CloudFoundrySpecification) appStream(echoContext echo.Context) error {
func (c *CloudFoundrySpecification) appStream(echoContext echo.Context) error {
return c.commonStreamHandler(echoContext, appStreamHandler)
}

func (c CloudFoundrySpecification) firehose(echoContext echo.Context) error {
func (c *CloudFoundrySpecification) firehose(echoContext echo.Context) error {
return c.commonStreamHandler(echoContext, firehoseStreamHandler)
}

func (c CloudFoundrySpecification) appFirehose(echoContext echo.Context) error {
func (c *CloudFoundrySpecification) appFirehose(echoContext echo.Context) error {
return c.commonStreamHandler(echoContext, appFirehoseStreamHandler)
}

func (c CloudFoundrySpecification) commonStreamHandler(echoContext echo.Context, bespokeStreamHandler func(echo.Context, *AuthorizedConsumer, *websocket.Conn) error) error {
func (c *CloudFoundrySpecification) commonStreamHandler(echoContext echo.Context, bespokeStreamHandler func(echo.Context, *AuthorizedConsumer, *websocket.Conn) error) error {
ac, err := c.openNoaaConsumer(echoContext)
if err != nil {
return err
Expand All @@ -67,13 +70,14 @@ func (c CloudFoundrySpecification) commonStreamHandler(echoContext echo.Context,
}

type AuthorizedConsumer struct {
consumer *consumer.Consumer
authToken string
refreshToken func() error
consumer *consumer.Consumer
logCacheClient *logcache.Client
authToken string
refreshToken func() error
}

// Refresh the Authorization token if needed and create a new Noaa consumer
func (c CloudFoundrySpecification) openNoaaConsumer(echoContext echo.Context) (*AuthorizedConsumer, error) {
func (c *CloudFoundrySpecification) openNoaaConsumer(echoContext echo.Context) (*AuthorizedConsumer, error) {

ac := &AuthorizedConsumer{}

Expand Down Expand Up @@ -118,32 +122,75 @@ func (c CloudFoundrySpecification) openNoaaConsumer(echoContext echo.Context) (*
log.Debugf("Creating Noaa consumer for Doppler endpoint %s", dopplerAddress)
ac.consumer = consumer.New(dopplerAddress, &tls.Config{InsecureSkipVerify: true}, http.ProxyFromEnvironment)

//Open a LogCache client to the log cache endpoint
logCacheUrl := strings.Replace(cnsiRecord.APIEndpoint.String(), "api.sys.", "log-cache.sys.", 1)
log.Debugf("Creating LogCache client for endpoint %s", logCacheUrl)
ac.logCacheClient = logcache.NewClient(logCacheUrl, logcache.WithHTTPClient(
NewLogCacheHttpClient(func() string {
return ac.authToken
})),
)

return ac, nil
}

// Attempts to get the recent logs, if we get an unauthorized error we will refresh the auth token and retry once
func getRecentLogs(ac *AuthorizedConsumer, cnsiGUID, appGUID string) ([]*events.LogMessage, error) {
log.Debug("getRecentLogs")
messages, err := ac.consumer.RecentLogs(appGUID, ac.authToken)
if err != nil {
errorPattern := "Failed to get recent messages for App %s on CNSI %s [%v]"
if _, ok := err.(*noaa_errors.UnauthorizedError); ok {
// If unauthorized, we may need to refresh our Auth token
// Note: annoyingly, older versions of CF also send back "401 - Unauthorized" when the app doesn't exist...
// This means we sometimes end up here even when our token is legit
if err := ac.refreshToken(); err != nil {
return nil, fmt.Errorf(errorPattern, appGUID, cnsiGUID, err)
}
messages, err = ac.consumer.RecentLogs(appGUID, ac.authToken)
// Attempts to relay the recent logs, if we get an unauthorized error we will refresh the auth token and retry once
func relayRecentLogsFromCache(relay func(msg *events.LogMessage), ac *AuthorizedConsumer, appGUID string) error {
logLineRequestCount := 1000
var envelopes []*loggregator_v2.Envelope
var err error

for logLineRequestCount >= 1 {
envelopes, err = ac.logCacheClient.Read(
context.Background(),
appGUID,
time.Time{},
logcache.WithEnvelopeTypes(logcache_v1.EnvelopeType_LOG),
logcache.WithLimit(logLineRequestCount),
)
if err != nil && err.Error() == "unexpected status code 429" {
err = ac.refreshToken()
if err != nil {
msg := fmt.Sprintf(errorPattern, appGUID, cnsiGUID, err)
return nil, echo.NewHTTPError(http.StatusUnauthorized, msg)
return fmt.Errorf("cannot refresh token when reading from cache again cause %v", err)
}
} else {
return nil, fmt.Errorf(errorPattern, appGUID, cnsiGUID, err)
err = nil
continue
}
if err == nil || err.Error() != "unexpected status code 429" {
break
}
logLineRequestCount /= 2
}
if err != nil {
return fmt.Errorf("failed to retrieve logs from Log Cache: %s", err)
}
return messages, nil

for _, envelope := range envelopes {
logEnvelope, ok := envelope.GetMessage().(*loggregator_v2.Envelope_Log)
if !ok {
continue
}
log := logEnvelope.Log
relay(&events.LogMessage{
Message: log.Payload,
MessageType: func(t loggregator_v2.Log_Type) *events.LogMessage_MessageType {
var r events.LogMessage_MessageType
switch t {
case loggregator_v2.Log_OUT:
r = events.LogMessage_OUT
case loggregator_v2.Log_ERR:
r = events.LogMessage_ERR
}
return &r
}(log.Type),
Timestamp: func(i int64) *int64 { return &i }(envelope.GetTimestamp()),
AppId: &appGUID,
SourceType: func(s string) *string { return &s }(envelope.GetTags()["source_type"]),
SourceInstance: &envelope.InstanceId,
})
}

return err
}

func drainErrors(errorChan <-chan error) {
Expand Down Expand Up @@ -184,11 +231,6 @@ func appStreamHandler(echoContext echo.Context, ac *AuthorizedConsumer, clientWe
appGUID := echoContext.Param("appGuid")

log.Infof("Received request for log stream for App ID: %s - in CNSI: %s", appGUID, cnsiGUID)

messages, err := getRecentLogs(ac, cnsiGUID, appGUID)
if err != nil {
return err
}
// Reusable closure to pump messages from Noaa to the client WebSocket
// N.B. We convert protobuf messages to JSON for ease of use in the frontend
relayLogMsg := func(msg *events.LogMessage) {
Expand All @@ -202,9 +244,15 @@ func appStreamHandler(echoContext echo.Context, ac *AuthorizedConsumer, clientWe
}
}

// Send the recent messages, sorted in Chronological order
for _, msg := range noaa.SortRecent(messages) {
relayLogMsg(msg)
/*
* Split into two parts…
* 1. LogCache Read for recent logs - inspired by CF CLI in order to replace noaa RecentLogs
* https://github.com/cloudfoundry/stratos/issues/5037
* 2. Stream subsequent logs as before
*/
err := relayRecentLogsFromCache(relayLogMsg, ac, appGUID)
if err != nil {
log.Errorf("Cannot relay recent logs via cache cause %v", err)
}

msgChan, errorChan := ac.consumer.TailingLogs(appGUID, ac.authToken)
Expand Down
20 changes: 20 additions & 0 deletions src/jetstream/plugins/cloudfoundry/log_cache_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cloudfoundry

import "net/http"

type LogCacheHttpClient struct {
httpClient *http.Client
accessToken func() string
}

func NewLogCacheHttpClient(accessToken func() string) *LogCacheHttpClient {
return &LogCacheHttpClient{
httpClient: http.DefaultClient,
accessToken: accessToken,
}
}

func (c *LogCacheHttpClient) Do(req *http.Request) (*http.Response, error) {
req.Header.Set("Authorization", c.accessToken())
return c.httpClient.Do(req)
}

0 comments on commit 21ac64f

Please sign in to comment.