Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a unified query response subject #96

Merged
merged 3 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (a *ChangeMetadata) GetUUIDParsed() *uuid.UUID {
return &u
}

func (a *ChangeProperties) GetAffectedItemsBookmarkUUIDParsed() *uuid.UUID {
u, err := uuid.FromBytes(a.GetAffectedItemsBookmarkUUID())
func (a *ChangeProperties) GetChangingItemsBookmarkUUIDParsed() *uuid.UUID {
u, err := uuid.FromBytes(a.GetChangingItemsBookmarkUUID())
if err != nil {
return nil
}
Expand Down
43 changes: 37 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package sdp
import (
"context"
"fmt"
reflect "reflect"

"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -39,7 +42,23 @@ type EncodedConnectionImpl struct {
// assert interface implementation
var _ EncodedConnection = (*EncodedConnectionImpl)(nil)

func recordMessage(ctx context.Context, name, subj, typ, msg string) {
log.WithContext(ctx).WithFields(log.Fields{
"msg type": typ,
"subj": subj,
"msg": msg,
}).Trace(name)
span := trace.SpanFromContext(ctx)
span.AddEvent(name, trace.WithAttributes(
attribute.String("om.sdp.subject", subj),
attribute.String("om.sdp.message", msg),
))
}

func (ec *EncodedConnectionImpl) Publish(ctx context.Context, subj string, m proto.Message) error {
// TODO: protojson.Format is pretty expensive, replace with summarized data
recordMessage(ctx, "Publish", subj, fmt.Sprint(reflect.TypeOf(m)), protojson.Format(m))

data, err := proto.Marshal(m)
if err != nil {
return err
Expand All @@ -49,10 +68,13 @@ func (ec *EncodedConnectionImpl) Publish(ctx context.Context, subj string, m pro
Subject: subj,
Data: data,
}
return ec.PublishMsg(ctx, msg)
InjectOtelTraceContext(ctx, msg)
return ec.Conn.PublishMsg(msg)
}

func (ec *EncodedConnectionImpl) PublishMsg(ctx context.Context, msg *nats.Msg) error {
recordMessage(ctx, "Publish", msg.Subject, "[]byte", "binary")

InjectOtelTraceContext(ctx, msg)
return ec.Conn.PublishMsg(msg)
}
Expand All @@ -68,8 +90,16 @@ func (ec *EncodedConnectionImpl) QueueSubscribe(subj, queue string, cb nats.MsgH
}

func (ec *EncodedConnectionImpl) RequestMsg(ctx context.Context, msg *nats.Msg) (*nats.Msg, error) {
recordMessage(ctx, "RequestMsg", msg.Subject, "[]byte", "binary")
InjectOtelTraceContext(ctx, msg)
return ec.Conn.RequestMsgWithContext(ctx, msg)
reply, err := ec.Conn.RequestMsgWithContext(ctx, msg)

if err != nil {
recordMessage(ctx, "RequestMsg Error", msg.Subject, fmt.Sprint(reflect.TypeOf(err)), err.Error())
} else {
recordMessage(ctx, "RequestMsg Reply", msg.Subject, "[]byte", "binary")
}
return reply, err
}

func (ec *EncodedConnectionImpl) Drain() error {
Expand Down Expand Up @@ -104,6 +134,7 @@ func (ec *EncodedConnectionImpl) Drop() {
func Unmarshal(ctx context.Context, b []byte, m proto.Message) error {
err := proto.Unmarshal(b, m)
if err != nil {
recordMessage(ctx, "Unmarshal err", "unknown", fmt.Sprint(reflect.TypeOf(err)), err.Error())
log.WithContext(ctx).Errorf("Error parsing message: %v", err)
trace.SpanFromContext(ctx).SetStatus(codes.Error, fmt.Sprintf("Error parsing message: %v", err))
return err
Expand All @@ -114,20 +145,20 @@ func Unmarshal(ctx context.Context, b []byte, m proto.Message) error {
// some remaining unknown fields. If there are some, fail.
if unk := m.ProtoReflect().GetUnknown(); unk != nil {
err = fmt.Errorf("unmarshal to %T had unknown fields, likely a type mismatch. Unknowns: %v", m, unk)
recordMessage(ctx, "Unmarshal unknown", "unknown", fmt.Sprint(reflect.TypeOf(m)), protojson.Format(m))
log.WithContext(ctx).Errorf("Error parsing message: %v", err)
trace.SpanFromContext(ctx).SetStatus(codes.Error, fmt.Sprintf("Error parsing message: %v", err))
return err
}

recordMessage(ctx, "Unmarshal", "unknown", fmt.Sprint(reflect.TypeOf(m)), protojson.Format(m))
return nil
}

//go:generate go run genhandler.go Item

//go:generate go run genhandler.go Query
//go:generate go run genhandler.go QueryError
//go:generate go run genhandler.go QueryResponse
//go:generate go run genhandler.go CancelQuery
//go:generate go run genhandler.go UndoQuery

//go:generate go run genhandler.go GatewayResponse
//go:generate go run genhandler.go Response
//go:generate go run genhandler.go ReverseLinksRequest
4 changes: 1 addition & 3 deletions encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ var metadata = Metadata{
RecursionBehaviour: &Query_RecursionBehaviour{
LinkDepth: 12,
},
Scope: "testScope",
ItemSubject: "items",
ResponseSubject: "responses",
Scope: "testScope",
},
Timestamp: timestamppb.Now(),
SourceDuration: &durationpb.Duration{
Expand Down
55 changes: 0 additions & 55 deletions handler_queryerror.go

This file was deleted.

14 changes: 7 additions & 7 deletions handler_item.go → handler_queryresponse.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 0 additions & 55 deletions handler_response.go

This file was deleted.

7 changes: 5 additions & 2 deletions items.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ func (qrb *Query_RecursionBehaviour) Copy(dest *Query_RecursionBehaviour) {
dest.FollowOnlyBlastPropagation = qrb.FollowOnlyBlastPropagation
}

// Subject returns a NATS subject for all traffic relating to this query
func (q *Query) Subject() string {
return fmt.Sprintf("query.%v", q.ParseUuid())
}

// Copy copies all information from one Query pointer to another
func (q *Query) Copy(dest *Query) {
dest.Type = q.Type
Expand All @@ -293,8 +298,6 @@ func (q *Query) Copy(dest *Query) {
q.RecursionBehaviour.Copy(dest.RecursionBehaviour)
}
dest.Scope = q.Scope
dest.ItemSubject = q.ItemSubject
dest.ResponseSubject = q.ResponseSubject
dest.IgnoreCache = q.IgnoreCache
dest.UUID = q.UUID

Expand Down
Loading