Skip to content

Commit

Permalink
feat: add tests for the xread command
Browse files Browse the repository at this point in the history
  • Loading branch information
mhughdo committed Aug 27, 2024
1 parent b0d5b8d commit 84627bc
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 0 deletions.
250 changes: 250 additions & 0 deletions internal/app/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,253 @@ func TestGetAndSetCommand(t *testing.T) {
})
}
}

// TestXReadCommand exercises XREAD against a live server instance through a
// real redis client: plain reads, COUNT limits, BLOCK timeouts, blocked reads
// that are woken by a concurrent XADD, and reads spanning several streams.
//
// All cases share one server and run sequentially in table order. Most cases
// use their own stream key; the first two deliberately share "mystream" (read
// while it does not exist yet, then read after an entry has been added), so
// their relative order matters.
func TestXReadCommand(t *testing.T) {
	ctx := context.Background()
	srv, port, err := startServer(ctx)
	require.NoError(t, err)
	defer srv.Close(ctx)

	rdb := redis.NewClient(&redis.Options{
		Addr:     fmt.Sprintf("localhost:%d", port),
		Password: "",
		DB:       0,
	})
	// Release the client's connections when the test ends. Deferred after
	// srv.Close, so it runs first. Best-effort: a close error is not a test
	// failure.
	defer func() { _ = rdb.Close() }()

	// writerDelay gives the reader time to issue its blocking XREAD before
	// the background writer adds anything.
	const writerDelay = 500 * time.Millisecond

	// addAsync runs add on its own goroutine after delay and returns a
	// channel that is closed once the goroutine has finished. Callers must
	// receive from the channel before returning so that no goroutine reports
	// through t after the case is over.
	addAsync := func(delay time.Duration, add func() error) <-chan struct{} {
		done := make(chan struct{})
		go func() {
			defer close(done)
			time.Sleep(delay)
			if err := add(); err != nil {
				t.Errorf("Failed to add entry: %v", err)
			}
		}()
		return done
	}

	tests := []struct {
		name     string
		setup    func() error
		action   func() (interface{}, error)
		expected interface{}
		wantErr  bool
	}{
		{
			name: "XRead from empty stream",
			setup: func() error {
				return nil
			},
			action: func() (interface{}, error) {
				result, err := rdb.XRead(ctx, &redis.XReadArgs{
					Streams: []string{"mystream", "0-0"},
				}).Result()
				// redis.Nil means "no data", which is the expected outcome.
				if err == redis.Nil {
					return nil, nil
				}
				return result, err
			},
			expected: nil,
		},
		{
			name: "XRead with existing entries",
			setup: func() error {
				_, err := rdb.XAdd(ctx, &redis.XAddArgs{
					Stream: "mystream",
					ID:     "1-1",
					Values: map[string]interface{}{"field1": "value1"},
				}).Result()
				return err
			},
			action: func() (interface{}, error) {
				return rdb.XRead(ctx, &redis.XReadArgs{
					Streams: []string{"mystream", "0-0"},
				}).Result()
			},
			expected: []redis.XStream{{Stream: "mystream", Messages: []redis.XMessage{{ID: "1-1", Values: map[string]interface{}{"field1": "value1"}}}}},
		},
		{
			name: "XRead with COUNT",
			setup: func() error {
				for i := 0; i < 5; i++ {
					_, err := rdb.XAdd(ctx, &redis.XAddArgs{
						Stream: "mystream2",
						ID:     "*",
						Values: map[string]interface{}{fmt.Sprintf("field%d", i): fmt.Sprintf("value%d", i)},
					}).Result()
					if err != nil {
						return err
					}
				}
				return nil
			},
			action: func() (interface{}, error) {
				streams, err := rdb.XRead(ctx, &redis.XReadArgs{
					Streams: []string{"mystream2", "0-0"},
					Count:   3,
				}).Result()
				if err != nil {
					return nil, err
				}
				return len(streams[0].Messages), nil
			},
			expected: 3,
		},
		{
			name: "XRead with BLOCK (timeout)",
			setup: func() error {
				return nil
			},
			action: func() (interface{}, error) {
				start := time.Now()
				_, err := rdb.XRead(ctx, &redis.XReadArgs{
					Streams: []string{"mystream3", "$"},
					Block:   time.Second,
				}).Result()
				duration := time.Since(start)
				// A timed-out blocking read surfaces as redis.Nil; the case
				// passes only if the full block interval elapsed.
				if err == redis.Nil {
					return duration >= time.Second, nil
				}
				return nil, err
			},
			expected: true,
		},
		{
			name: "XRead with BLOCK (new entry added)",
			setup: func() error {
				return nil
			},
			action: func() (interface{}, error) {
				done := addAsync(writerDelay, func() error {
					_, err := rdb.XAdd(ctx, &redis.XAddArgs{
						Stream: "mystream6",
						ID:     "1-1",
						Values: map[string]interface{}{"field1": "value1"},
					}).Result()
					return err
				})

				result, err := rdb.XRead(ctx, &redis.XReadArgs{
					Streams: []string{"mystream6", "$"},
					Block:   2 * time.Second,
				}).Result()
				<-done
				return result, err
			},
			expected: []redis.XStream{{Stream: "mystream6", Messages: []redis.XMessage{{ID: "1-1", Values: map[string]interface{}{"field1": "value1"}}}}},
		},
		{
			name: "XRead with BLOCK (multiple streams)",
			setup: func() error {
				return nil
			},
			action: func() (interface{}, error) {
				done := addAsync(writerDelay, func() error {
					_, err := rdb.XAdd(ctx, &redis.XAddArgs{
						Stream: "mystream7",
						ID:     "1-1",
						Values: map[string]interface{}{"field1": "value1"},
					}).Result()
					return err
				})

				result, err := rdb.XRead(ctx, &redis.XReadArgs{
					Streams: []string{"mystream7", "mystream8", "$", "$"},
					Block:   2 * time.Second,
				}).Result()
				<-done
				return result, err
			},
			expected: []redis.XStream{{Stream: "mystream7", Messages: []redis.XMessage{{ID: "1-1", Values: map[string]interface{}{"field1": "value1"}}}}},
		},
		{
			name: "XRead with BLOCK and COUNT",
			setup: func() error {
				return nil
			},
			action: func() (interface{}, error) {
				// The writer is delayed so the reader is already blocked when
				// the first entry arrives. The expectation of exactly one
				// message relies on the blocked read being woken by that
				// first XADD rather than finding entries already present.
				done := addAsync(writerDelay, func() error {
					for i := 0; i < 3; i++ {
						_, err := rdb.XAdd(ctx, &redis.XAddArgs{
							Stream: "mystream9",
							ID:     "*",
							Values: map[string]interface{}{fmt.Sprintf("field%d", i): fmt.Sprintf("value%d", i)},
						}).Result()
						if err != nil {
							return err
						}
					}
					return nil
				})

				streams, err := rdb.XRead(ctx, &redis.XReadArgs{
					Streams: []string{"mystream9", "0-0"},
					Block:   2 * time.Second,
					Count:   2,
				}).Result()
				<-done
				if err != nil {
					return nil, err
				}
				return len(streams[0].Messages), nil
			},
			expected: 1,
		},
		{
			name: "XRead with BLOCK (immediate return for non-empty stream)",
			setup: func() error {
				_, err := rdb.XAdd(ctx, &redis.XAddArgs{
					Stream: "mystream10",
					ID:     "*",
					Values: map[string]interface{}{"field1": "value1"},
				}).Result()
				return err
			},
			action: func() (interface{}, error) {
				start := time.Now()
				_, err := rdb.XRead(ctx, &redis.XReadArgs{
					Streams: []string{"mystream10", "+"},
					Block:   5 * time.Second,
				}).Result()
				duration := time.Since(start)
				if err != nil {
					return nil, err
				}
				// Data is already available, so the call must return well
				// before the 5s block interval.
				return duration < time.Second, nil
			},
			expected: true,
		},
		{
			name: "XRead with multiple streams",
			setup: func() error {
				_, err := rdb.XAdd(ctx, &redis.XAddArgs{
					Stream: "mystream4",
					ID:     "*",
					Values: map[string]interface{}{"field1": "value1"},
				}).Result()
				if err != nil {
					return err
				}
				_, err = rdb.XAdd(ctx, &redis.XAddArgs{
					Stream: "mystream5",
					ID:     "*",
					Values: map[string]interface{}{"field2": "value2"},
				}).Result()
				return err
			},
			action: func() (interface{}, error) {
				streams, err := rdb.XRead(ctx, &redis.XReadArgs{
					Streams: []string{"mystream4", "mystream5", "0-0", "0-0"},
				}).Result()
				if err != nil {
					return nil, err
				}
				return len(streams), nil
			},
			expected: 2, // Number of streams
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if tt.setup != nil {
				require.NoError(t, tt.setup())
			}
			result, err := tt.action()
			if tt.wantErr {
				assert.Error(t, err)
				return
			}
			require.NoError(t, err)
			assert.Equal(t, tt.expected, result)
		})
	}
}
36 changes: 36 additions & 0 deletions pkg/keyval/radix_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,42 @@ func TestIsLargestID(t *testing.T) {
}
}

