Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add re-entry and automatic renewal for redis lock #150

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## v1.10.0

- feat: support async flush log to disk
- feat: add re-entry and automatic renewal for redis lock
- chore: using gorm offical plguin for tracing and metrics

## v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/go-eagle/eagle
go 1.22.3

require (
github.com/1024casts/gorm-opentelemetry v1.0.1-0.20210805144709-183269b54068
github.com/Shopify/sarama v1.19.0
github.com/alicebob/miniredis/v2 v2.15.1
github.com/cenkalti/backoff/v4 v4.2.1
Expand Down Expand Up @@ -74,6 +73,7 @@ require (
gorm.io/driver/mysql v1.5.2
gorm.io/driver/postgres v1.5.4
gorm.io/gorm v1.25.10
gorm.io/plugin/opentelemetry v0.1.4
)

require (
Expand Down Expand Up @@ -170,6 +170,7 @@ require (
github.com/shopspring/decimal v1.4.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/swaggo/swag v1.7.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
Expand Down Expand Up @@ -203,5 +204,4 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/plugin/opentelemetry v0.1.4 // indirect
)
9 changes: 1 addition & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/1024casts/gorm-opentelemetry v1.0.1-0.20210805144709-183269b54068 h1:XWMHFSEROBGd33gZNMsTb6zxECMN8xOJkO0ucOJdz58=
github.com/1024casts/gorm-opentelemetry v1.0.1-0.20210805144709-183269b54068/go.mod h1:nEAgMK5Iab8nqYy8zn1tYjEjkeDA2PmszXOwZhlDrMs=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
Expand Down Expand Up @@ -421,7 +419,6 @@ github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand Down Expand Up @@ -503,7 +500,6 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI=
github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand Down Expand Up @@ -1255,11 +1251,8 @@ gorm.io/driver/mysql v1.5.2 h1:QC2HRskSE75wBuOxe0+iCkyJZ+RqpudsQtqkp+IMuXs=
gorm.io/driver/mysql v1.5.2/go.mod h1:pQLhh1Ut/WUAySdTHwBpBv6+JKcj+ua4ZFx1QQTBzb8=
gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo=
gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0=
gorm.io/driver/sqlite v1.1.4 h1:PDzwYE+sI6De2+mxAneV9Xs11+ZyKV6oxD3wDGkaNvM=
gorm.io/driver/sqlite v1.1.4/go.mod h1:mJCeTFr7+crvS+TRnWc5Z3UvwxUN1BGBLMrf5LA9DYw=
gorm.io/driver/sqlite v1.5.0 h1:zKYbzRCpBrT1bNijRnxLDJWPjVfImGEn0lSnUY5gZ+c=
gorm.io/gorm v1.20.7/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
gorm.io/gorm v1.20.12/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
gorm.io/driver/sqlite v1.5.0/go.mod h1:kDMDfntV9u/vuMmz8APHtHF0b4nyBB7sfCieC6G8k8I=
gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s=
gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
Expand Down
22 changes: 22 additions & 0 deletions pkg/lock/luascript.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package lock

var (
// lockscript lua script for acrequire a lock
lockLuaScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
return "OK"
else
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end
`

// unlockscript lua script for release a lock
unlockLuaScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
)
119 changes: 101 additions & 18 deletions pkg/lock/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,42 @@ package lock

import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

"github.com/redis/go-redis/v9"

"github.com/go-eagle/eagle/pkg/log"
)

"github.com/redis/go-redis/v9"
const (
// renewalDuration is the renewal duration.
renewalDuration int64 = 1000
)

// Lua scripts for locking and unlocking
// It only init once and cache in memory
var (

// lockScript init lua script
lockScript = redis.NewScript(lockLuaScript)

// unlockScript init lua script
unlockScript = redis.NewScript(unlockLuaScript)
)

// RedisLock is a redis lock.
// RedisLock is a Redis-based distributed lock.
type RedisLock struct {
key string
redisClient *redis.Client
token string
expiration time.Duration
mu sync.Mutex // 用于保护共享属性
renewing bool // 续期标志
stopRenew chan struct{} // 用于停止续期
}

// NewRedisLock new a redis lock instance
Expand All @@ -26,38 +48,99 @@ func NewRedisLock(rdb *redis.Client, key string, expiration time.Duration) *Redi
redisClient: rdb,
token: genToken(),
expiration: expiration,
stopRenew: make(chan struct{}),
}
return opt
}

// Lock acquires the lock.
// It will return false if the lock is already acquired.
func (l *RedisLock) Lock(ctx context.Context) (bool, error) {
isSet, err := l.redisClient.SetNX(ctx, l.key, l.token, l.expiration).Result()
if err == redis.Nil {
return false, nil
} else if err != nil {
log.Errorf("acquires the lock err, key: %s, err: %s", l.key, err.Error())
// 加锁,防止并发问题
l.mu.Lock()
defer l.mu.Unlock()

ret, err := lockScript.Run(ctx, l.redisClient, []string{l.key},
[]string{l.token, strconv.FormatInt(l.expiration.Milliseconds()+renewalDuration, 10)},
).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return false, nil
}

log.WithContext(ctx).Errorf("redis lock: acquires the lock err, key: %s, err: %s", l.key, err.Error())
return false, err
} else if ret == nil {
return false, nil
}

reply, ok := ret.(string)
if ok && reply == "OK" {
if !l.renewing {
l.renewing = true
go l.autoRenew(ctx) // 启动续期协程
}
return true, nil
}
return isSet, nil

return false, nil
}

// Unlock del the lock.
// Unlock release a lock.
// NOTE: token 一致才会执行删除,避免误删,这里用了lua脚本进行事务处理
func (l *RedisLock) Unlock(ctx context.Context) (bool, error) {
luaScript := "if redis.call('GET',KEYS[1]) == ARGV[1] then return redis.call('DEL',KEYS[1]) else return 0 end"
ret, err := l.redisClient.Eval(ctx, luaScript, []string{l.key}, l.token).Result()
if err != nil {
return false, err
// 解锁时也需要加锁保护
l.mu.Lock()
defer l.mu.Unlock()

// 停止续期协程并标记不再续期
if l.renewing {
close(l.stopRenew)
l.renewing = false
}
reply, ok := ret.(int64)
if !ok {
return false, nil

for i := 0; i < 3; i++ { // 最多重试3次
ret, err := unlockScript.Run(ctx, l.redisClient, []string{l.key}, l.token).Result()
if err != nil {
log.WithContext(ctx).Errorf("redis lock: failed to unlock, attempt %d, key: %s, err: %v", i+1, l.key, err)
time.Sleep(50 * time.Millisecond) // 等待一下再重试
continue
}
reply, ok := ret.(int64)
if ok && reply == 1 {
return true, nil
}
break
}
return reply == 1, nil

return false, errors.New("redis lock: failed to unlock after multiple attempts")
}

func (l *RedisLock) autoRenew(ctx context.Context) {
ticker := time.NewTicker(time.Duration(renewalDuration) * time.Millisecond / 2)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// 续期操作
l.mu.Lock()
l.redisClient.Expire(ctx, l.key, l.expiration)
l.mu.Unlock()
case <-l.stopRenew:
return
case <-ctx.Done():
return
}
}
}

// GetTTL returns the TTL of the lock.
func (l *RedisLock) GetTTL(ctx context.Context) (time.Duration, error) {
return l.redisClient.TTL(ctx, l.key).Result()
}

// getRedisKey 获取key
// getRedisKey returns the Redis key for the lock.
func getRedisKey(key string) string {
return fmt.Sprintf(RedisLockKey, key)
}
85 changes: 85 additions & 0 deletions pkg/lock/redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package lock_test

import (
"context"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/go-eagle/eagle/pkg/lock"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

func TestRedisLock(t *testing.T) {
// Set up a miniredis server
s, err := miniredis.Run()
if err != nil {
t.Fatalf("failed to start miniredis: %v", err)
}
defer s.Close()

// Create a Redis client
rdb := redis.NewClient(&redis.Options{
Addr: s.Addr(),
})

ctx := context.Background()

// Test acquiring the lock
l := lock.NewRedisLock(rdb, "test-key", 5*time.Second)
locked, err := l.Lock(ctx)
assert.NoError(t, err, "expected no error on acquiring lock")
assert.True(t, locked, "expected to successfully acquire the lock")

// Test re-acquiring the lock
lockedAgain, err := l.Lock(ctx)
assert.NoError(t, err, "expected no error on re-acquiring lock")
assert.True(t, lockedAgain, "expected not to acquire the lock again")

// Test releasing the lock
unlocked, err := l.Unlock(ctx)
assert.NoError(t, err, "expected no error on releasing lock")
assert.True(t, unlocked, "expected to successfully release the lock")

// Test re-acquiring the lock after release
lockedAgain, err = l.Lock(ctx)
assert.NoError(t, err, "expected no error on re-acquiring lock after release")
assert.True(t, lockedAgain, "expected to successfully re-acquire the lock after release")
}

func TestAutoRenew(t *testing.T) {
// Set up a miniredis server
s, err := miniredis.Run()
if err != nil {
t.Fatalf("failed to start miniredis: %v", err)
}
defer s.Close()

// Create a Redis client
rdb := redis.NewClient(&redis.Options{
Addr: s.Addr(),
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Test auto-renewal of the lock
l := lock.NewRedisLock(rdb, "test-key-auto-renew", 1*time.Second)
locked, err := l.Lock(ctx)
assert.NoError(t, err, "expected no error on acquiring lock")
assert.True(t, locked, "expected to successfully acquire the lock")

// Wait for some time to ensure the lock is being renewed
time.Sleep(1500 * time.Millisecond)

// Check the TTL of the key to verify that it has been renewed
ttl, err := l.GetTTL(ctx)
assert.NoError(t, err, "expected no error on checking TTL")
assert.Greater(t, ttl, time.Duration(0), "expected TTL to be greater than zero due to renewal")

// Release the lock
unlocked, err := l.Unlock(ctx)
assert.NoError(t, err, "expected no error on releasing lock")
assert.True(t, unlocked, "expected to successfully release the lock")
}
Loading