From 820b9b0c3a846f6d5726c02b21f470f9780afebe Mon Sep 17 00:00:00 2001 From: qloog Date: Wed, 4 Sep 2024 12:02:35 +0800 Subject: [PATCH] feat: add re-entry and automatic renewal for redis lock --- CHANGELOG.md | 1 + go.mod | 4 +- go.sum | 9 +--- pkg/lock/luascript.go | 22 ++++++++ pkg/lock/redis.go | 119 ++++++++++++++++++++++++++++++++++------- pkg/lock/redis_test.go | 85 +++++++++++++++++++++++++++++ 6 files changed, 212 insertions(+), 28 deletions(-) create mode 100644 pkg/lock/luascript.go create mode 100644 pkg/lock/redis_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e56bc39b..8722897f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## v1.10.0 - feat: support async flush log to disk +- feat: add re-entry and automatic renewal for redis lock - chore: using gorm offical plguin for tracing and metrics ## v1.9.0 diff --git a/go.mod b/go.mod index fb142853a..2c4fc55cd 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/go-eagle/eagle go 1.22.3 require ( - github.com/1024casts/gorm-opentelemetry v1.0.1-0.20210805144709-183269b54068 github.com/Shopify/sarama v1.19.0 github.com/alicebob/miniredis/v2 v2.15.1 github.com/cenkalti/backoff/v4 v4.2.1 @@ -74,6 +73,7 @@ require ( gorm.io/driver/mysql v1.5.2 gorm.io/driver/postgres v1.5.4 gorm.io/gorm v1.25.10 + gorm.io/plugin/opentelemetry v0.1.4 ) require ( @@ -170,6 +170,7 @@ require ( github.com/shopspring/decimal v1.4.0 // indirect github.com/spf13/afero v1.9.2 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.2.0 // indirect github.com/swaggo/swag v1.7.0 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect @@ -203,5 +204,4 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gorm.io/plugin/opentelemetry v0.1.4 // indirect ) diff --git a/go.sum b/go.sum index 72ee4d886..47b28b64a 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,6 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/1024casts/gorm-opentelemetry v1.0.1-0.20210805144709-183269b54068 h1:XWMHFSEROBGd33gZNMsTb6zxECMN8xOJkO0ucOJdz58= -github.com/1024casts/gorm-opentelemetry v1.0.1-0.20210805144709-183269b54068/go.mod h1:nEAgMK5Iab8nqYy8zn1tYjEjkeDA2PmszXOwZhlDrMs= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak= github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= @@ -421,7 +419,6 @@ github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= -github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -503,7 +500,6 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI= github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -1255,11 +1251,8 @@ gorm.io/driver/mysql v1.5.2 h1:QC2HRskSE75wBuOxe0+iCkyJZ+RqpudsQtqkp+IMuXs= gorm.io/driver/mysql v1.5.2/go.mod h1:pQLhh1Ut/WUAySdTHwBpBv6+JKcj+ua4ZFx1QQTBzb8= gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo= gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0= -gorm.io/driver/sqlite v1.1.4 h1:PDzwYE+sI6De2+mxAneV9Xs11+ZyKV6oxD3wDGkaNvM= -gorm.io/driver/sqlite v1.1.4/go.mod h1:mJCeTFr7+crvS+TRnWc5Z3UvwxUN1BGBLMrf5LA9DYw= gorm.io/driver/sqlite v1.5.0 h1:zKYbzRCpBrT1bNijRnxLDJWPjVfImGEn0lSnUY5gZ+c= -gorm.io/gorm v1.20.7/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw= -gorm.io/gorm v1.20.12/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw= +gorm.io/driver/sqlite v1.5.0/go.mod h1:kDMDfntV9u/vuMmz8APHtHF0b4nyBB7sfCieC6G8k8I= gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s= gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= diff --git a/pkg/lock/luascript.go b/pkg/lock/luascript.go new file mode 100644 index 000000000..1680f998e --- /dev/null +++ b/pkg/lock/luascript.go @@ -0,0 +1,22 @@ +package lock + +var ( + // lockscript lua script for acrequire a lock + lockLuaScript = ` +if redis.call("GET", KEYS[1]) == ARGV[1] then + redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2]) + return "OK" +else + return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2]) +end +` + + // unlockscript lua script for release a lock + unlockLuaScript = ` +if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) +else + return 0 +end +` +) diff --git a/pkg/lock/redis.go b/pkg/lock/redis.go index b2b003824..0d617d585 100644 --- a/pkg/lock/redis.go +++ b/pkg/lock/redis.go @@ -2,20 +2,42 @@ package lock import ( "context" + "errors" "fmt" + "strconv" + "sync" "time" + "github.com/redis/go-redis/v9" + "github.com/go-eagle/eagle/pkg/log" +) - "github.com/redis/go-redis/v9" +const ( + // renewalDuration is the renewal duration. + renewalDuration int64 = 1000 +) + +// Lua scripts for locking and unlocking +// It only init once and cache in memory +var ( + + // lockScript init lua script + lockScript = redis.NewScript(lockLuaScript) + + // unlockScript init lua script + unlockScript = redis.NewScript(unlockLuaScript) ) -// RedisLock is a redis lock. +// RedisLock is a Redis-based distributed lock. type RedisLock struct { key string redisClient *redis.Client token string expiration time.Duration + mu sync.Mutex // 用于保护共享属性 + renewing bool // 续期标志 + stopRenew chan struct{} // 用于停止续期 } // NewRedisLock new a redis lock instance @@ -26,38 +48,99 @@ func NewRedisLock(rdb *redis.Client, key string, expiration time.Duration) *Redi redisClient: rdb, token: genToken(), expiration: expiration, + stopRenew: make(chan struct{}), } return opt } // Lock acquires the lock. +// It will return false if the lock is already acquired. func (l *RedisLock) Lock(ctx context.Context) (bool, error) { - isSet, err := l.redisClient.SetNX(ctx, l.key, l.token, l.expiration).Result() - if err == redis.Nil { - return false, nil - } else if err != nil { - log.Errorf("acquires the lock err, key: %s, err: %s", l.key, err.Error()) + // 加锁,防止并发问题 + l.mu.Lock() + defer l.mu.Unlock() + + ret, err := lockScript.Run(ctx, l.redisClient, []string{l.key}, + []string{l.token, strconv.FormatInt(l.expiration.Milliseconds()+renewalDuration, 10)}, + ).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return false, nil + } + + log.WithContext(ctx).Errorf("redis lock: acquires the lock err, key: %s, err: %s", l.key, err.Error()) return false, err + } else if ret == nil { + return false, nil + } + + reply, ok := ret.(string) + if ok && reply == "OK" { + if !l.renewing { + l.renewing = true + go l.autoRenew(ctx) // 启动续期协程 + } + return true, nil } - return isSet, nil + + return false, nil } -// Unlock del the lock. +// Unlock release a lock. // NOTE: token 一致才会执行删除,避免误删,这里用了lua脚本进行事务处理 func (l *RedisLock) Unlock(ctx context.Context) (bool, error) { - luaScript := "if redis.call('GET',KEYS[1]) == ARGV[1] then return redis.call('DEL',KEYS[1]) else return 0 end" - ret, err := l.redisClient.Eval(ctx, luaScript, []string{l.key}, l.token).Result() - if err != nil { - return false, err + // 解锁时也需要加锁保护 + l.mu.Lock() + defer l.mu.Unlock() + + // 停止续期协程并标记不再续期 + if l.renewing { + close(l.stopRenew) + l.renewing = false } - reply, ok := ret.(int64) - if !ok { - return false, nil + + for i := 0; i < 3; i++ { // 最多重试3次 + ret, err := unlockScript.Run(ctx, l.redisClient, []string{l.key}, l.token).Result() + if err != nil { + log.WithContext(ctx).Errorf("redis lock: failed to unlock, attempt %d, key: %s, err: %v", i+1, l.key, err) + time.Sleep(50 * time.Millisecond) // 等待一下再重试 + continue + } + reply, ok := ret.(int64) + if ok && reply == 1 { + return true, nil + } + break } - return reply == 1, nil + + return false, errors.New("redis lock: failed to unlock after multiple attempts") +} + +func (l *RedisLock) autoRenew(ctx context.Context) { + ticker := time.NewTicker(time.Duration(renewalDuration) * time.Millisecond / 2) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // 续期操作 + l.mu.Lock() + l.redisClient.Expire(ctx, l.key, l.expiration) + l.mu.Unlock() + case <-l.stopRenew: + return + case <-ctx.Done(): + return + } + } +} + +// GetTTL returns the TTL of the lock. +func (l *RedisLock) GetTTL(ctx context.Context) (time.Duration, error) { + return l.redisClient.TTL(ctx, l.key).Result() } -// getRedisKey 获取key +// getRedisKey returns the Redis key for the lock. func getRedisKey(key string) string { return fmt.Sprintf(RedisLockKey, key) } diff --git a/pkg/lock/redis_test.go b/pkg/lock/redis_test.go new file mode 100644 index 000000000..f538c26b2 --- /dev/null +++ b/pkg/lock/redis_test.go @@ -0,0 +1,85 @@ +package lock_test + +import ( + "context" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/go-eagle/eagle/pkg/lock" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" +) + +func TestRedisLock(t *testing.T) { + // Set up a miniredis server + s, err := miniredis.Run() + if err != nil { + t.Fatalf("failed to start miniredis: %v", err) + } + defer s.Close() + + // Create a Redis client + rdb := redis.NewClient(&redis.Options{ + Addr: s.Addr(), + }) + + ctx := context.Background() + + // Test acquiring the lock + l := lock.NewRedisLock(rdb, "test-key", 5*time.Second) + locked, err := l.Lock(ctx) + assert.NoError(t, err, "expected no error on acquiring lock") + assert.True(t, locked, "expected to successfully acquire the lock") + + // Test re-acquiring the lock + lockedAgain, err := l.Lock(ctx) + assert.NoError(t, err, "expected no error on re-acquiring lock") + assert.True(t, lockedAgain, "expected not to acquire the lock again") + + // Test releasing the lock + unlocked, err := l.Unlock(ctx) + assert.NoError(t, err, "expected no error on releasing lock") + assert.True(t, unlocked, "expected to successfully release the lock") + + // Test re-acquiring the lock after release + lockedAgain, err = l.Lock(ctx) + assert.NoError(t, err, "expected no error on re-acquiring lock after release") + assert.True(t, lockedAgain, "expected to successfully re-acquire the lock after release") +} + +func TestAutoRenew(t *testing.T) { + // Set up a miniredis server + s, err := miniredis.Run() + if err != nil { + t.Fatalf("failed to start miniredis: %v", err) + } + defer s.Close() + + // Create a Redis client + rdb := redis.NewClient(&redis.Options{ + Addr: s.Addr(), + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Test auto-renewal of the lock + l := lock.NewRedisLock(rdb, "test-key-auto-renew", 1*time.Second) + locked, err := l.Lock(ctx) + assert.NoError(t, err, "expected no error on acquiring lock") + assert.True(t, locked, "expected to successfully acquire the lock") + + // Wait for some time to ensure the lock is being renewed + time.Sleep(1500 * time.Millisecond) + + // Check the TTL of the key to verify that it has been renewed + ttl, err := l.GetTTL(ctx) + assert.NoError(t, err, "expected no error on checking TTL") + assert.Greater(t, ttl, time.Duration(0), "expected TTL to be greater than zero due to renewal") + + // Release the lock + unlocked, err := l.Unlock(ctx) + assert.NoError(t, err, "expected no error on releasing lock") + assert.True(t, unlocked, "expected to successfully release the lock") +}