Skip to content

Commit

Permalink
pglock: unify lock release logic between client.Release() and lock.Cl…
Browse files Browse the repository at this point in the history
…ose()

Addresses #39
  • Loading branch information
ucirello committed May 18, 2023
1 parent 2533e12 commit 8716e2a
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 15 deletions.
12 changes: 3 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,11 @@ func (c *Client) Release(l *Lock) error {
}

// ReleaseContext will update the mutex entry to be able to be taken by other
// clients.
// clients. If a heartbeat is running, it will stopped it.
func (c *Client) ReleaseContext(ctx context.Context, l *Lock) error {
if l.IsReleased() {
l.heartbeatWG.Wait()
return ErrLockAlreadyReleased
}
l.heartbeatCancel()
l.heartbeatWG.Wait()
err := c.retry(ctx, func() error { return c.storeRelease(ctx, l) })
if l.IsReleased() {
l.heartbeatWG.Wait()
}
return err
}

Expand Down Expand Up @@ -343,7 +338,6 @@ func (c *Client) storeRelease(ctx context.Context, l *Lock) error {
return typedError(err, "cannot confirm whether the lock has been released")
} else if affected == 0 {
l.isReleased = true
l.heartbeatCancel()
return ErrLockAlreadyReleased
}
if !l.keepOnRelease {
Expand Down
5 changes: 4 additions & 1 deletion client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ func TestDBErrorHandling(t *testing.T) {
t.Fatal("cannot create mock:", err)
}
client.db = db
ctx, cancel := context.WithCancel(context.Background())
return client, mock, &Lock{
leaseDuration: time.Minute,
heartbeatContext: ctx,
heartbeatCancel: cancel,
leaseDuration: time.Minute,
}
}
t.Run("acquire", func(t *testing.T) {
Expand Down
43 changes: 43 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"cirello.io/pglock"
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/lib/pq"
"golang.org/x/sync/errgroup"
)

type fakeDriver struct{}
Expand Down Expand Up @@ -1208,3 +1209,45 @@ func TestGetAllLocks(t *testing.T) {
}
}
}

func TestStaleAfterRelease(t *testing.T) {
db := setupDB(t)
defer db.Close()
db.SetMaxOpenConns(30)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(30 * time.Minute)

c, err := pglock.New(db, pglock.WithOwner("TestStaleAfterRelease"))
if err != nil {
t.Fatal("cannot connect:", err)
}
if _, err := db.Exec("DELETE FROM " + pglock.DefaultTableName + " WHERE owner = 'TestStaleAfterRelease'"); err != nil {
t.Fatal("cannot reset table:", err)
}
var (
group errgroup.Group
start = make(chan struct{})
)
for i := 0; i < 100; i++ {
lockName := fmt.Sprint("lock-name-", i)
group.Go(func() error {
<-start
l, err := c.Acquire(lockName)
if err != nil {
return fmt.Errorf("cannot acquire lock (%q): %w", lockName, err)
}
t.Log(lockName, "acquired")
time.Sleep(6 * time.Second)
if err := l.Close(); err != nil {
return fmt.Errorf("cannot release lock (%q): %w", lockName, err)
}
t.Log(lockName, "released")
return nil
})
}
close(start)
errGroup := group.Wait()
if errGroup != nil {
t.Fatal("unexpected error: ", errGroup)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.11.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/text v0.7.0 // indirect
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
5 changes: 0 additions & 5 deletions lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ func (l *Lock) Data() []byte {

// Close releases the lock and interrupts the locks heartbeat, if configured.
func (l *Lock) Close() error {
// The lock release and the heartbeat may collide. If the heartbeat
// context is canceled during the lock release, the system will rely on
// the elapsed lease duration to make progress. By releasing the lock
// first, it won't matter whether it skips the heartbeat check or not.
err := l.client.Release(l)
l.heartbeatCancel()
return err
}

Expand Down

0 comments on commit 8716e2a

Please sign in to comment.