Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add time window for trace get method in span store interface #6242

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
8 changes: 5 additions & 3 deletions cmd/anonymizer/app/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func (q *Query) QueryTrace(traceID string) ([]model.Span, error) {
if err != nil {
return nil, fmt.Errorf("failed to convert the provided trace id: %w", err)
}

stream, err := q.client.GetTrace(context.Background(), &api_v2.GetTraceRequest{
// TODO: add start time & end time
request := api_v2.GetTraceRequest{
TraceID: mTraceID,
})
}

stream, err := q.client.GetTrace(context.Background(), &request)
if err != nil {
return nil, unwrapNotFoundErr(err)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/anonymizer/app/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
)

var (
matchContext = mock.AnythingOfType("*context.valueCtx")
matchTraceID = mock.AnythingOfType("model.TraceID")
matchContext = mock.AnythingOfType("*context.valueCtx")
matchGetTraceParameters = mock.AnythingOfType("spanstore.GetTraceParameters")

mockInvalidTraceID = "xyz"
mockTraceID = model.NewTraceID(0, 123456)
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestQueryTrace(t *testing.T) {
defer q.Close()

t.Run("No error", func(t *testing.T) {
s.spanReader.On("GetTrace", matchContext, matchTraceID).Return(
s.spanReader.On("GetTrace", matchContext, matchGetTraceParameters).Return(
mockTraceGRPC, nil).Once()

spans, err := q.QueryTrace(mockTraceID.String())
Expand All @@ -120,7 +120,7 @@ func TestQueryTrace(t *testing.T) {
})

t.Run("Trace not found", func(t *testing.T) {
s.spanReader.On("GetTrace", matchContext, matchTraceID).Return(
s.spanReader.On("GetTrace", matchContext, matchGetTraceParameters).Return(
nil, spanstore.ErrTraceNotFound).Once()

spans, err := q.QueryTrace(mockTraceID.String())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
factoryMocks "github.com/jaegertracing/jaeger/storage/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type mockStorageExt struct {
Expand Down Expand Up @@ -152,7 +153,7 @@ func TestExporter(t *testing.T) {
spanReader, err := storageFactory.CreateSpanReader()
require.NoError(t, err)
requiredTraceID := model.NewTraceID(0, 1) // 00000000000000000000000000000001
requiredTrace, err := spanReader.GetTrace(ctx, requiredTraceID)
requiredTrace, err := spanReader.GetTrace(ctx, spanstore.GetTraceParameters{TraceID: requiredTraceID})
require.NoError(t, err)
assert.Equal(t, spanID.String(), requiredTrace.Spans[0].SpanID.String())

Expand Down
6 changes: 4 additions & 2 deletions cmd/jaeger/internal/integration/span_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ func unwrapNotFoundErr(err error) error {
return err
}

func (r *spanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
func (r *spanReader) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) {
stream, err := r.client.GetTrace(ctx, &api_v2.GetTraceRequest{
TraceID: traceID,
TraceID: query.TraceID,
StartTime: query.StartTime,
EndTime: query.EndTime,
})
if err != nil {
return nil, unwrapNotFoundErr(err)
Expand Down
15 changes: 8 additions & 7 deletions cmd/query/app/apiv3/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ func parseResponse(t *testing.T, body []byte, obj gogoproto.Message) {
require.NoError(t, gogojsonpb.Unmarshal(bytes.NewBuffer(body), obj))
}

func makeTestTrace() (*model.Trace, model.TraceID) {
func makeTestTrace() (*model.Trace, spanstore.GetTraceParameters) {
traceID := model.NewTraceID(150, 160)
query := spanstore.GetTraceParameters{TraceID: traceID}
return &model.Trace{
Spans: []*model.Span{
{
Expand All @@ -94,7 +95,7 @@ func makeTestTrace() (*model.Trace, model.TraceID) {
},
},
},
}, traceID
}, query
}

func runGatewayTests(
Expand Down Expand Up @@ -140,18 +141,18 @@ func (gw *testGateway) runGatewayGetOperations(t *testing.T) {
}

func (gw *testGateway) runGatewayGetTrace(t *testing.T) {
trace, traceID := makeTestTrace()
gw.reader.On("GetTrace", matchContext, traceID).Return(trace, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces/"+traceID.String(), traceID)
trace, query := makeTestTrace()
gw.reader.On("GetTrace", matchContext, query).Return(trace, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces/"+query.TraceID.String(), query.TraceID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could introduce a preliminary PR that adds timestamps args to the REST API calls

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here: #6248

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that PR doesn't seem to compile, can you get it to green state?

Copy link
Contributor Author

@rim99 rim99 Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a PR contains changes of current PR, and the PR above: #6258

}

func (gw *testGateway) runGatewayFindTraces(t *testing.T) {
trace, traceID := makeTestTrace()
trace, query := makeTestTrace()
q, qp := mockFindQueries()
gw.reader.
On("FindTraces", matchContext, qp).
Return([]*model.Trace{trace}, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), traceID)
gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), query.TraceID)
}

func (gw *testGateway) getTracesAndVerify(t *testing.T, url string, expectedTraceID model.TraceID) {
Expand Down
7 changes: 6 additions & 1 deletion cmd/query/app/apiv3/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS
return fmt.Errorf("malform trace ID: %w", err)
}

trace, err := h.QueryService.GetTrace(stream.Context(), traceID)
query := spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: request.GetStartTime(),
EndTime: request.GetEndTime(),
}
trace, err := h.QueryService.GetTrace(stream.Context(), query)
if err != nil {
return fmt.Errorf("cannot retrieve trace: %w", err)
}
Expand Down
78 changes: 54 additions & 24 deletions cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
)

var (
matchContext = mock.AnythingOfType("*context.valueCtx")
matchTraceID = mock.AnythingOfType("model.TraceID")
matchContext = mock.AnythingOfType("*context.valueCtx")
matchGetTraceParameters = mock.AnythingOfType("spanstore.GetTraceParameters")
)

func newGrpcServer(t *testing.T, handler *Handler) (*grpc.Server, net.Addr) {
Expand Down Expand Up @@ -78,33 +78,63 @@ func newTestServerClient(t *testing.T) *testServerClient {
}

func TestGetTrace(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, matchTraceID).Return(
&model.Trace{
Spans: []*model.Span{
{
OperationName: "foobar",
},
traceId, _ := model.TraceIDFromString("156")
testCases := []struct {
name string
expectedQuery spanstore.GetTraceParameters
request api_v3.GetTraceRequest
}{
{
"TestGetTrace",
spanstore.GetTraceParameters{
TraceID: traceId,
StartTime: time.Time{},
EndTime: time.Time{},
},
}, nil).Once()

getTraceStream, err := tsc.client.GetTrace(context.Background(),
&api_v3.GetTraceRequest{
TraceId: "156",
api_v3.GetTraceRequest{TraceId: "156"},
},
)
require.NoError(t, err)
recv, err := getTraceStream.Recv()
require.NoError(t, err)
td := recv.ToTraces()
require.EqualValues(t, 1, td.SpanCount())
assert.Equal(t, "foobar",
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name())
{
"TestGetTraceWithTimeWindow",
spanstore.GetTraceParameters{
TraceID: traceId,
StartTime: time.Unix(1, 2).UTC(),
EndTime: time.Unix(3, 4).UTC(),
},
api_v3.GetTraceRequest{
TraceId: "156",
StartTime: time.Unix(1, 2).UTC(),
EndTime: time.Unix(3, 4).UTC(),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, tc.expectedQuery).Return(
&model.Trace{
Spans: []*model.Span{
{
OperationName: "foobar",
},
},
}, nil).Once()

getTraceStream, err := tsc.client.GetTrace(context.Background(), &tc.request)
require.NoError(t, err)
recv, err := getTraceStream.Recv()
require.NoError(t, err)
td := recv.ToTraces()
require.EqualValues(t, 1, td.SpanCount())
assert.Equal(t, "foobar",
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name())
})
}
}

func TestGetTraceStorageError(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, matchTraceID).Return(
tsc.reader.On("GetTrace", matchContext, matchGetTraceParameters).Return(
nil, errors.New("storage_error")).Once()

getTraceStream, err := tsc.client.GetTrace(context.Background(), &api_v3.GetTraceRequest{
Expand All @@ -118,7 +148,7 @@ func TestGetTraceStorageError(t *testing.T) {

func TestGetTraceTraceIDError(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, matchTraceID).Return(
tsc.reader.On("GetTrace", matchContext, matchGetTraceParameters).Return(
&model.Trace{
Spans: []*model.Span{},
}, nil).Once()
Expand Down
10 changes: 8 additions & 2 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
)

const (
paramTraceID = "trace_id" // get trace by ID
paramTraceID = "trace_id" // get trace by ID
paramStartTime = "start_time"
paramEndTime = "end_time"
paramServiceName = "query.service_name" // find traces
paramOperationName = "query.operation_name"
paramTimeMin = "query.start_time_min"
Expand Down Expand Up @@ -136,7 +138,11 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
if h.tryParamError(w, err, paramTraceID) {
return
}
trc, err := h.QueryService.GetTrace(r.Context(), traceID)
// TODO: add start time & end time
request := spanstore.GetTraceParameters{
TraceID: traceID,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy timestamps from request

Copy link
Contributor Author

@rim99 rim99 Nov 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added TODO, since API changes will be in another PR: #6248

}
trc, err := h.QueryService.GetTrace(r.Context(), request)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestHTTPGatewayGetTraceErrors(t *testing.T) {
// error from span reader
const simErr = "simulated error"
gw.reader.
On("GetTrace", matchContext, matchTraceID).
On("GetTrace", matchContext, matchGetTraceParameters).
Return(nil, errors.New(simErr)).Once()

r, err = http.NewRequest(http.MethodGet, "/api/v3/traces/123", nil)
Expand Down
14 changes: 12 additions & 2 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ func (g *GRPCHandler) GetTrace(r *api_v2.GetTraceRequest, stream api_v2.QuerySer
if r.TraceID == (model.TraceID{}) {
return errUninitializedTraceID
}
trace, err := g.queryService.GetTrace(stream.Context(), r.TraceID)
query := spanstore.GetTraceParameters{
TraceID: r.TraceID,
StartTime: r.StartTime,
EndTime: r.EndTime,
}
trace, err := g.queryService.GetTrace(stream.Context(), query)
if errors.Is(err, spanstore.ErrTraceNotFound) {
g.logger.Warn(msgTraceNotFound, zap.Stringer("id", r.TraceID), zap.Error(err))
return status.Errorf(codes.NotFound, "%s: %v", msgTraceNotFound, err)
Expand All @@ -109,7 +114,12 @@ func (g *GRPCHandler) ArchiveTrace(ctx context.Context, r *api_v2.ArchiveTraceRe
if r.TraceID == (model.TraceID{}) {
return nil, errUninitializedTraceID
}
err := g.queryService.ArchiveTrace(ctx, r.TraceID)
query := spanstore.GetTraceParameters{
TraceID: r.TraceID,
StartTime: r.StartTime,
EndTime: r.EndTime,
}
err := g.queryService.ArchiveTrace(ctx, query)
if errors.Is(err, spanstore.ErrTraceNotFound) {
g.logger.Warn(msgTraceNotFound, zap.Stringer("id", r.TraceID), zap.Error(err))
return nil, status.Errorf(codes.NotFound, "%s: %v", msgTraceNotFound, err)
Expand Down
Loading
Loading