// TestValidateID checks RadixTree.ValidateID against a table of stream IDs.
// For each input it compares both return values: the validity flag and the
// returned ID string (per the table, an ID without a sequence part comes back
// with "-0" appended, the special IDs "+" and "-" come back unchanged, and
// invalid input yields an empty string).
func TestValidateID(t *testing.T) {
	tree := NewRadixTree()

	tests := []struct {
		name      string
		id        string
		wantValid bool
		wantID    string
	}{
		{"Valid complete ID", "1234567890-0", true, "1234567890-0"},
		{"Valid incomplete ID", "1234567890", true, "1234567890-0"},
		{"Valid special ID +", "+", true, "+"},
		// A lone hyphen is the special "-" ID and therefore valid; it is
		// listed once here rather than again under an "invalid" label.
		{"Valid special ID -", "-", true, "-"},
		{"Invalid ID with multiple hyphens", "1234-567-890", false, ""},
		{"Invalid ID with negative timestamp", "-1234567890-0", false, ""},
		{"Invalid ID with negative sequence", "1234567890--1", false, ""},
		{"Invalid ID with non-numeric timestamp", "abcdefghij-0", false, ""},
		{"Invalid ID with non-numeric sequence", "1234567890-a", false, ""},
		{"Empty string", "", false, ""},
		{"Invalid ID with space", "1234567890 0", false, ""},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			gotValid, gotID := tree.ValidateID(tt.id)
			if gotValid != tt.wantValid {
				t.Errorf("ValidateID(%q) gotValid = %v, want %v", tt.id, gotValid, tt.wantValid)
			}
			if gotID != tt.wantID {
				t.Errorf("ValidateID(%q) gotID = %q, want %q", tt.id, gotID, tt.wantID)
			}
		})
	}
}

func incrementSequence(seq string) string {
i, _ := strconv.ParseInt(seq, 10, 64)
return strconv.FormatInt(i+1, 10)
Expand Down

0 comments on commit 84627bc

Please sign in to comment.