Skip to content

Commit

Permalink
drop generic marshaller
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed Apr 15, 2024
1 parent 0ebca82 commit 92a7e3d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 52 deletions.
29 changes: 18 additions & 11 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -31,13 +32,13 @@ type ConsumerConfig struct {
var DefaultConsumerConfig = &ConsumerConfig{
ResponseEntryTimeout: time.Hour,
KeepAliveTimeout: 5 * time.Minute,
RedisStream: "default",
RedisGroup: defaultGroup,
RedisStream: "",
RedisGroup: "",
}

var DefaultTestConsumerConfig = &ConsumerConfig{
RedisStream: "default",
RedisGroup: defaultGroup,
RedisStream: "test_stream",
RedisGroup: "test_group",
ResponseEntryTimeout: time.Minute,
KeepAliveTimeout: 30 * time.Millisecond,
}
Expand All @@ -57,8 +58,6 @@ type Consumer[Request any, Response any] struct {
id string
client redis.UniversalClient
cfg *ConsumerConfig
mReq jsonMarshaller[Request]
mResp jsonMarshaller[Response]
}

type Message[Request any] struct {
Expand All @@ -70,6 +69,12 @@ func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerCo
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
if cfg.RedisStream == "" {
return nil, fmt.Errorf("redis stream name cannot be empty")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group name cannot be emtpy")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
Expand All @@ -78,8 +83,6 @@ func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerCo
id: uuid.NewString(),
client: c,
cfg: cfg,
mReq: jsonMarshaller[Request]{},
mResp: jsonMarshaller[Response]{},
}
return consumer, nil
}
Expand Down Expand Up @@ -147,8 +150,8 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
if !ok {
return nil, fmt.Errorf("casting request to string: %w", err)
}
req, err := c.mReq.Unmarshal([]byte(data))
if err != nil {
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("unmarshaling value: %v, error: %w", value, err)
}

Expand All @@ -159,7 +162,11 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
}

func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID string, result Response) error {
acquired, err := c.client.SetNX(ctx, messageID, c.mResp.Marshal(result), c.cfg.ResponseEntryTimeout).Result()
resp, err := json.Marshal(result)
if err != nil {
return fmt.Errorf("marshaling result: %w", err)
}
acquired, err := c.client.SetNX(ctx, messageID, resp, c.cfg.ResponseEntryTimeout).Result()
if err != nil || !acquired {
return fmt.Errorf("setting result for message: %v, error: %w", messageID, err)
}
Expand Down
64 changes: 23 additions & 41 deletions pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,11 @@ const (
defaultGroup = "default_consumer_group"
)

// Generic marshaller for Request and Response generic types.
// Note: unexported fields will be silently ignored.
type jsonMarshaller[T any] struct{}

// Marshal marshals generic type object with json marshal.
func (m jsonMarshaller[T]) Marshal(v T) []byte {
data, err := json.Marshal(v)
if err != nil {
log.Error("error marshaling", "value", v, "error", err)
return nil
}
return data
}

// Unmarshal converts a JSON byte slice back to the generic type object.
func (j jsonMarshaller[T]) Unmarshal(val []byte) (T, error) {
var v T
if err := json.Unmarshal(val, &v); err != nil {
return v, err
}
return v, nil
}

type Producer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ProducerConfig
mReq jsonMarshaller[Request]
mResp jsonMarshaller[Response]

promisesLock sync.RWMutex
promises map[string]*containers.Promise[Response]
Expand Down Expand Up @@ -92,17 +67,17 @@ type ProducerConfig struct {

var DefaultProducerConfig = &ProducerConfig{
EnableReproduce: true,
RedisStream: "default",
RedisStream: "",
RedisGroup: "",
CheckPendingInterval: time.Second,
KeepAliveTimeout: 5 * time.Minute,
CheckResultInterval: 5 * time.Second,
RedisGroup: defaultGroup,
}

var DefaultTestProducerConfig = &ProducerConfig{
EnableReproduce: true,
RedisStream: "default",
RedisGroup: defaultGroup,
RedisStream: "",
RedisGroup: "",
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
Expand All @@ -121,6 +96,12 @@ func NewProducer[Request any, Response any](cfg *ProducerConfig) (*Producer[Requ
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
if cfg.RedisStream == "" {
return nil, fmt.Errorf("redis stream cannot be emtpy")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group cannot be empty")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
Expand All @@ -129,8 +110,6 @@ func NewProducer[Request any, Response any](cfg *ProducerConfig) (*Producer[Requ
id: uuid.NewString(),
client: c,
cfg: cfg,
mReq: jsonMarshaller[Request]{},
mResp: jsonMarshaller[Response]{},
promises: make(map[string]*containers.Promise[Response]),
}, nil
}
Expand Down Expand Up @@ -191,12 +170,12 @@ func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.D
}
log.Error("Error reading value in redis", "key", id, "error", err)
}
val, err := p.mResp.Unmarshal([]byte(res))
if err != nil {
var resp Response
if err := json.Unmarshal([]byte(res), &resp); err != nil {
log.Error("Error unmarshaling", "value", res, "error", err)
continue
}
promise.Produce(val)
promise.Produce(resp)
delete(p.promises, id)
}
return p.cfg.CheckResultInterval
Expand All @@ -216,9 +195,13 @@ func (p *Producer[Request, Response]) promisesLen() int {
// message that was sent to inactive consumer and reinserts it into the stream,
// so that seamlessly return the answer in the same promise.
func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Request, oldKey string) (*containers.Promise[Response], error) {
val, err := json.Marshal(value)
if err != nil {
return nil, fmt.Errorf("marshaling value: %w", err)
}
id, err := p.client.XAdd(ctx, &redis.XAddArgs{
Stream: p.cfg.RedisStream,
Values: map[string]any{messageKey: p.mReq.Marshal(value)},
Values: map[string]any{messageKey: val},
}).Result()
if err != nil {
return nil, fmt.Errorf("adding values to redis: %w", err)
Expand Down Expand Up @@ -250,11 +233,10 @@ func (p *Producer[Request, Response]) Produce(ctx context.Context, value Request

// Check if a consumer is with specified ID is alive.
func (p *Producer[Request, Response]) isConsumerAlive(ctx context.Context, consumerID string) bool {
val, err := p.client.Get(ctx, heartBeatKey(consumerID)).Int64()
if err != nil {
if _, err := p.client.Get(ctx, heartBeatKey(consumerID)).Int64(); err != nil {
return false
}
return time.Now().UnixMilli()-val < int64(p.cfg.KeepAliveTimeout.Milliseconds())
return true
}

func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {
Expand Down Expand Up @@ -318,13 +300,13 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
if !ok {
return nil, fmt.Errorf("casting request: %v to bytes", msg.Values[messageKey])
}
val, err := p.mReq.Unmarshal([]byte(data))
if err != nil {
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("marshaling value: %v, error: %w", msg.Values[messageKey], err)
}
res = append(res, &Message[Request]{
ID: msg.ID,
Value: val,
Value: req,
})
}
return res, nil
Expand Down

0 comments on commit 92a7e3d

Please sign in to comment.