diff --git a/go.mod b/go.mod index ae0a742..9a88a40 100644 --- a/go.mod +++ b/go.mod @@ -27,11 +27,13 @@ require ( filippo.io/edwards25519 v1.0.0-rc.1 // indirect github.com/VividCortex/ewma v1.2.0 // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect + github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect github.com/benbjohnson/clock v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blendle/zapdriver v1.3.1 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dfuse-io/logging v0.0.0-20210109005628-b97a57253f70 // indirect @@ -47,6 +49,8 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/uuid v1.1.2 // indirect + github.com/gorilla/rpc v1.2.0 // indirect + github.com/gorilla/websocket v1.4.2 // indirect github.com/hashicorp/go-cleanhttp v0.5.1 // indirect github.com/hashicorp/go-hclog v0.12.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.0 // indirect @@ -87,6 +91,7 @@ require ( github.com/ugorji/go/codec v1.2.7 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/multierr v1.8.0 // indirect + go.uber.org/ratelimit v0.2.0 // indirect golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect diff --git a/go.sum b/go.sum index 51c9e8d..dcb9ef5 100644 --- a/go.sum +++ b/go.sum @@ -73,6 +73,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI= github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -95,6 +96,7 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/blendle/zapdriver v1.3.1 h1:C3dydBOWYRiOk+B8X9IVZ5IOe+7cl+tGOexN4QqHfpE= github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -264,7 +266,9 @@ github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0 github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk= github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= @@ -428,6 +432,7 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -581,6 +586,7 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA= go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= diff --git a/internal/cmd/sidecar/sidecar.go b/internal/cmd/sidecar/sidecar.go index 15d8e42..1904235 100644 --- a/internal/cmd/sidecar/sidecar.go +++ b/internal/cmd/sidecar/sidecar.go @@ -41,6 +41,7 @@ var ( netInterface string listenPort uint16 ledgerDir string + rpcWsUrl string ) func init() { @@ -48,6 +49,7 @@ func init() { flags.StringVar(&netInterface, "interface", "", "Only accept connections from this interface") flags.Uint16Var(&listenPort, "port", 13080, "Listen port") flags.StringVar(&ledgerDir, "ledger", "", "Path to ledger dir") + flags.StringVar(&rpcWsUrl, "ws", "ws://localhost:8900", "Solana RPC PubSub WebSocket endpoint") flags.AddFlagSet(logger.Flags) } @@ -68,8 +70,12 @@ func run() { server.Use(ginzap.RecoveryWithZap(httpLog, false)) groupV1 := server.Group("/v1") - handler := sidecar.NewHandler(ledgerDir, httpLog) - handler.RegisterHandlers(groupV1) + + snapshotHandler := sidecar.NewSnapshotHandler(ledgerDir, httpLog) + snapshotHandler.RegisterHandlers(groupV1) + + consensusHandler := sidecar.NewConsensusHandler(rpcWsUrl, httpLog) + consensusHandler.RegisterHandlers(groupV1) err = server.RunListener(listener) log.Error("Server stopped", zap.Error(err)) diff --git a/internal/integrationtest/sidecar_test.go b/internal/integrationtest/sidecar_test.go index b61cb9d..fc794cd 100644 --- a/internal/integrationtest/sidecar_test.go +++ b/internal/integrationtest/sidecar_test.go @@ -86,7 +86,7 @@ func newSidecar(t *testing.T, slots ...uint64) (server *httptest.Server, root *l } ledgerDir := root.GetLedgerDir(t) - handler := &sidecar.Handler{ + handler := &sidecar.SnapshotHandler{ LedgerDir: ledgerDir, Log: zaptest.NewLogger(t), } diff --git a/internal/sidecar/consensus.go b/internal/sidecar/consensus.go new file mode 100644 index 0000000..a57fc7b --- /dev/null +++ b/internal/sidecar/consensus.go @@ -0,0 +1,79 @@ +// Copyright 2022 Blockdaemon Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sidecar + +import ( + "io" + "net/http" + + "github.com/gagliardetto/solana-go/rpc/ws" + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +// ConsensusHandler implements the consensus-related sidecar API methods. +type ConsensusHandler struct { + RpcWsUrl string + Log *zap.Logger +} + +// NewConsensusHandler creates a new sidecar consensus API handler using the provided WS RPC and logger. +func NewConsensusHandler(rpcWsUrl string, log *zap.Logger) *ConsensusHandler { + return &ConsensusHandler{ + RpcWsUrl: rpcWsUrl, + Log: log, + } +} + +// RegisterHandlers registers this API with Gin web framework. +func (h *ConsensusHandler) RegisterHandlers(group gin.IRoutes) { + group.GET("/slot_updates", h.GetSlotUpdates) +} + +// GetSlotUpdates streams RPC "slotsUpdatesSubscribe" events via SSE. +func (h *ConsensusHandler) GetSlotUpdates(c *gin.Context) { + ctx := c.Request.Context() + + conn, err := ws.Connect(ctx, h.RpcWsUrl) + if err != nil { + h.Log.Error("Failed to connect to Solana RPC WebSocket", zap.Error(err)) + c.AbortWithStatus(http.StatusBadGateway) + return + } + defer conn.Close() + + go func() { + <-ctx.Done() + conn.Close() + }() + + slotUpdates, err := conn.SlotsUpdatesSubscribe() + if err != nil { + h.Log.Error("Failed to connect to subscribe to slot updates", zap.Error(err)) + c.AbortWithStatus(http.StatusServiceUnavailable) + return + } + defer slotUpdates.Unsubscribe() + + c.Stream(func(w io.Writer) bool { + update, err := slotUpdates.Recv() + if err != nil { + h.Log.Error("Failed to receive slot update event", zap.Error(err)) + return false + } + c.SSEvent("slot_update", update) + return true + }) +} diff --git a/internal/sidecar/sidecar.go b/internal/sidecar/snapshot.go similarity index 85% rename from internal/sidecar/sidecar.go rename to internal/sidecar/snapshot.go index 06fa08d..32c38ac 100644 --- a/internal/sidecar/sidecar.go +++ b/internal/sidecar/snapshot.go @@ -28,22 +28,22 @@ import ( "go.uber.org/zap" ) -// Handler implements the sidecar API methods. -type Handler struct { +// SnapshotHandler implements the snapshot-related sidecar API methods. +type SnapshotHandler struct { LedgerDir fs.FS Log *zap.Logger } -// NewHandler creates a new sidecar API using the provided ledger dir and logger. -func NewHandler(ledgerDir string, log *zap.Logger) *Handler { - return &Handler{ +// NewSnapshotHandler creates a new sidecar snapshot API handler using the provided ledger dir and logger. +func NewSnapshotHandler(ledgerDir string, log *zap.Logger) *SnapshotHandler { + return &SnapshotHandler{ LedgerDir: os.DirFS(ledgerDir), Log: log, } } // RegisterHandlers registers this API with Gin web framework. -func (s *Handler) RegisterHandlers(group gin.IRoutes) { +func (s *SnapshotHandler) RegisterHandlers(group gin.IRoutes) { group.GET("/snapshots", s.ListSnapshots) group.HEAD("/snapshot.tar.bz2", s.DownloadBestSnapshot) group.GET("/snapshot.tar.bz2", s.DownloadBestSnapshot) @@ -54,7 +54,7 @@ func (s *Handler) RegisterHandlers(group gin.IRoutes) { } // ListSnapshots is an API handler listing available snapshots on the node. -func (s *Handler) ListSnapshots(c *gin.Context) { +func (s *SnapshotHandler) ListSnapshots(c *gin.Context) { infos, err := ledger.ListSnapshots(s.LedgerDir) if err != nil { s.Log.Error("Failed to list snapshots", zap.Error(err)) @@ -68,7 +68,7 @@ func (s *Handler) ListSnapshots(c *gin.Context) { } // DownloadBestSnapshot selects the best full snapshot and sends it to the client. -func (s *Handler) DownloadBestSnapshot(c *gin.Context) { +func (s *SnapshotHandler) DownloadBestSnapshot(c *gin.Context) { files, err := ledger.ListSnapshotFiles(s.LedgerDir) if err != nil { s.Log.Error("Failed to list snapshot files", zap.Error(err)) @@ -85,7 +85,7 @@ func (s *Handler) DownloadBestSnapshot(c *gin.Context) { } // DownloadSnapshot sends a snapshot to the client. -func (s *Handler) DownloadSnapshot(c *gin.Context) { +func (s *SnapshotHandler) DownloadSnapshot(c *gin.Context) { // Parse name and reject odd requests. name := c.Param("name") snapshot := ledger.ParseSnapshotFileName(name) @@ -106,7 +106,7 @@ func (s *Handler) DownloadSnapshot(c *gin.Context) { s.serveSnapshot(c, name) } -func (s *Handler) serveSnapshot(c *gin.Context, name string) { +func (s *SnapshotHandler) serveSnapshot(c *gin.Context, name string) { log := s.Log.With(zap.String("snapshot", name)) // Open file. diff --git a/internal/sidecar/sidecar_test.go b/internal/sidecar/snapshot_test.go similarity index 85% rename from internal/sidecar/sidecar_test.go rename to internal/sidecar/snapshot_test.go index d51b1e0..006aee7 100644 --- a/internal/sidecar/sidecar_test.go +++ b/internal/sidecar/snapshot_test.go @@ -25,13 +25,13 @@ import ( "go.uber.org/zap/zaptest" ) -func newRouter(h *Handler) http.Handler { +func newRouter(h *SnapshotHandler) http.Handler { router := gin.Default() h.RegisterHandlers(router) return router } -func testRequest(h *Handler, req *http.Request) *httptest.ResponseRecorder { +func testRequest(h *SnapshotHandler, req *http.Request) *httptest.ResponseRecorder { router := newRouter(h) w := httptest.NewRecorder() router.ServeHTTP(w, req) @@ -39,7 +39,7 @@ func testRequest(h *Handler, req *http.Request) *httptest.ResponseRecorder { } func TestHandler_ListSnapshots_Error(t *testing.T) { - h := NewHandler("???/some/nonexistent/path", zaptest.NewLogger(t)) + h := NewSnapshotHandler("???/some/nonexistent/path", zaptest.NewLogger(t)) req, err := http.NewRequest(http.MethodGet, "/snapshots", nil) require.NoError(t, err)