diff --git a/stream/operator.go b/stream/operator.go index 4beb4f5a7..4a53528a7 100644 --- a/stream/operator.go +++ b/stream/operator.go @@ -620,3 +620,57 @@ func (op *TableDeleteOperator) Op() (OperatorFunc, error) { func (op *TableDeleteOperator) String() string { return fmt.Sprintf("tableDelete('%s')", op.Name) } + +// A DistinctOperator filters duplicate documents. +type DistinctOperator struct{} + +// Distinct filters duplicate documents based on one or more expressions. +func Distinct(exprs ...expr.Expr) *DistinctOperator { + return &DistinctOperator{} +} + +// Op implements the Operator interface. +func (op *DistinctOperator) Op() (OperatorFunc, error) { + var buf bytes.Buffer + enc := document.NewValueEncoder(&buf) + m := make(map[string]struct{}) + + return func(env *expr.Environment) (*expr.Environment, error) { + buf.Reset() + + d, ok := env.GetDocument() + if !ok { + return nil, errors.New("missing document") + } + + fields, err := document.Fields(d) + if err != nil { + return nil, err + } + + for _, field := range fields { + value, err := d.GetByField(field) + if err != nil { + return nil, err + } + + err = enc.Encode(value) + if err != nil { + return nil, err + } + } + + _, ok = m[string(buf.Bytes())] + // if value already exists, filter it out + if ok { + return nil, nil + } + + m[buf.String()] = struct{}{} + return env, nil + }, nil +} + +func (op *DistinctOperator) String() string { + return "distinct()" +} diff --git a/stream/operator_test.go b/stream/operator_test.go index c7a771d8a..27fd0f6e4 100644 --- a/stream/operator_test.go +++ b/stream/operator_test.go @@ -637,3 +637,53 @@ func TestTableDelete(t *testing.T) { require.Equal(t, stream.TableDelete("test").String(), "tableDelete('test')") }) } + +func TestDistinct(t *testing.T) { + tests := []struct { + name string + values testutil.Docs + want testutil.Docs + fails bool + }{ + { + "all different", + testutil.MakeDocuments(`{"a": 0}`, `{"a": null}`, `{"a": true}`), + testutil.MakeDocuments(`{"a": 0}`, `{"a": null}`, `{"a": true}`), + false, + }, + { + "some duplicates", + testutil.MakeDocuments(`{"a": 0}`, `{"a": 0}`, `{"a": null}`, `{"a": null}`, `{"a": true}`, `{"a": true}`, `{"a": [1, 2]}`, `{"a": [1, 2]}`), + testutil.MakeDocuments(`{"a": 0}`, `{"a": null}`, `{"a": true}`, `{"a": [1, 2]}`), + false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := stream.New(stream.NewDocumentIterator(test.values...)) + s = s.Pipe(stream.Distinct()) + + var got []document.Document + err := s.Iterate(func(env *expr.Environment) error { + d, ok := env.GetDocument() + require.True(t, ok) + var fb document.FieldBuffer + err := fb.Copy(d) + require.NoError(t, err) + got = append(got, &fb) + return nil + }) + if test.fails { + require.Error(t, err) + } else { + require.NoError(t, err) + test.want.RequireEqual(t, got) + } + }) + } + + t.Run("String", func(t *testing.T) { + require.Equal(t, `distinct()`, stream.Distinct().String()) + }) +}