Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flock-based File Mutex #86

Open
wants to merge 3 commits into
base: 3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions src/FileMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ final class FileMutex implements Mutex
private const LATENCY_TIMEOUT = 0.01;
private const DELAY_LIMIT = 1;

private static ?\Closure $errorHandler = null;

private readonly Filesystem $filesystem;

private readonly string $directory;
Expand All @@ -26,39 +28,63 @@ public function __construct(private readonly string $fileName, ?Filesystem $file
$this->directory = \dirname($this->fileName);
}

/**
* @throws SyncException
*/
public function acquire(?Cancellation $cancellation = null): Lock
{
if (!$this->filesystem->isDirectory($this->directory)) {
throw new SyncException(\sprintf('Directory of "%s" does not exist or is not a directory', $this->fileName));
}

// Try to create the lock file. If the file already exists, someone else
// has the lock, so set an asynchronous timer and try again.
// Try to create and lock the file. If flock fails, someone else already has the lock,
// so set an asynchronous timer and try again.
for ($attempt = 0; true; ++$attempt) {
try {
$file = $this->filesystem->openFile($this->fileName, 'x');

// Return a lock object that can be used to release the lock on the mutex.
$lock = new Lock($this->release(...));
\set_error_handler(self::$errorHandler ??= static fn () => true);

$file->close();
try {
$handle = \fopen($this->fileName, 'c');
if ($handle) {
if (\flock($handle, \LOCK_EX | \LOCK_NB, $wouldBlock)) {
return new Lock(fn () => $this->release($handle));
}

return $lock;
} catch (FilesystemException) {
delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * (2 ** $attempt)), cancellation: $cancellation);
if (!$wouldBlock) {
throw new FilesystemException(\sprintf(
'flock call on "%s" failed: %s',
$this->fileName,
\error_get_last()['message'] ?? 'Unknown error',
));
}
}
} finally {
\restore_error_handler();
}

$multiplier = 2 ** \min(31, $attempt);
delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * $multiplier), cancellation: $cancellation);
}
}

/**
* Releases the lock on the mutex.
*
* @param resource $handle
*
* @throws SyncException
*/
private function release(): void
private function release($handle): void
{
try {
$this->filesystem->deleteFile($this->fileName);

\set_error_handler(self::$errorHandler ??= static fn () => true);

try {
\fclose($handle);
} finally {
\restore_error_handler();
}
} catch (\Throwable $exception) {
throw new SyncException(
'Failed to unlock the mutex file: ' . $this->fileName,
Expand Down
45 changes: 4 additions & 41 deletions src/KeyedFileMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@
use Amp\Sync\KeyedMutex;
use Amp\Sync\Lock;
use Amp\Sync\SyncException;
use function Amp\delay;

final class KeyedFileMutex implements KeyedMutex
{
private const LATENCY_TIMEOUT = 0.01;
private const DELAY_LIMIT = 1;

private readonly Filesystem $filesystem;

private readonly string $directory;
Expand All @@ -26,47 +22,14 @@ public function __construct(string $directory, ?Filesystem $filesystem = null)
$this->directory = \rtrim($directory, "/\\");
}

public function acquire(string $key, ?Cancellation $cancellation = null): Lock
{
if (!$this->filesystem->isDirectory($this->directory)) {
throw new SyncException(\sprintf('Directory "%s" does not exist or is not a directory', $this->directory));
}

$filename = $this->getFilename($key);

// Try to create the lock file. If the file already exists, someone else
// has the lock, so set an asynchronous timer and try again.
for ($attempt = 0; true; ++$attempt) {
try {
$file = $this->filesystem->openFile($filename, 'x');

// Return a lock object that can be used to release the lock on the mutex.
$lock = new Lock(fn () => $this->release($filename));

$file->close();

return $lock;
} catch (FilesystemException) {
delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * (2 ** $attempt)), cancellation: $cancellation);
}
}
}

/**
* Releases the lock on the mutex.
*
* @throws SyncException
*/
private function release(string $filename): void
public function acquire(string $key, ?Cancellation $cancellation = null): Lock
{
try {
$this->filesystem->deleteFile($filename);
} catch (\Throwable $exception) {
throw new SyncException(
'Failed to unlock the mutex file: ' . $filename,
previous: $exception,
);
}
$mutex = new FileMutex($this->getFilename($key), $this->filesystem);

return $mutex->acquire($cancellation);
}

private function getFilename(string $key): string
Expand Down
Loading