Skip to content

Commit

Permalink
Merge pull request Blockdaemon#24 from Blockdaemon/consensus-tracker
Browse files Browse the repository at this point in the history
sidecar: add consensus API
  • Loading branch information
nyetwurk authored Jul 15, 2022
2 parents 834bb23 + 9fb1181 commit 5cca5d1
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 16 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ require (
filippo.io/edwards25519 v1.0.0-rc.1 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dfuse-io/logging v0.0.0-20210109005628-b97a57253f70 // indirect
Expand All @@ -47,6 +49,8 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/gorilla/rpc v1.2.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-hclog v0.12.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.0 // indirect
Expand Down Expand Up @@ -87,6 +91,7 @@ require (
github.com/ugorji/go/codec v1.2.7 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/ratelimit v0.2.0 // indirect
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
Expand All @@ -95,6 +96,7 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blendle/zapdriver v1.3.1 h1:C3dydBOWYRiOk+B8X9IVZ5IOe+7cl+tGOexN4QqHfpE=
github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -264,7 +266,9 @@ github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk=
github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
Expand Down Expand Up @@ -428,6 +432,7 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand Down Expand Up @@ -581,6 +586,7 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA=
go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
Expand Down
10 changes: 8 additions & 2 deletions internal/cmd/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ var (
netInterface string
listenPort uint16
ledgerDir string
rpcWsUrl string
)

func init() {
flags := Cmd.Flags()
flags.StringVar(&netInterface, "interface", "", "Only accept connections from this interface")
flags.Uint16Var(&listenPort, "port", 13080, "Listen port")
flags.StringVar(&ledgerDir, "ledger", "", "Path to ledger dir")
flags.StringVar(&rpcWsUrl, "ws", "ws://localhost:8900", "Solana RPC PubSub WebSocket endpoint")
flags.AddFlagSet(logger.Flags)
}

Expand All @@ -68,8 +70,12 @@ func run() {
server.Use(ginzap.RecoveryWithZap(httpLog, false))

groupV1 := server.Group("/v1")
handler := sidecar.NewHandler(ledgerDir, httpLog)
handler.RegisterHandlers(groupV1)

snapshotHandler := sidecar.NewSnapshotHandler(ledgerDir, httpLog)
snapshotHandler.RegisterHandlers(groupV1)

consensusHandler := sidecar.NewConsensusHandler(rpcWsUrl, httpLog)
consensusHandler.RegisterHandlers(groupV1)

err = server.RunListener(listener)
log.Error("Server stopped", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion internal/integrationtest/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func newSidecar(t *testing.T, slots ...uint64) (server *httptest.Server, root *l
}
ledgerDir := root.GetLedgerDir(t)

handler := &sidecar.Handler{
handler := &sidecar.SnapshotHandler{
LedgerDir: ledgerDir,
Log: zaptest.NewLogger(t),
}
Expand Down
79 changes: 79 additions & 0 deletions internal/sidecar/consensus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2022 Blockdaemon Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sidecar

import (
"io"
"net/http"

"github.com/gagliardetto/solana-go/rpc/ws"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)

// ConsensusHandler implements the consensus-related sidecar API methods.
type ConsensusHandler struct {
RpcWsUrl string
Log *zap.Logger
}

// NewConsensusHandler creates a new sidecar consensus API handler using the provided WS RPC and logger.
func NewConsensusHandler(rpcWsUrl string, log *zap.Logger) *ConsensusHandler {
return &ConsensusHandler{
RpcWsUrl: rpcWsUrl,
Log: log,
}
}

// RegisterHandlers registers this API with Gin web framework.
func (h *ConsensusHandler) RegisterHandlers(group gin.IRoutes) {
group.GET("/slot_updates", h.GetSlotUpdates)
}

// GetSlotUpdates streams RPC "slotsUpdatesSubscribe" events via SSE.
func (h *ConsensusHandler) GetSlotUpdates(c *gin.Context) {
ctx := c.Request.Context()

conn, err := ws.Connect(ctx, h.RpcWsUrl)
if err != nil {
h.Log.Error("Failed to connect to Solana RPC WebSocket", zap.Error(err))
c.AbortWithStatus(http.StatusBadGateway)
return
}
defer conn.Close()

go func() {
<-ctx.Done()
conn.Close()
}()

slotUpdates, err := conn.SlotsUpdatesSubscribe()
if err != nil {
h.Log.Error("Failed to connect to subscribe to slot updates", zap.Error(err))
c.AbortWithStatus(http.StatusServiceUnavailable)
return
}
defer slotUpdates.Unsubscribe()

c.Stream(func(w io.Writer) bool {
update, err := slotUpdates.Recv()
if err != nil {
h.Log.Error("Failed to receive slot update event", zap.Error(err))
return false
}
c.SSEvent("slot_update", update)
return true
})
}
20 changes: 10 additions & 10 deletions internal/sidecar/sidecar.go → internal/sidecar/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ import (
"go.uber.org/zap"
)

// Handler implements the sidecar API methods.
type Handler struct {
// SnapshotHandler implements the snapshot-related sidecar API methods.
type SnapshotHandler struct {
LedgerDir fs.FS
Log *zap.Logger
}

// NewHandler creates a new sidecar API using the provided ledger dir and logger.
func NewHandler(ledgerDir string, log *zap.Logger) *Handler {
return &Handler{
// NewSnapshotHandler creates a new sidecar snapshot API handler using the provided ledger dir and logger.
func NewSnapshotHandler(ledgerDir string, log *zap.Logger) *SnapshotHandler {
return &SnapshotHandler{
LedgerDir: os.DirFS(ledgerDir),
Log: log,
}
}

// RegisterHandlers registers this API with Gin web framework.
func (s *Handler) RegisterHandlers(group gin.IRoutes) {
func (s *SnapshotHandler) RegisterHandlers(group gin.IRoutes) {
group.GET("/snapshots", s.ListSnapshots)
group.HEAD("/snapshot.tar.bz2", s.DownloadBestSnapshot)
group.GET("/snapshot.tar.bz2", s.DownloadBestSnapshot)
Expand All @@ -54,7 +54,7 @@ func (s *Handler) RegisterHandlers(group gin.IRoutes) {
}

// ListSnapshots is an API handler listing available snapshots on the node.
func (s *Handler) ListSnapshots(c *gin.Context) {
func (s *SnapshotHandler) ListSnapshots(c *gin.Context) {
infos, err := ledger.ListSnapshots(s.LedgerDir)
if err != nil {
s.Log.Error("Failed to list snapshots", zap.Error(err))
Expand All @@ -68,7 +68,7 @@ func (s *Handler) ListSnapshots(c *gin.Context) {
}

// DownloadBestSnapshot selects the best full snapshot and sends it to the client.
func (s *Handler) DownloadBestSnapshot(c *gin.Context) {
func (s *SnapshotHandler) DownloadBestSnapshot(c *gin.Context) {
files, err := ledger.ListSnapshotFiles(s.LedgerDir)
if err != nil {
s.Log.Error("Failed to list snapshot files", zap.Error(err))
Expand All @@ -85,7 +85,7 @@ func (s *Handler) DownloadBestSnapshot(c *gin.Context) {
}

// DownloadSnapshot sends a snapshot to the client.
func (s *Handler) DownloadSnapshot(c *gin.Context) {
func (s *SnapshotHandler) DownloadSnapshot(c *gin.Context) {
// Parse name and reject odd requests.
name := c.Param("name")
snapshot := ledger.ParseSnapshotFileName(name)
Expand All @@ -106,7 +106,7 @@ func (s *Handler) DownloadSnapshot(c *gin.Context) {
s.serveSnapshot(c, name)
}

func (s *Handler) serveSnapshot(c *gin.Context, name string) {
func (s *SnapshotHandler) serveSnapshot(c *gin.Context, name string) {
log := s.Log.With(zap.String("snapshot", name))

// Open file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ import (
"go.uber.org/zap/zaptest"
)

func newRouter(h *Handler) http.Handler {
func newRouter(h *SnapshotHandler) http.Handler {
router := gin.Default()
h.RegisterHandlers(router)
return router
}

func testRequest(h *Handler, req *http.Request) *httptest.ResponseRecorder {
func testRequest(h *SnapshotHandler, req *http.Request) *httptest.ResponseRecorder {
router := newRouter(h)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
return w
}

func TestHandler_ListSnapshots_Error(t *testing.T) {
h := NewHandler("???/some/nonexistent/path", zaptest.NewLogger(t))
h := NewSnapshotHandler("???/some/nonexistent/path", zaptest.NewLogger(t))

req, err := http.NewRequest(http.MethodGet, "/snapshots", nil)
require.NoError(t, err)
Expand Down

0 comments on commit 5cca5d1

Please sign in to comment.