Skip to content

Commit

Permalink
api: add session websocket endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Sep 20, 2023
1 parent 3669e94 commit 170ff3b
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 6 deletions.
7 changes: 5 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ type (

// A RHPSessionReporter reports on RHP session lifecycle events
RHPSessionReporter interface {
Subscribe(sub rhp.SessionSubscriber)
Subscribe(rhp.SessionSubscriber)
Unsubscribe(rhp.SessionSubscriber)

Active() []rhp.Session
}

Expand Down Expand Up @@ -205,7 +207,8 @@ func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain C
"DELETE /volumes/:id/cancel": api.handleDELETEVolumeCancelOp,
"PUT /volumes/:id/resize": api.handlePUTVolumeResize,
// session endpoints
"GET /sessions": api.handleGETSessions,
"GET /sessions": api.handleGETSessions,
"GET /sessions/subscribe": api.handleGETSessionsSubscribe,
// tpool endpoints
"GET /tpool/fee": api.handleGETTPoolFee,
// wallet endpoints
Expand Down
4 changes: 0 additions & 4 deletions api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,6 @@ func (a *api) handlePUTVolume(c jape.Context) {
a.checkServerError(c, "failed to update volume", err)
}

func (a *api) handleGETSessions(c jape.Context) {
c.Encode(a.sessions.Active())
}

func (a *api) handleDeleteSector(c jape.Context) {
var root types.Hash256
if err := c.DecodeParam("root", &root); err != nil {
Expand Down
45 changes: 45 additions & 0 deletions api/rhpsessions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package api

import (
"context"
"encoding/json"

"go.sia.tech/hostd/rhp"
"go.sia.tech/jape"
"go.uber.org/zap"
"nhooyr.io/websocket"
)

type rhpSessionSubscriber struct {
conn *websocket.Conn
}

func (rs *rhpSessionSubscriber) ReceiveSessionEvent(event rhp.SessionEvent) {
buf, err := json.Marshal(event)
if err != nil {
return
}
rs.conn.Write(context.Background(), websocket.MessageText, buf)
}

func (a *api) handleGETSessions(c jape.Context) {
c.Encode(a.sessions.Active())
}

func (a *api) handleGETSessionsSubscribe(c jape.Context) {
wsc, err := websocket.Accept(c.ResponseWriter, c.Request, &websocket.AcceptOptions{
OriginPatterns: []string{"*"},
})
if err != nil {
a.log.Warn("failed to accept websocket connection", zap.Error(err))
return
}
defer wsc.Close(websocket.StatusNormalClosure, "")

// subscribe the websocket conn
sub := &rhpSessionSubscriber{
conn: wsc,
}
a.sessions.Subscribe(sub)
defer a.sessions.Unsubscribe(sub)
}

0 comments on commit 170ff3b

Please sign in to comment.