diff --git a/api/api.go b/api/api.go index fd5bc77b..ff2d9b6b 100644 --- a/api/api.go +++ b/api/api.go @@ -109,7 +109,9 @@ type ( // A RHPSessionReporter reports on RHP session lifecycle events RHPSessionReporter interface { - Subscribe(sub rhp.SessionSubscriber) + Subscribe(rhp.SessionSubscriber) + Unsubscribe(rhp.SessionSubscriber) + Active() []rhp.Session } @@ -205,7 +207,8 @@ func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain C "DELETE /volumes/:id/cancel": api.handleDELETEVolumeCancelOp, "PUT /volumes/:id/resize": api.handlePUTVolumeResize, // session endpoints - "GET /sessions": api.handleGETSessions, + "GET /sessions": api.handleGETSessions, + "GET /sessions/subscribe": api.handleGETSessionsSubscribe, // tpool endpoints "GET /tpool/fee": api.handleGETTPoolFee, // wallet endpoints diff --git a/api/endpoints.go b/api/endpoints.go index de32b6aa..16579f29 100644 --- a/api/endpoints.go +++ b/api/endpoints.go @@ -317,10 +317,6 @@ func (a *api) handlePUTVolume(c jape.Context) { a.checkServerError(c, "failed to update volume", err) } -func (a *api) handleGETSessions(c jape.Context) { - c.Encode(a.sessions.Active()) -} - func (a *api) handleDeleteSector(c jape.Context) { var root types.Hash256 if err := c.DecodeParam("root", &root); err != nil { diff --git a/api/rhpsessions.go b/api/rhpsessions.go new file mode 100644 index 00000000..5bebf072 --- /dev/null +++ b/api/rhpsessions.go @@ -0,0 +1,45 @@ +package api + +import ( + "context" + "encoding/json" + + "go.sia.tech/hostd/rhp" + "go.sia.tech/jape" + "go.uber.org/zap" + "nhooyr.io/websocket" +) + +type rhpSessionSubscriber struct { + conn *websocket.Conn +} + +func (rs *rhpSessionSubscriber) ReceiveSessionEvent(event rhp.SessionEvent) { + buf, err := json.Marshal(event) + if err != nil { + return + } + rs.conn.Write(context.Background(), websocket.MessageText, buf) +} + +func (a *api) handleGETSessions(c jape.Context) { + c.Encode(a.sessions.Active()) +} + +func (a *api) handleGETSessionsSubscribe(c jape.Context) { + wsc, err := websocket.Accept(c.ResponseWriter, c.Request, &websocket.AcceptOptions{ + OriginPatterns: []string{"*"}, + }) + if err != nil { + a.log.Warn("failed to accept websocket connection", zap.Error(err)) + return + } + defer wsc.Close(websocket.StatusNormalClosure, "") + + // subscribe the websocket conn + sub := &rhpSessionSubscriber{ + conn: wsc, + } + a.sessions.Subscribe(sub) + defer a.sessions.Unsubscribe(sub) +}