Skip to content

Commit

Permalink
Add distinct operator
Browse files Browse the repository at this point in the history
  • Loading branch information
asdine committed Jan 5, 2021
1 parent bf22c86 commit b013422
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 0 deletions.
54 changes: 54 additions & 0 deletions stream/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,3 +620,57 @@ func (op *TableDeleteOperator) Op() (OperatorFunc, error) {
// String returns a textual representation of the operator in the form
// tableDelete('<name>'), where <name> is the value of op.Name.
func (op *TableDeleteOperator) String() string {
return fmt.Sprintf("tableDelete('%s')", op.Name)
}

// A DistinctOperator filters duplicate documents.
// It carries no configuration; the set of documents already seen is created
// by each call to Op.
type DistinctOperator struct{}

// Distinct creates a DistinctOperator, which filters duplicate documents.
//
// NOTE(review): exprs is accepted but currently ignored; the returned operator
// is built without it and does not filter on specific expressions. Confirm
// whether expression-based filtering is still planned.
func Distinct(exprs ...expr.Expr) *DistinctOperator {
return &DistinctOperator{}
}

// Op implements the Operator interface.
//
// The returned OperatorFunc remembers every document it has let through and
// returns a nil environment (and a nil error) for a document it has already
// seen, which filters that document out. It returns an error when the
// environment holds no document, or when a field cannot be read or encoded.
//
// A document is identified by its field names together with its encoded
// values, so documents holding equal values under different field names
// (for example {"a": 1} and {"b": 1}) are not treated as duplicates.
func (op *DistinctOperator) Op() (OperatorFunc, error) {
	// buf, enc and seen are shared by every call of the returned function;
	// buf is reset on each call so its backing storage is reused.
	var buf bytes.Buffer
	enc := document.NewValueEncoder(&buf)
	seen := make(map[string]struct{})

	return func(env *expr.Environment) (*expr.Environment, error) {
		buf.Reset()

		d, ok := env.GetDocument()
		if !ok {
			return nil, errors.New("missing document")
		}

		fields, err := document.Fields(d)
		if err != nil {
			return nil, err
		}

		for _, field := range fields {
			value, err := d.GetByField(field)
			if err != nil {
				return nil, err
			}

			// Make the field name part of the key: encoding values alone
			// would give {"a": 1} and {"b": 1} the same key. The NUL byte
			// separates the name from the encoded value that follows.
			// Writes to a bytes.Buffer never return a non-nil error.
			buf.WriteString(field)
			buf.WriteByte(0)

			if err := enc.Encode(value); err != nil {
				return nil, err
			}
		}

		// Looking up with string(buf.Bytes()) lets the compiler skip the
		// string allocation; a copy is only made when a new key is stored.
		if _, dup := seen[string(buf.Bytes())]; dup {
			// The document was already seen: filter it out.
			return nil, nil
		}

		seen[buf.String()] = struct{}{}
		return env, nil
	}, nil
}

// String returns the textual representation of the operator, which is always
// "distinct()".
func (op *DistinctOperator) String() string {
return "distinct()"
}
50 changes: 50 additions & 0 deletions stream/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,3 +637,53 @@ func TestTableDelete(t *testing.T) {
require.Equal(t, stream.TableDelete("test").String(), "tableDelete('test')")
})
}

// TestDistinct pipes fixed sets of documents through stream.Distinct and
// checks that only the first occurrence of each document comes out, then
// checks the operator's string representation.
func TestDistinct(t *testing.T) {
	cases := []struct {
		name   string
		values testutil.Docs
		want   testutil.Docs
		fails  bool
	}{
		{
			name:   "all different",
			values: testutil.MakeDocuments(`{"a": 0}`, `{"a": null}`, `{"a": true}`),
			want:   testutil.MakeDocuments(`{"a": 0}`, `{"a": null}`, `{"a": true}`),
			fails:  false,
		},
		{
			name:   "some duplicates",
			values: testutil.MakeDocuments(`{"a": 0}`, `{"a": 0}`, `{"a": null}`, `{"a": null}`, `{"a": true}`, `{"a": true}`, `{"a": [1, 2]}`, `{"a": [1, 2]}`),
			want:   testutil.MakeDocuments(`{"a": 0}`, `{"a": null}`, `{"a": true}`, `{"a": [1, 2]}`),
			fails:  false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			st := stream.New(stream.NewDocumentIterator(tc.values...))
			st = st.Pipe(stream.Distinct())

			// Copy every emitted document so it can be compared after the
			// iteration is over.
			var got []document.Document
			iterErr := st.Iterate(func(env *expr.Environment) error {
				doc, found := env.GetDocument()
				require.True(t, found)

				fb := new(document.FieldBuffer)
				require.NoError(t, fb.Copy(doc))

				got = append(got, fb)
				return nil
			})

			if tc.fails {
				require.Error(t, iterErr)
				return
			}

			require.NoError(t, iterErr)
			tc.want.RequireEqual(t, got)
		})
	}

	t.Run("String", func(t *testing.T) {
		require.Equal(t, `distinct()`, stream.Distinct().String())
	})
}

0 comments on commit b013422

Please sign in to comment.