diff --git a/connection.go b/connection.go index d6cf928..2aba3c0 100644 --- a/connection.go +++ b/connection.go @@ -3,11 +3,14 @@ package sdp import ( "context" "fmt" + reflect "reflect" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) @@ -39,7 +42,23 @@ type EncodedConnectionImpl struct { // assert interface implementation var _ EncodedConnection = (*EncodedConnectionImpl)(nil) +func recordMessage(ctx context.Context, name, subj, typ, msg string) { + log.WithContext(ctx).WithFields(log.Fields{ + "msg type": typ, + "subj": subj, + "msg": msg, + }).Trace(name) + span := trace.SpanFromContext(ctx) + span.AddEvent(name, trace.WithAttributes( + attribute.String("om.sdp.subject", subj), + attribute.String("om.sdp.message", msg), + )) +} + func (ec *EncodedConnectionImpl) Publish(ctx context.Context, subj string, m proto.Message) error { + // TODO: protojson.Format is pretty expensive, replace with summarized data + recordMessage(ctx, "Publish", subj, fmt.Sprint(reflect.TypeOf(m)), protojson.Format(m)) + data, err := proto.Marshal(m) if err != nil { return err @@ -49,10 +68,13 @@ func (ec *EncodedConnectionImpl) Publish(ctx context.Context, subj string, m pro Subject: subj, Data: data, } - return ec.PublishMsg(ctx, msg) + InjectOtelTraceContext(ctx, msg) + return ec.Conn.PublishMsg(msg) } func (ec *EncodedConnectionImpl) PublishMsg(ctx context.Context, msg *nats.Msg) error { + recordMessage(ctx, "Publish", msg.Subject, "[]byte", "binary") + InjectOtelTraceContext(ctx, msg) return ec.Conn.PublishMsg(msg) } @@ -68,8 +90,16 @@ func (ec *EncodedConnectionImpl) QueueSubscribe(subj, queue string, cb nats.MsgH } func (ec *EncodedConnectionImpl) RequestMsg(ctx context.Context, msg *nats.Msg) (*nats.Msg, error) { + recordMessage(ctx, "RequestMsg", msg.Subject, "[]byte", "binary") InjectOtelTraceContext(ctx, msg) - return ec.Conn.RequestMsgWithContext(ctx, msg) + reply, err := ec.Conn.RequestMsgWithContext(ctx, msg) + + if err != nil { + recordMessage(ctx, "RequestMsg Error", msg.Subject, fmt.Sprint(reflect.TypeOf(err)), err.Error()) + } else { + recordMessage(ctx, "RequestMsg Reply", msg.Subject, "[]byte", "binary") + } + return reply, err } func (ec *EncodedConnectionImpl) Drain() error { @@ -104,6 +134,7 @@ func (ec *EncodedConnectionImpl) Drop() { func Unmarshal(ctx context.Context, b []byte, m proto.Message) error { err := proto.Unmarshal(b, m) if err != nil { + recordMessage(ctx, "Unmarshal err", "unknown", fmt.Sprint(reflect.TypeOf(err)), err.Error()) log.WithContext(ctx).Errorf("Error parsing message: %v", err) trace.SpanFromContext(ctx).SetStatus(codes.Error, fmt.Sprintf("Error parsing message: %v", err)) return err @@ -114,16 +145,20 @@ func Unmarshal(ctx context.Context, b []byte, m proto.Message) error { // some remaining unknown fields. If there are some, fail. if unk := m.ProtoReflect().GetUnknown(); unk != nil { err = fmt.Errorf("unmarshal to %T had unknown fields, likely a type mismatch. Unknowns: %v", m, unk) + recordMessage(ctx, "Unmarshal unknown", "unknown", fmt.Sprint(reflect.TypeOf(m)), protojson.Format(m)) log.WithContext(ctx).Errorf("Error parsing message: %v", err) trace.SpanFromContext(ctx).SetStatus(codes.Error, fmt.Sprintf("Error parsing message: %v", err)) return err } + + recordMessage(ctx, "Unmarshal", "unknown", fmt.Sprint(reflect.TypeOf(m)), protojson.Format(m)) return nil } //go:generate go run genhandler.go Item //go:generate go run genhandler.go Query +//go:generate go run genhandler.go QueryResponse //go:generate go run genhandler.go QueryError //go:generate go run genhandler.go CancelQuery //go:generate go run genhandler.go UndoQuery diff --git a/encoder_test.go b/encoder_test.go index 2f2fc71..05b579e 100644 --- a/encoder_test.go +++ b/encoder_test.go @@ -46,9 +46,7 @@ var metadata = Metadata{ RecursionBehaviour: &Query_RecursionBehaviour{ LinkDepth: 12, }, - Scope: "testScope", - ItemSubject: "items", - ResponseSubject: "responses", + Scope: "testScope", }, Timestamp: timestamppb.Now(), SourceDuration: &durationpb.Duration{ diff --git a/handler_queryresponse.go b/handler_queryresponse.go new file mode 100644 index 0000000..9756929 --- /dev/null +++ b/handler_queryresponse.go @@ -0,0 +1,55 @@ +// Code generated by "genhandler QueryResponse"; DO NOT EDIT + +package sdp + +import ( + "context" + + "github.com/nats-io/nats.go" + "go.opentelemetry.io/otel/trace" +) + +func NewQueryResponseHandler(spanName string, h func(ctx context.Context, i *QueryResponse), spanOpts ...trace.SpanStartOption) nats.MsgHandler { + return NewOtelExtractingHandler( + spanName, + func(ctx context.Context, m *nats.Msg) { + var i QueryResponse + err := Unmarshal(ctx, m.Data, &i) + if err != nil { + return + } + h(ctx, &i) + }, + tracer, + ) +} + +func NewRawQueryResponseHandler(spanName string, h func(ctx context.Context, m *nats.Msg, i *QueryResponse), spanOpts ...trace.SpanStartOption) nats.MsgHandler { + return NewOtelExtractingHandler( + spanName, + func(ctx context.Context, m *nats.Msg) { + var i QueryResponse + err := Unmarshal(ctx, m.Data, &i) + if err != nil { + return + } + h(ctx, m, &i) + }, + tracer, + ) +} + +func NewAsyncRawQueryResponseHandler(spanName string, h func(ctx context.Context, m *nats.Msg, i *QueryResponse), spanOpts ...trace.SpanStartOption) nats.MsgHandler { + return NewAsyncOtelExtractingHandler( + spanName, + func(ctx context.Context, m *nats.Msg) { + var i QueryResponse + err := Unmarshal(ctx, m.Data, &i) + if err != nil { + return + } + h(ctx, m, &i) + }, + tracer, + ) +} diff --git a/items.go b/items.go index e8164ce..c4b4d06 100644 --- a/items.go +++ b/items.go @@ -283,6 +283,11 @@ func (qrb *Query_RecursionBehaviour) Copy(dest *Query_RecursionBehaviour) { dest.FollowOnlyBlastPropagation = qrb.FollowOnlyBlastPropagation } +// Subject returns a NATS subject for all traffic relating to this query +func (q *Query) Subject() string { + return fmt.Sprintf("query.%v", q.ParseUuid()) +} + // Copy copies all information from one Query pointer to another func (q *Query) Copy(dest *Query) { dest.Type = q.Type @@ -293,8 +298,6 @@ func (q *Query) Copy(dest *Query) { q.RecursionBehaviour.Copy(dest.RecursionBehaviour) } dest.Scope = q.Scope - dest.ItemSubject = q.ItemSubject - dest.ResponseSubject = q.ResponseSubject dest.IgnoreCache = q.IgnoreCache dest.UUID = q.UUID diff --git a/progress.go b/progress.go index 6d3dfa6..f4c73b3 100644 --- a/progress.go +++ b/progress.go @@ -76,7 +76,7 @@ func (rs *ResponseSender) Start(ctx context.Context, ec EncodedConnection, respo rs.connection.Publish( ctx, rs.ResponseSubject, - &resp, + &QueryResponse{ResponseType: &QueryResponse_Response{Response: &resp}}, ) } @@ -104,7 +104,7 @@ func (rs *ResponseSender) Start(ctx context.Context, ec EncodedConnection, respo ec.Publish( ctx, rs.ResponseSubject, - r, + &QueryResponse{ResponseType: &QueryResponse_Response{Response: r}}, ) } return @@ -118,7 +118,7 @@ func (rs *ResponseSender) Start(ctx context.Context, ec EncodedConnection, respo ec.Publish( ctx, rs.ResponseSubject, - r, + &QueryResponse{ResponseType: &QueryResponse_Response{Response: r}}, ) } } @@ -222,7 +222,7 @@ func (re *Responder) LastStateTime() time.Time { return re.lastStateTime } -// QueryProgress represents the status of a request +// QueryProgress represents the status of a query type QueryProgress struct { // How long to wait after `MarkStarted()` has been called to get at least // one responder, if there are no responders in this time, the request will @@ -251,10 +251,7 @@ type QueryProgress struct { cancelled bool subMutex sync.Mutex - // NATS subscriptions - itemSub *nats.Subscription - responseSub *nats.Subscription - errorSub *nats.Subscription + querySub *nats.Subscription // Counters for how many things we have sent over the channels. This is // required to make sure that we aren't closing channels that have pending @@ -309,19 +306,6 @@ func (qp *QueryProgress) Start(ctx context.Context, ec EncodedConnection, itemCh qp.requestCtx = ctx - // Populate inboxes if they aren't already - if qp.Query.ItemSubject == "" { - qp.Query.ItemSubject = fmt.Sprintf("return.item.%v", nats.NewInbox()) - } - - if qp.Query.ResponseSubject == "" { - qp.Query.ResponseSubject = fmt.Sprintf("return.response.%v", nats.NewInbox()) - } - - if qp.Query.ErrorSubject == "" { - qp.Query.ErrorSubject = fmt.Sprintf("return.error.%v", nats.NewInbox()) - } - if len(qp.Query.UUID) == 0 { u := uuid.New() qp.Query.UUID = u[:] @@ -350,7 +334,7 @@ func (qp *QueryProgress) Start(ctx context.Context, ec EncodedConnection, itemCh var err error - qp.itemSub, err = ec.Subscribe(qp.Query.ItemSubject, NewItemHandler("Request.ItemSubject", func(ctx context.Context, item *Item) { + itemHandler := func(ctx context.Context, item *Item) { defer atomic.AddInt64(qp.itemsProcessed, 1) span := trace.SpanFromContext(ctx) @@ -390,13 +374,9 @@ func (qp *QueryProgress) Start(ctx context.Context, ec EncodedConnection, itemCh qp.itemChan <- item } - })) - - if err != nil { - return err } - qp.errorSub, err = ec.Subscribe(qp.Query.ErrorSubject, NewQueryErrorHandler("Request.ErrorSubject", func(ctx context.Context, err *QueryError) { + errorHandler := func(ctx context.Context, err *QueryError) { defer atomic.AddInt64(qp.errorsProcessed, 1) if err != nil { @@ -432,16 +412,24 @@ func (qp *QueryProgress) Start(ctx context.Context, ec EncodedConnection, itemCh qp.errorChan <- err } - })) - - if err != nil { - return err } - qp.responseSub, err = ec.Subscribe(qp.Query.ResponseSubject, NewResponseHandler("ProcessResponse", qp.ProcessResponse)) - + qp.querySub, err = ec.Subscribe(qp.Query.Subject(), NewQueryResponseHandler("QueryProgress", func(ctx context.Context, qr *QueryResponse) { + log.WithContext(ctx).WithFields(log.Fields{ + "response": qr, + }).Info("Received response") + switch qr.ResponseType.(type) { + case *QueryResponse_NewItem: + itemHandler(ctx, qr.GetNewItem()) + case *QueryResponse_Error: + errorHandler(ctx, qr.GetError()) + case *QueryResponse_Response: + qp.ProcessResponse(ctx, qr.GetResponse()) + default: + panic(fmt.Sprintf("Received unexpected QueryResponse: %v", qr)) + } + })) if err != nil { - qp.itemSub.Unsubscribe() return err } @@ -507,44 +495,7 @@ func (qp *QueryProgress) Drain() { } // Close the item and error subscriptions - unsubscribeGracefully(qp.itemSub) - unsubscribeGracefully(qp.errorSub) - - if qp.responseSub != nil { - // Drain the response connection to, but don't wait for callbacks to finish. - // this is because this code here is likely called as part of a callback and - // therefore would cause deadlock as it essentially waits for itself to - // finish - qp.responseSub.Unsubscribe() - } - - // This double-checks that all callbacks are *definitely* complete to avoid - // a situation where we close the channel with a goroutine still pending a - // send. This is rare due to the use of RWMutex on the channel, but still - // possible - var itemsDelivered int64 - var errorsDelivered int64 - var err error - - for { - itemsDelivered, err = qp.itemSub.Delivered() - - if err != nil { - break - } - - errorsDelivered, err = qp.errorSub.Delivered() - - if err != nil { - break - } - - if (itemsDelivered == *qp.itemsProcessed) && (errorsDelivered == *qp.errorsProcessed) { - break - } - - time.Sleep(50 * time.Millisecond) - } + unsubscribeGracefully(qp.querySub) qp.chanMutex.Lock() defer qp.chanMutex.Unlock() @@ -923,14 +874,14 @@ func stallMonitor(context context.Context, timeout time.Duration, responder *Res // unsubscribeGracefully Closes a NATS subscription gracefully, this includes // draining, unsubscribing and ensuring that all callbacks are complete -func unsubscribeGracefully(c *nats.Subscription) error { - if c != nil { +func unsubscribeGracefully(s *nats.Subscription) error { + if s != nil { // Drain NATS connections - err := c.Drain() + err := s.Drain() if err != nil { // If that fails, fall back to an unsubscribe - err = c.Unsubscribe() + err = s.Unsubscribe() if err != nil { return err @@ -938,15 +889,6 @@ func unsubscribeGracefully(c *nats.Subscription) error { } // Wait for all items to finish processing, including all callbacks - for { - messages, _, _ := c.Pending() - - if messages > 0 { - time.Sleep(50 * time.Millisecond) - } else { - break - } - } } return nil diff --git a/progress_test.go b/progress_test.go index 1e95139..b3a372e 100644 --- a/progress_test.go +++ b/progress_test.go @@ -61,9 +61,13 @@ func TestResponseSenderDone(t *testing.T) { finalMessage := tp.Messages[len(tp.Messages)-1] tp.messagesMutex.Unlock() - if finalResponse, ok := finalMessage.V.(*Response); ok { - if finalResponse.State != ResponderState_COMPLETE { - t.Errorf("Expected final message state to be COMPLETE (1), found: %v", finalResponse.State) + if queryResponse, ok := finalMessage.V.(*QueryResponse); ok { + if finalResponse, ok := queryResponse.ResponseType.(*QueryResponse_Response); ok { + if finalResponse.Response.State != ResponderState_COMPLETE { + t.Errorf("Expected final message state to be COMPLETE (1), found: %v", finalResponse.Response.State) + } + } else { + t.Errorf("Final QueryResponse did not contain a valid Response object. Message content type %T", queryResponse.ResponseType) } } else { t.Errorf("Final message did not contain a valid response object. Message content type %T", finalMessage.V) @@ -102,9 +106,13 @@ func TestResponseSenderError(t *testing.T) { finalMessage := tp.Messages[len(tp.Messages)-1] tp.messagesMutex.Unlock() - if finalResponse, ok := finalMessage.V.(*Response); ok { - if finalResponse.State != ResponderState_ERROR { - t.Errorf("Expected final message state to be ERROR, found: %v", finalResponse.State) + if queryResponse, ok := finalMessage.V.(*QueryResponse); ok { + if finalResponse, ok := queryResponse.ResponseType.(*QueryResponse_Response); ok { + if finalResponse.Response.State != ResponderState_ERROR { + t.Errorf("Expected final message state to be ERROR, found: %v", finalResponse.Response.State) + } + } else { + t.Errorf("Final QueryResponse did not contain a valid Response object. Message content type %T", queryResponse.ResponseType) } } else { t.Errorf("Final message did not contain a valid response object. Message content type %T", finalMessage.V) @@ -143,9 +151,13 @@ func TestResponseSenderCancel(t *testing.T) { finalMessage := tp.Messages[len(tp.Messages)-1] tp.messagesMutex.Unlock() - if finalResponse, ok := finalMessage.V.(*Response); ok { - if finalResponse.State != ResponderState_CANCELLED { - t.Errorf("Expected final message state to be CANCELLED, found: %v", finalResponse.State) + if queryResponse, ok := finalMessage.V.(*QueryResponse); ok { + if finalResponse, ok := queryResponse.ResponseType.(*QueryResponse_Response); ok { + if finalResponse.Response.State != ResponderState_CANCELLED { + t.Errorf("Expected final message state to be CANCELLED, found: %v", finalResponse.Response.State) + } + } else { + t.Errorf("Final QueryResponse did not contain a valid Response object. Message content type %T", queryResponse.ResponseType) } } else { t.Errorf("Final message did not contain a valid response object. Message content type %T", finalMessage.V) @@ -613,7 +625,6 @@ func TestStart(t *testing.T) { errs := make(chan *QueryError, 128) err := rp.Start(context.Background(), &conn, items, errs) - if err != nil { t.Fatal(err) } @@ -622,8 +633,17 @@ func TestStart(t *testing.T) { t.Errorf("expected 1 message to be sent, got %v", len(conn.Messages)) } + response := QueryResponse{ + ResponseType: &QueryResponse_NewItem{ + NewItem: &item, + }, + } + // Test that the handlers work - conn.Publish(context.Background(), query.ItemSubject, &item) + err = conn.Publish(context.Background(), query.Subject(), &response) + if err != nil { + t.Fatal(err) + } receivedItem := <-items @@ -697,12 +717,10 @@ func TestExecute(t *testing.T) { RecursionBehaviour: &Query_RecursionBehaviour{ LinkDepth: 0, }, - Scope: "global", - IgnoreCache: false, - UUID: u[:], - Timeout: durationpb.New(10 * time.Second), - ItemSubject: "items", - ResponseSubject: "responses", + Scope: "global", + IgnoreCache: false, + UUID: u[:], + Timeout: durationpb.New(10 * time.Second), } rp := NewQueryProgress(&q) @@ -724,12 +742,10 @@ func TestExecute(t *testing.T) { RecursionBehaviour: &Query_RecursionBehaviour{ LinkDepth: 0, }, - Scope: "global", - IgnoreCache: false, - UUID: u[:], - Timeout: durationpb.New(10 * time.Second), - ItemSubject: "items1", - ResponseSubject: "responses1", + Scope: "global", + IgnoreCache: false, + UUID: u[:], + Timeout: durationpb.New(10 * time.Second), } rp := NewQueryProgress(&q) @@ -740,30 +756,46 @@ func TestExecute(t *testing.T) { time.Sleep(delay) - conn.Publish(context.Background(), q.ResponseSubject, &Response{ - Responder: "test", - State: ResponderState_WORKING, - UUID: q.UUID, - NextUpdateIn: &durationpb.Duration{ - Seconds: 10, - Nanos: 0, + conn.Publish(context.Background(), q.Subject(), &QueryResponse{ + ResponseType: &QueryResponse_Response{ + Response: &Response{ + Responder: "test", + State: ResponderState_WORKING, + UUID: q.UUID, + NextUpdateIn: &durationpb.Duration{ + Seconds: 10, + Nanos: 0, + }, + }, }, }) time.Sleep(delay) - conn.Publish(context.Background(), q.ItemSubject, &item) + conn.Publish(context.Background(), q.Subject(), &QueryResponse{ + ResponseType: &QueryResponse_NewItem{ + NewItem: &item, + }, + }) time.Sleep(delay) - conn.Publish(context.Background(), q.ItemSubject, &item) + conn.Publish(context.Background(), q.Subject(), &QueryResponse{ + ResponseType: &QueryResponse_NewItem{ + NewItem: &item, + }, + }) time.Sleep(delay) - conn.Publish(context.Background(), q.ResponseSubject, &Response{ - Responder: "test", - State: ResponderState_COMPLETE, - UUID: q.UUID, + conn.Publish(context.Background(), q.Subject(), &QueryResponse{ + ResponseType: &QueryResponse_Response{ + Response: &Response{ + Responder: "test", + State: ResponderState_COMPLETE, + UUID: q.UUID, + }, + }, }) }() @@ -814,7 +846,7 @@ func TestRealNats(t *testing.T) { time.Sleep(delay) - enc.Publish(ctx, q.ResponseSubject, &Response{ + enc.Publish(ctx, q.Subject(), &QueryResponse{ResponseType: &QueryResponse_Response{Response: &Response{ Responder: "test", State: ResponderState_WORKING, UUID: q.UUID, @@ -822,19 +854,19 @@ func TestRealNats(t *testing.T) { Seconds: 10, Nanos: 0, }, - }) + }}}) time.Sleep(delay) - enc.Publish(ctx, q.ItemSubject, &item) + enc.Publish(ctx, q.Subject(), &QueryResponse{ResponseType: &QueryResponse_NewItem{NewItem: &item}}) - enc.Publish(ctx, q.ItemSubject, &item) + enc.Publish(ctx, q.Subject(), &QueryResponse{ResponseType: &QueryResponse_NewItem{NewItem: &item}}) - enc.Publish(ctx, q.ResponseSubject, &Response{ + enc.Publish(ctx, q.Subject(), &QueryResponse{ResponseType: &QueryResponse_Response{Response: &Response{ Responder: "test", State: ResponderState_COMPLETE, UUID: q.UUID, - }) + }}}) })) ready <- true }() @@ -878,7 +910,7 @@ func TestFastFinisher(t *testing.T) { } // Respond immediately saying we're started - err = conn.Publish(context.Background(), q.ResponseSubject, &Response{ + err = conn.Publish(context.Background(), q.Subject(), &QueryResponse{ResponseType: &QueryResponse_Response{Response: &Response{ Responder: "fast", State: ResponderState_WORKING, UUID: q.UUID, @@ -886,7 +918,7 @@ func TestFastFinisher(t *testing.T) { Seconds: 1, Nanos: 0, }, - }) + }}}) if err != nil { t.Fatal(err) } @@ -894,17 +926,17 @@ func TestFastFinisher(t *testing.T) { time.Sleep(100 * time.Millisecond) // Send an item - err = conn.Publish(context.Background(), q.ItemSubject, newItem()) + err = conn.Publish(context.Background(), q.Subject(), &QueryResponse{ResponseType: &QueryResponse_NewItem{NewItem: newItem()}}) if err != nil { t.Fatal(err) } // Send a complete message - err = conn.Publish(context.Background(), q.ResponseSubject, &Response{ + err = conn.Publish(context.Background(), q.Subject(), &QueryResponse{ResponseType: &QueryResponse_Response{Response: &Response{ Responder: "fast", State: ResponderState_COMPLETE, UUID: q.UUID, - }) + }}}) if err != nil { t.Fatal(err) } @@ -924,7 +956,7 @@ func TestFastFinisher(t *testing.T) { // Wait 250ms before starting time.Sleep(250 * time.Millisecond) - err = conn.Publish(context.Background(), q.ResponseSubject, &Response{ + err = conn.Publish(context.Background(), q.Subject(), &QueryResponse{ResponseType: &QueryResponse_Response{Response: &Response{ Responder: "slow", State: ResponderState_WORKING, UUID: q.UUID, @@ -932,7 +964,7 @@ func TestFastFinisher(t *testing.T) { Seconds: 1, Nanos: 0, }, - }) + }}}) if err != nil { t.Fatal(err) } @@ -940,17 +972,17 @@ func TestFastFinisher(t *testing.T) { // Send an item item := newItem() item.Attributes.Set("name", "baz") - err = conn.Publish(context.Background(), q.ItemSubject, newItem()) + err = conn.Publish(context.Background(), q.Subject(), &QueryResponse{ResponseType: &QueryResponse_NewItem{NewItem: item}}) if err != nil { t.Fatal(err) } // Send a complete message - err = conn.Publish(context.Background(), q.ResponseSubject, &Response{ + err = conn.Publish(context.Background(), q.Subject(), &QueryResponse{ResponseType: &QueryResponse_Response{Response: &Response{ Responder: "slow", State: ResponderState_COMPLETE, UUID: q.UUID, - }) + }}}) if err != nil { t.Fatal(err) } diff --git a/validation_test.go b/validation_test.go index e7e699c..d0e4aa0 100644 --- a/validation_test.go +++ b/validation_test.go @@ -444,13 +444,10 @@ func newQuery() *Query { RecursionBehaviour: &Query_RecursionBehaviour{ LinkDepth: 1, }, - Scope: "global", - UUID: u[:], - Timeout: durationpb.New(time.Second), - IgnoreCache: false, - ItemSubject: "return.items.1", - ResponseSubject: "return.responses.1", - ErrorSubject: "return.errors.1", + Scope: "global", + UUID: u[:], + Timeout: durationpb.New(time.Second), + IgnoreCache: false, } } @@ -528,9 +525,7 @@ func newItem() *Item { RecursionBehaviour: &Query_RecursionBehaviour{ LinkDepth: 12, }, - Scope: "testScope", - ItemSubject: "items", - ResponseSubject: "responses", + Scope: "testScope", }, Timestamp: timestamppb.Now(), SourceDuration: &durationpb.Duration{