diff --git a/src/jetstream/go.mod b/src/jetstream/go.mod index bbab44e7c3..88f8657bcf 100644 --- a/src/jetstream/go.mod +++ b/src/jetstream/go.mod @@ -8,6 +8,7 @@ replace ( github.com/cloudfoundry-incubator/stratos/src/jetstream/crypto => ./crypto github.com/cloudfoundry-incubator/stratos/src/jetstream/docs => ./docs github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/cfapppush => ./plugins/cfapppush + github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/cloudfoundry => ./plugins/cloudfoundry github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes => ./plugins/kubernetes github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/auth => ./plugins/kubernetes/auth github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/terminal => ./plugins/kubernetes/terminal @@ -23,6 +24,8 @@ replace ( ) require ( + code.cloudfoundry.org/go-log-cache v1.0.1-0.20230224210401-5e305670b626 + code.cloudfoundry.org/go-loggregator v7.4.0+incompatible github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 github.com/antonlindstrom/pgstore v0.0.0-20220421113606-e3a6e3fed12a github.com/cf-stratos/mysqlstore v0.0.0-20170822100912-304308519d13 @@ -64,10 +67,10 @@ require ( code.cloudfoundry.org/cli v0.0.0-20230912192837-efd1d03e7292 // indirect code.cloudfoundry.org/cli-plugin-repo v0.0.0-20230912184324-f005268561a6 // indirect code.cloudfoundry.org/clock v1.1.0 // indirect - code.cloudfoundry.org/go-log-cache v1.0.1-0.20230224210401-5e305670b626 // indirect code.cloudfoundry.org/go-loggregator/v8 v8.0.5 // indirect code.cloudfoundry.org/gofileutils v0.0.0-20170111115228-4d0c80011a0f // indirect code.cloudfoundry.org/jsonry v1.1.4 // indirect + code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78 // indirect code.cloudfoundry.org/tlsconfig v0.0.0-20230929201433-6cd2b78aba25 // indirect code.cloudfoundry.org/ykk v0.0.0-20170424192843-e4df4ce2fd4d // indirect github.com/AdaLogics/go-fuzz-headers v0.0.0-20230106234847-43070de90fa1 // indirect diff --git a/src/jetstream/go.sum b/src/jetstream/go.sum index 9d4f7fe67a..190ea26493 100644 --- a/src/jetstream/go.sum +++ b/src/jetstream/go.sum @@ -53,6 +53,8 @@ code.cloudfoundry.org/go-diodes v0.0.0-20180905200951-72629b5276e3/go.mod h1:Jzi code.cloudfoundry.org/go-envstruct v1.5.0/go.mod h1:E2S/gzRZpZ51PZnIv7Bo7QvcgH18yio19upkrRk0xLU= code.cloudfoundry.org/go-log-cache v1.0.1-0.20211011162012-ede82a99d3cc h1:8gj5Z08i9ZvoIGi1A/E2CEQTbvJjogYQgBQUI2/DyNE= code.cloudfoundry.org/go-log-cache v1.0.1-0.20211011162012-ede82a99d3cc/go.mod h1:8thG6lrstlbeI44hc7QgSnX8eau68+mNt9Pp33/TEcg= +code.cloudfoundry.org/go-loggregator v7.4.0+incompatible h1:KqZYloMQWM5Zg/BQKunOIA4OODh7djZbk48qqbowNFI= +code.cloudfoundry.org/go-loggregator v7.4.0+incompatible/go.mod h1:KPBTRqj+y738Nhf1+g4JHFaBU8j7dedirR5ETNHvMXU= code.cloudfoundry.org/go-loggregator/v8 v8.0.2-0.20200722201844-b5130958b65d/go.mod h1:Or3cWTXwK6d3caPRBTUJv/suT+47jOltB7hYC/3ECCo= code.cloudfoundry.org/go-loggregator/v8 v8.0.5 h1:p1rrGxTwUqLjlUVtbjTAvKOSGNmPuBja8LeQOQgRrBc= code.cloudfoundry.org/go-loggregator/v8 v8.0.5/go.mod h1:mLlJ1ZyG6gVvBEtYypvbztRvFeCtBsTxE9tt+85tS6Y= @@ -65,6 +67,8 @@ code.cloudfoundry.org/lager v2.0.0+incompatible/go.mod h1:O2sS7gKP3HM2iemG+Enwvy code.cloudfoundry.org/lager/v3 v3.0.2 h1:H0dcQY+814G1Ea0e5K/AMaMpcr+Pe5Iv+AALJEwrP9U= code.cloudfoundry.org/lager/v3 v3.0.2/go.mod h1:zA6tOIWhr5uZUez+PGpdfBHDWQOfhOrr0cgKDagZPwk= code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a/go.mod h1:tkZo8GtzBjySJ7USvxm4E36lNQw1D3xM6oKHGqdaAJ4= +code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78 h1:mrZQaZmuDIPhSp6b96b+CRKC2uH44ifa5cjDV2epKis= +code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78/go.mod h1:tkZo8GtzBjySJ7USvxm4E36lNQw1D3xM6oKHGqdaAJ4= code.cloudfoundry.org/tlsconfig v0.0.0-20200131000646-bbe0f8da39b3/go.mod h1:eTbFJpyXRGuFVyg5+oaj9B2eIbIc+0/kZjH8ftbtdew= code.cloudfoundry.org/tlsconfig v0.0.0-20230929201433-6cd2b78aba25 h1:vfCOuqnZi86Jg1EXMWtdCb9ieko4hna/CLzI6ECTgFA= code.cloudfoundry.org/tlsconfig v0.0.0-20230929201433-6cd2b78aba25/go.mod h1:C8SxvGRSutmgzV2FxH8Zwqz2Q8HsaAITQRQFKhlDzPw= diff --git a/src/jetstream/plugins/cloudfoundry/cf_websocket_streams.go b/src/jetstream/plugins/cloudfoundry/cf_websocket_streams.go index 1e5ee6e309..cc55baaa25 100644 --- a/src/jetstream/plugins/cloudfoundry/cf_websocket_streams.go +++ b/src/jetstream/plugins/cloudfoundry/cf_websocket_streams.go @@ -1,17 +1,20 @@ package cloudfoundry import ( + "context" "crypto/tls" "encoding/json" "fmt" "net/http" "strconv" + "strings" "time" + logcache "code.cloudfoundry.org/go-log-cache" + "code.cloudfoundry.org/go-log-cache/rpc/logcache_v1" + "code.cloudfoundry.org/go-loggregator/v8/rpc/loggregator_v2" "github.com/cloudfoundry-incubator/stratos/src/jetstream/api" - "github.com/cloudfoundry/noaa" "github.com/cloudfoundry/noaa/consumer" - noaa_errors "github.com/cloudfoundry/noaa/errors" "github.com/cloudfoundry/sonde-go/events" "github.com/gorilla/websocket" "github.com/labstack/echo/v4" @@ -31,19 +34,19 @@ var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } -func (c CloudFoundrySpecification) appStream(echoContext echo.Context) error { +func (c *CloudFoundrySpecification) appStream(echoContext echo.Context) error { return c.commonStreamHandler(echoContext, appStreamHandler) } -func (c CloudFoundrySpecification) firehose(echoContext echo.Context) error { +func (c *CloudFoundrySpecification) firehose(echoContext echo.Context) error { return c.commonStreamHandler(echoContext, firehoseStreamHandler) } -func (c CloudFoundrySpecification) appFirehose(echoContext echo.Context) error { +func (c *CloudFoundrySpecification) appFirehose(echoContext echo.Context) error { return c.commonStreamHandler(echoContext, appFirehoseStreamHandler) } -func (c CloudFoundrySpecification) commonStreamHandler(echoContext echo.Context, bespokeStreamHandler func(echo.Context, *AuthorizedConsumer, *websocket.Conn) error) error { +func (c *CloudFoundrySpecification) commonStreamHandler(echoContext echo.Context, bespokeStreamHandler func(echo.Context, *AuthorizedConsumer, *websocket.Conn) error) error { ac, err := c.openNoaaConsumer(echoContext) if err != nil { return err @@ -67,13 +70,14 @@ func (c CloudFoundrySpecification) commonStreamHandler(echoContext echo.Context, } type AuthorizedConsumer struct { - consumer *consumer.Consumer - authToken string - refreshToken func() error + consumer *consumer.Consumer + logCacheClient *logcache.Client + authToken string + refreshToken func() error } // Refresh the Authorization token if needed and create a new Noaa consumer -func (c CloudFoundrySpecification) openNoaaConsumer(echoContext echo.Context) (*AuthorizedConsumer, error) { +func (c *CloudFoundrySpecification) openNoaaConsumer(echoContext echo.Context) (*AuthorizedConsumer, error) { ac := &AuthorizedConsumer{} @@ -118,32 +122,75 @@ func (c CloudFoundrySpecification) openNoaaConsumer(echoContext echo.Context) (* log.Debugf("Creating Noaa consumer for Doppler endpoint %s", dopplerAddress) ac.consumer = consumer.New(dopplerAddress, &tls.Config{InsecureSkipVerify: true}, http.ProxyFromEnvironment) + //Open a LogCache client to the log cache endpoint + logCacheUrl := strings.Replace(cnsiRecord.APIEndpoint.String(), "api.sys.", "log-cache.sys.", 1) + log.Debugf("Creating LogCache client for endpoint %s", logCacheUrl) + ac.logCacheClient = logcache.NewClient(logCacheUrl, logcache.WithHTTPClient( + NewLogCacheHttpClient(func() string { + return ac.authToken + })), + ) + return ac, nil } -// Attempts to get the recent logs, if we get an unauthorized error we will refresh the auth token and retry once -func getRecentLogs(ac *AuthorizedConsumer, cnsiGUID, appGUID string) ([]*events.LogMessage, error) { - log.Debug("getRecentLogs") - messages, err := ac.consumer.RecentLogs(appGUID, ac.authToken) - if err != nil { - errorPattern := "Failed to get recent messages for App %s on CNSI %s [%v]" - if _, ok := err.(*noaa_errors.UnauthorizedError); ok { - // If unauthorized, we may need to refresh our Auth token - // Note: annoyingly, older versions of CF also send back "401 - Unauthorized" when the app doesn't exist... - // This means we sometimes end up here even when our token is legit - if err := ac.refreshToken(); err != nil { - return nil, fmt.Errorf(errorPattern, appGUID, cnsiGUID, err) - } - messages, err = ac.consumer.RecentLogs(appGUID, ac.authToken) +// Attempts to relay the recent logs, if we get an unauthorized error we will refresh the auth token and retry once +func relayRecentLogsFromCache(relay func(msg *events.LogMessage), ac *AuthorizedConsumer, appGUID string) error { + logLineRequestCount := 1000 + var envelopes []*loggregator_v2.Envelope + var err error + + for logLineRequestCount >= 1 { + envelopes, err = ac.logCacheClient.Read( + context.Background(), + appGUID, + time.Time{}, + logcache.WithEnvelopeTypes(logcache_v1.EnvelopeType_LOG), + logcache.WithLimit(logLineRequestCount), + ) + if err != nil && err.Error() == "unexpected status code 429" { + err = ac.refreshToken() if err != nil { - msg := fmt.Sprintf(errorPattern, appGUID, cnsiGUID, err) - return nil, echo.NewHTTPError(http.StatusUnauthorized, msg) + return fmt.Errorf("cannot refresh token when reading from cache again cause %v", err) } - } else { - return nil, fmt.Errorf(errorPattern, appGUID, cnsiGUID, err) + err = nil + continue } + if err == nil || err.Error() != "unexpected status code 429" { + break + } + logLineRequestCount /= 2 + } + if err != nil { + return fmt.Errorf("failed to retrieve logs from Log Cache: %s", err) } - return messages, nil + + for _, envelope := range envelopes { + logEnvelope, ok := envelope.GetMessage().(*loggregator_v2.Envelope_Log) + if !ok { + continue + } + log := logEnvelope.Log + relay(&events.LogMessage{ + Message: log.Payload, + MessageType: func(t loggregator_v2.Log_Type) *events.LogMessage_MessageType { + var r events.LogMessage_MessageType + switch t { + case loggregator_v2.Log_OUT: + r = events.LogMessage_OUT + case loggregator_v2.Log_ERR: + r = events.LogMessage_ERR + } + return &r + }(log.Type), + Timestamp: func(i int64) *int64 { return &i }(envelope.GetTimestamp()), + AppId: &appGUID, + SourceType: func(s string) *string { return &s }(envelope.GetTags()["source_type"]), + SourceInstance: &envelope.InstanceId, + }) + } + + return err } func drainErrors(errorChan <-chan error) { @@ -184,11 +231,6 @@ func appStreamHandler(echoContext echo.Context, ac *AuthorizedConsumer, clientWe appGUID := echoContext.Param("appGuid") log.Infof("Received request for log stream for App ID: %s - in CNSI: %s", appGUID, cnsiGUID) - - messages, err := getRecentLogs(ac, cnsiGUID, appGUID) - if err != nil { - return err - } // Reusable closure to pump messages from Noaa to the client WebSocket // N.B. We convert protobuf messages to JSON for ease of use in the frontend relayLogMsg := func(msg *events.LogMessage) { @@ -202,9 +244,15 @@ func appStreamHandler(echoContext echo.Context, ac *AuthorizedConsumer, clientWe } } - // Send the recent messages, sorted in Chronological order - for _, msg := range noaa.SortRecent(messages) { - relayLogMsg(msg) + /* + * Split into two parts… + * 1. LogCache Read for recent logs - inspired by CF CLI in order to replace noaa RecentLogs + * https://github.com/cloudfoundry/stratos/issues/5037 + * 2. Stream subsequent logs as before + */ + err := relayRecentLogsFromCache(relayLogMsg, ac, appGUID) + if err != nil { + log.Errorf("Cannot relay recent logs via cache cause %v", err) } msgChan, errorChan := ac.consumer.TailingLogs(appGUID, ac.authToken) diff --git a/src/jetstream/plugins/cloudfoundry/log_cache_client.go b/src/jetstream/plugins/cloudfoundry/log_cache_client.go new file mode 100644 index 0000000000..31d488530b --- /dev/null +++ b/src/jetstream/plugins/cloudfoundry/log_cache_client.go @@ -0,0 +1,20 @@ +package cloudfoundry + +import "net/http" + +type LogCacheHttpClient struct { + httpClient *http.Client + accessToken func() string +} + +func NewLogCacheHttpClient(accessToken func() string) *LogCacheHttpClient { + return &LogCacheHttpClient{ + httpClient: http.DefaultClient, + accessToken: accessToken, + } +} + +func (c *LogCacheHttpClient) Do(req *http.Request) (*http.Response, error) { + req.Header.Set("Authorization", c.accessToken()) + return c.httpClient.Do(req) +}