feat: redis-writer add buffered sending to significantly improve speed
EquentR authored Nov 25, 2024
1 parent e3bf5c2 commit 087036a
Showing 3 changed files with 75 additions and 7 deletions.
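
The change batches writes in the client's bufio.Writer and flushes only after every 100 buffered sends, or when a one-second idle timer fires, instead of flushing after each command. A minimal, self-contained sketch of the count-based half of that pattern (the actual client in the diff below also starts an autoFlush goroutine that flushes when the timer fires; bufferedSender and flushEvery are illustrative names, not part of RedisShake):

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// bufferedSender is an illustrative stand-in for the Redis client: writes
// accumulate in a bufio.Writer and are flushed once flushEvery sends have
// been buffered; the timer mirrors the idle-flush timer that the real
// client's autoFlush goroutine waits on.
type bufferedSender struct {
    mu        sync.Mutex
    w         *bufio.Writer
    timer     *time.Timer
    sendCount uint64
}

const flushEvery = 100

func (s *bufferedSender) Send(buf []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, err := s.w.Write(buf); err != nil {
        return err
    }
    if atomic.AddUint64(&s.sendCount, 1)%flushEvery != 0 {
        return nil // keep buffering until the batch is full
    }
    // Batch is full: re-arm the idle timer, reset the counter, flush now.
    if !s.timer.Stop() {
        select {
        case <-s.timer.C:
        default:
        }
    }
    s.timer.Reset(time.Second)
    atomic.StoreUint64(&s.sendCount, 0)
    return s.w.Flush()
}

func main() {
    var sink bytes.Buffer
    s := &bufferedSender{
        w:     bufio.NewWriter(&sink),
        timer: time.NewTimer(time.Second),
    }
    for i := 0; i < 250; i++ {
        _ = s.Send([]byte("SET k v\r\n"))
    }
    fmt.Println("bytes flushed so far:", sink.Len()) // two full batches; the rest stays buffered
}

Flushing in batches trades a bounded delay (at most 100 commands or one second of idle time) for far fewer writes on the target connection, which is where the speedup comes from.
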
58 changes: 58 additions & 0 deletions internal/client/redis.go
@@ -8,6 +8,8 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"RedisShake/internal/client/proto"
@@ -20,6 +22,9 @@ type Redis struct {
writer *bufio.Writer
protoReader *proto.Reader
protoWriter *proto.Writer
timer *time.Timer
sendCount uint64
mu sync.Mutex
}

func NewSentinelMasterClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis {
@@ -81,6 +86,9 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo
r = NewRedisClient(ctx, replicaInfo.BestReplica, username, password, Tls, false)
}

r.timer = time.NewTimer(time.Second)
go r.autoFlush(ctx)

return r
}

@@ -185,11 +193,54 @@ func (r *Redis) SendBytes(buf []byte) {
r.flush()
}

func (r *Redis) SendBytesBuff(buf []byte) {
r.mu.Lock()
defer r.mu.Unlock()
_, err := r.writer.Write(buf)
if err != nil {
log.Panicf(err.Error())
}
r.flushBuff()
}

func (r *Redis) flushBuff() {
if atomic.AddUint64(&r.sendCount, 1)%100 != 0 {
return
}
if !r.timer.Stop() {
select {
case <-r.timer.C:
default:
}
}
r.timer.Reset(time.Second)
r.flush()
}

func (r *Redis) flush() {
err := r.writer.Flush()
if err != nil {
log.Panicf(err.Error())
}
atomic.StoreUint64(&r.sendCount, 0)
}

func (r *Redis) autoFlush(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-r.timer.C:
if atomic.LoadUint64(&r.sendCount) > 0 {
r.mu.Lock()
err := r.writer.Flush()
r.mu.Unlock()
if err != nil {
log.Panicf(err.Error())
}
}
}
}
}

func (r *Redis) Receive() (interface{}, error) {
@@ -217,6 +268,13 @@ func (r *Redis) Close() {
if err := r.conn.Close(); err != nil {
log.Infof("close redis conn err: %s\n", err.Error())
}
// release the timer
if !r.timer.Stop() {
select {
case <-r.timer.C:
default:
}
}
}

/* Commands */
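
Both flushBuff and Close above use Go's stop-drain-reset idiom so that a stale tick from a timer that has already fired is never left sitting in the channel. In isolation the idiom looks like this (rearm is a hypothetical helper name; the commit inlines the same three steps):

package main

import (
    "fmt"
    "time"
)

// rearm safely re-arms a timer that may already have fired: stop it, drain
// any pending tick without blocking, then reset the duration.
func rearm(t *time.Timer, d time.Duration) {
    if !t.Stop() {
        select {
        case <-t.C: // discard the stale tick, if any
        default:
        }
    }
    t.Reset(d)
}

func main() {
    t := time.NewTimer(10 * time.Millisecond)
    time.Sleep(20 * time.Millisecond) // let the timer fire unobserved
    rearm(t, 50*time.Millisecond)     // safe even though t already fired
    fmt.Println("re-armed tick:", <-t.C)
}

Without the drain, a Reset right after the timer expired could leave an old value in timer.C and cause autoFlush to flush immediately and spuriously.
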
11 changes: 10 additions & 1 deletion internal/writer/redis_standalone_writer.go
@@ -26,6 +26,7 @@ type RedisWriterOptions struct {
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
OffReply bool `mapstructure:"off_reply" default:"false"`
BuffSend bool `mapstructure:"buff_send" default:"false"`
}

type redisStandaloneWriter struct {
@@ -39,6 +40,8 @@ type redisStandaloneWriter struct {
ch chan *entry.Entry
chWg sync.WaitGroup

buffSend bool

stat struct {
Name string `json:"name"`
UnansweredBytes int64 `json:"unanswered_bytes"`
@@ -52,6 +55,7 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri
rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false)
rw.ch = make(chan *entry.Entry, 1024)
rw.buffSend = opts.BuffSend
if opts.OffReply {
log.Infof("turn off the reply of write")
rw.offReply = true
@@ -93,7 +97,12 @@ func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entr
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
}
w.client.SendBytes(bytes)
if w.buffSend {
w.client.SendBytesBuff(bytes)
} else {
w.client.SendBytes(bytes)
}

}
w.chWg.Done()
}()
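
For callers constructing the writer directly, the new option is just another field on RedisWriterOptions; a hedged sketch of that wiring (in RedisShake the [redis_writer] section of shake.toml is normally decoded into this struct via mapstructure, and the address below is a placeholder):

package main

import (
    "context"

    "RedisShake/internal/writer"
)

func main() {
    ctx := context.Background()

    // Hypothetical direct wiring; normally these fields come from shake.toml.
    opts := &writer.RedisWriterOptions{
        Address:  "127.0.0.1:6379", // placeholder; must point at a reachable Redis
        BuffSend: true,             // route writes through SendBytesBuff
    }
    _ = writer.NewRedisStandaloneWriter(ctx, opts)
}

With buff_send enabled the writer calls SendBytesBuff for every entry, so commands may sit in the client buffer for up to a second (or until 100 commands accumulate) before reaching the target.
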
13 changes: 7 additions & 6 deletions shake.toml
@@ -36,6 +36,7 @@ username = "" # keep empty if not using ACL
password = "" # keep empty if no authentication is required
tls = false
off_reply = false # turn off the server reply
buff_send = false # buffered sending, default false. There may be a slight sync delay when enabled, but it can greatly improve write speed.

[filter]
# Allow keys with specific prefixes or suffixes
@@ -70,8 +71,8 @@ block_db = []
# allow_command = ["GET", "SET"] # Only allow GET and SET commands
# block_command = ["DEL", "FLUSHDB"] # Block DEL and FLUSHDB commands
# Leave empty to allow all commands
allow_command = []
block_command = []
allow_command = []
block_command = []

# Allow or block specific command groups
# Available groups:
@@ -82,8 +83,8 @@
# allow_command_group = ["STRING", "HASH"] # Only allow STRING and HASH commands
# block_command_group = ["SCRIPTING", "PUBSUB"] # Block SCRIPTING and PUBSUB commands
# Leave empty to allow all command groups
allow_command_group = []
block_command_group = []
allow_command_group = []
block_command_group = []

# Function for custom data processing
# For best practices and examples, visit:
@@ -118,7 +119,7 @@ rdb_restore_command_behavior = "panic" # panic, rewrite or skip
pipeline_count_limit = 1024

# This setting corresponds to the 'client-query-buffer-limit' in Redis configuration.
# The default value is typically 1GB.
# The default value is typically 1GB.
# It's recommended not to modify this value unless absolutely necessary.
target_redis_client_max_querybuf_len = 1073741824 # 1GB in bytes

@@ -128,7 +129,7 @@ target_redis_client_max_querybuf_len = 1073741824 # 1GB in bytes
# It's recommended not to modify this value unless absolutely necessary.
target_redis_proto_max_bulk_len = 512_000_000

# If the source is Elasticache, you can set this item. AWS ElastiCache has custom
# If the source is Elasticache, you can set this item. AWS ElastiCache has custom
# psync command, which can be obtained through a ticket.
aws_psync = "" # example: aws_psync = "10.0.0.1:6379@nmfu2sl5osync,10.0.0.1:6379@xhma21xfkssync"
