Skip to content

Commit

Permalink
Merge pull request #16 from cyverse-de/httphandlers
Browse files Browse the repository at this point in the history
Adds HTTP handlers for the NATS messages handled by subscriptions service
  • Loading branch information
johnworth authored Jun 6, 2024
2 parents a8af34c + 9b4b466 commit 3528661
Show file tree
Hide file tree
Showing 12 changed files with 1,156 additions and 511 deletions.
584 changes: 392 additions & 192 deletions app/addons.go

Large diffs are not rendered by default.

207 changes: 157 additions & 50 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package app
import (
"context"
"fmt"
"net/http"
"strings"

"github.com/cyverse-de/go-mod/logging"
"github.com/cyverse-de/go-mod/pbinit"
"github.com/cyverse-de/p/go/qms"
"github.com/cyverse-de/subscriptions/common"
"github.com/cyverse-de/subscriptions/db"
"github.com/cyverse-de/subscriptions/errors"
"github.com/cyverse-de/subscriptions/natscl"
"github.com/jmoiron/sqlx"
"github.com/labstack/echo/v4"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -22,17 +25,67 @@ var log = logging.Log.WithFields(logrus.Fields{"package": "apps"})
type App struct {
client *natscl.Client
db *sqlx.DB
Router *echo.Echo
userSuffix string
ReportOverages bool
}

func New(client *natscl.Client, db *sqlx.DB, userSuffix string) *App {
return &App{
app := &App{
client: client,
db: db,
userSuffix: userSuffix,
Router: echo.New(),
ReportOverages: true,
}

app.Router.HTTPErrorHandler = func(err error, c echo.Context) {
code := http.StatusInternalServerError
var body interface{}

switch err := err.(type) {
case common.ErrorResponse:
code = http.StatusBadRequest
body = err
case *common.ErrorResponse:
code = http.StatusBadRequest
body = err
case *echo.HTTPError:
echoErr := err
code = echoErr.Code
body = common.NewErrorResponse(err)
default:
body = common.NewErrorResponse(err)
}

c.JSON(code, body) // nolint:errcheck
}

app.Router.GET("/", app.GreetingHTTPHandler).Name = "greeting"
app.Router.GET("/summary/:user", app.GetUserSummaryHTTPHandler)
app.Router.PUT("/addons", app.AddAddonHTTPHandler)
app.Router.GET("/addons", app.ListAddonsHTTPHandler)
app.Router.POST("/addons/:uuid", app.UpdateAddonHTTPHandler)
app.Router.DELETE("/addons/:uuid", app.DeleteAddonHTTPHandler)
app.Router.GET("/subscriptions/:uuid/addons", app.ListSubscriptionAddonsHTTPHandler)
app.Router.GET("/subscriptions/:sub_uuid/addons/:addon_uuid", app.GetSubscriptionAddonHTTPHandler)
app.Router.PUT("/subscriptions/:sub_uuid/addons/:addon_uuid", app.AddSubscriptionAddonHTTPHandler)
app.Router.DELETE("/subscriptions/:sub_uuid/addons/:addon_uuid", app.DeleteSubscriptionAddonHTTPHandler)
app.Router.POST("/subscriptions/:sub_uuid/addons/:addon_uuid", app.UpdateSubscriptionAddonHTTPHandler)
app.Router.PUT("/users", app.AddUserHTTPHandler)
app.Router.GET("/users/:username/updates", app.GetUserUpdatesHTTPHandler)
app.Router.PUT("/user/:username/updates", app.AddUserUpdateHTTPHandler)
app.Router.GET("/users/:username/overages", app.GetUserOveragesHTTPHandler)
app.Router.GET("/users/:username/overages/:resource_name", app.CheckUserOveragesHTTPHandler)
app.Router.GET("/users/:username/usages", app.GetUsagesHTTPHandler)
app.Router.PUT("/users/:username/usages", app.AddUsageHTTPHandler)
app.Router.GET("/plans", app.ListPlansHTTPHandler)
app.Router.PUT("/plans", app.AddPlanHTTPHandler)
app.Router.GET("/plans/:plan_id", app.GetPlanHTTPHandler)
app.Router.POST("/quotas/defaults", app.UpsertQuotaDefaultsHTTPHandler)
app.Router.PUT("/quotas", app.AddQuotaHTTPHandler)

return app
}

func (a *App) FixUsername(username string) (string, error) {
Expand Down Expand Up @@ -84,28 +137,17 @@ func (a *App) validateUpdate(request *qms.AddUpdateRequest) (string, error) {
return username, nil
}

func (a *App) GetUserUpdatesHandler(subject, reply string, request *qms.UpdateListRequest) {
var err error
func (a *App) GreetingHTTPHandler(ctx echo.Context) error {
return ctx.String(http.StatusOK, "Hello from subscriptions.")
}

log := log.WithFields(logrus.Fields{"context": "get all user updates over nats"})
func (a *App) getUserUpdates(ctx context.Context, request *qms.UpdateListRequest) *qms.UpdateListResponse {
response := pbinit.NewQMSUpdateListResponse()
ctx, span := pbinit.InitQMSUpdateListRequest(request, subject)
defer span.End()

// Avoid duplicating a lot of error reporting code.
sendError := func(ctx context.Context, response *qms.UpdateListResponse, err error) {
log.Error(err)
response.Error = errors.NatsError(ctx, err)
if err = a.client.Respond(ctx, reply, response); err != nil {
log.Error(err)
}
}

username, err := a.FixUsername(request.User.Username)
if err != nil {
sendError(ctx, response, err)
return

response.Error = errors.NatsError(ctx, err)
return response
}

log = log.WithFields(logrus.Fields{"user": username})
Expand All @@ -114,8 +156,8 @@ func (a *App) GetUserUpdatesHandler(subject, reply string, request *qms.UpdateLi

mUpdates, err := d.UserUpdates(ctx, username)
if err != nil {
sendError(ctx, response, err)
return
response.Error = errors.NatsError(ctx, err)
return response
}

for _, mu := range mUpdates {
Expand All @@ -140,39 +182,61 @@ func (a *App) GetUserUpdatesHandler(subject, reply string, request *qms.UpdateLi
})
}

return response
}

func (a *App) GetUserUpdatesHandler(subject, reply string, request *qms.UpdateListRequest) {
var err error

log := log.WithFields(logrus.Fields{"context": "get all user updates over nats"})

ctx, span := pbinit.InitQMSUpdateListRequest(request, subject)
defer span.End()

response := a.getUserUpdates(ctx, request)

if response.Error != nil {
log.Error(response.Error.Message)
}

if err = a.client.Respond(ctx, reply, response); err != nil {
log.Error(err)
}
}

func (a *App) AddUserUpdateHandler(subject, reply string, request *qms.AddUpdateRequest) {
func (a *App) GetUserUpdatesHTTPHandler(c echo.Context) error {
ctx := c.Request().Context()

request := &qms.UpdateListRequest{
User: &qms.QMSUser{
Username: c.Param("username"),
},
}

response := a.getUserUpdates(ctx, request)

if response.Error != nil {
return c.JSON(int(response.Error.StatusCode), response)
}

return c.JSON(http.StatusOK, response)
}

func (a *App) addUserUpdate(ctx context.Context, request *qms.AddUpdateRequest) *qms.AddUpdateResponse {
var (
err error
userID, resourceTypeID, operationID string
update *db.Update
)

// Initialize the response.
log := log.WithFields(logrus.Fields{"context": "add a user update over nats"})
response := pbinit.NewQMSAddUpdateResponse()
ctx, span := pbinit.InitQMSAddUpdateRequest(request, subject)
defer span.End()

// Avoid duplicating a lot of error reporting code.
sendError := func(ctx context.Context, response *qms.AddUpdateResponse, err error) {
log.Error(err)
response.Error = errors.NatsError(ctx, err)
if err = a.client.Respond(ctx, reply, response); err != nil {
log.Error(err)
}
}

d := db.New(a.db)

username, err := a.validateUpdate(request)
if err != nil {
sendError(ctx, response, err)
return
response.Error = errors.NatsError(ctx, err)
return response
}

log = log.WithFields(logrus.Fields{"user": username})
Expand All @@ -182,8 +246,8 @@ func (a *App) AddUserUpdateHandler(subject, reply string, request *qms.AddUpdate
log.Infof("getting user ID for %s", username)
user, err := d.EnsureUser(ctx, username)
if err != nil {
sendError(ctx, response, err)
return
response.Error = errors.NatsError(ctx, err)
return response
}
userID = user.ID
log.Infof("user ID for %s is %s", username, userID)
Expand All @@ -201,8 +265,8 @@ func (a *App) AddUserUpdateHandler(subject, reply string, request *qms.AddUpdate
request.Update.ResourceType.Unit,
)
if err != nil {
sendError(ctx, response, err)
return
response.Error = errors.NatsError(ctx, err)
return response
}
log.Infof("resource type id for resource %s is '%s'", request.Update.ResourceType.Name, resourceTypeID)
} else {
Expand All @@ -218,8 +282,8 @@ func (a *App) AddUserUpdateHandler(subject, reply string, request *qms.AddUpdate
request.Update.Operation.Name,
)
if err != nil {
sendError(ctx, response, err)
return
response.Error = errors.NatsError(ctx, err)
return response
}
log.Infof("operation ID for %s is %s", request.Update.Operation.Name, operationID)
} else {
Expand Down Expand Up @@ -251,32 +315,32 @@ func (a *App) AddUserUpdateHandler(subject, reply string, request *qms.AddUpdate
log.Info("adding update to the database")
_, err = d.AddUserUpdate(ctx, update)
if err != nil {
sendError(ctx, response, err)
return
response.Error = errors.NatsError(ctx, err)
return response
}
log.Info("done adding update to the database")

switch update.ValueType {
case db.UsagesTrackedMetric:
log.Info("processing update for usage")
if err = d.ProcessUpdateForUsage(ctx, update); err != nil {
sendError(ctx, response, err)
return
response.Error = errors.NatsError(ctx, err)
return response
}
log.Info("after processing update for usage")

case db.QuotasTrackedMetric:
log.Info("processing update for quota")
if err = d.ProcessUpdateForQuota(ctx, update); err != nil {
sendError(ctx, response, err)
return
response.Error = errors.NatsError(ctx, err)
return response
}
log.Info("after processing update for quota")

default:
err = fmt.Errorf("unknown value type in update: %s", update.ValueType)
sendError(ctx, response, err)
return
response.Error = errors.NatsError(ctx, err)
return response
}

// Set up the object for the response.
Expand All @@ -301,8 +365,51 @@ func (a *App) AddUserUpdateHandler(subject, reply string, request *qms.AddUpdate
}
}

return response
}

func (a *App) AddUserUpdateHandler(subject, reply string, request *qms.AddUpdateRequest) {
var err error

// Initialize the response.
log := log.WithFields(logrus.Fields{"context": "add a user update over nats"})

ctx, span := pbinit.InitQMSAddUpdateRequest(request, subject)
defer span.End()

response := a.addUserUpdate(ctx, request)

if response.Error != nil {
log.Error(response.Error.Message)
}

// Send the response to the caller
if err = a.client.Respond(ctx, reply, response); err != nil {
log.Error(err)
}
}

func (a *App) AddUserUpdateHTTPHandler(c echo.Context) error {
var (
err error
request qms.AddUpdateRequest
)

ctx := c.Request().Context()

if err = c.Bind(&request); err != nil {
return c.JSON(http.StatusBadRequest, map[string]string{
"message": "bad request",
})
}

request.Update.User.Username = c.Param("username")

response := a.addUserUpdate(ctx, &request)

if response.Error != nil {
return c.JSON(int(response.Error.StatusCode), response)
}

return c.JSON(http.StatusOK, response)
}
Loading

0 comments on commit 3528661

Please sign in to comment.