Skip to content

Commit

Permalink
GH-2048 Replace local retry mechanism with shared lock (#2050)
Browse files Browse the repository at this point in the history
  • Loading branch information
dzikoysk authored Feb 5, 2024
1 parent 4f58525 commit 1b1cad0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,25 @@ import com.reposilite.shared.toErrorResponse
import com.reposilite.storage.api.FileType
import com.reposilite.storage.api.FileType.DIRECTORY
import com.reposilite.storage.api.FileType.FILE
import io.javalin.http.HttpStatus.INTERNAL_SERVER_ERROR
import io.javalin.http.HttpStatus.NO_CONTENT
import panda.std.Result
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import kotlin.io.path.isDirectory
import panda.std.Result

fun Path.type(): FileType =
if (this.isDirectory()) DIRECTORY else FILE

fun Path.inputStream(): Result<InputStream, ErrorResponse> =
Result.`when`(Files.exists(this), this, notFound(""))
.filter({ it.isDirectory().not() }, { NO_CONTENT.toErrorResponse("Requested file is a directory") })
.map { Files.newInputStream(it) }
.flatMap {
Result.supplyThrowing { Files.newInputStream(it) }
.onError { it.printStackTrace() }
.mapErr { INTERNAL_SERVER_ERROR.toErrorResponse("Cannot read file") }
}

internal fun Path.getExtension(): String =
getSimpleName().getExtension()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ import com.reposilite.storage.type
import io.javalin.http.ContentType
import io.javalin.http.ContentType.APPLICATION_OCTET_STREAM
import io.javalin.http.HttpStatus.INSUFFICIENT_STORAGE
import java.io.Closeable
import java.io.File
import java.io.FilterInputStream
import java.io.IOException
import java.io.InputStream
import java.nio.file.FileAlreadyExistsException
import java.nio.file.Files
import java.nio.file.NoSuchFileException
import java.nio.file.Path
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.nio.file.StandardCopyOption
import java.nio.file.attribute.FileTime
import kotlin.io.path.absolutePathString
import java.util.concurrent.Executors
import java.util.concurrent.locks.ReentrantLock
import kotlin.streams.asSequence
import panda.std.Result
import panda.std.asSuccess
Expand All @@ -60,7 +61,40 @@ abstract class FileSystemStorageProvider protected constructor(
val rootDirectory: Path
) : StorageProvider {

override fun shutdown() {}
data class LockedLocation(
val location: Location,
val lock: ReentrantLock = ReentrantLock(),

This comment has been minimized.

Copy link
@ivy-cst

ivy-cst Feb 5, 2024

I would recommend to use a java.util.concurrent.locks.ReentrantReadWriteLock per location.
Then lockManager is not nescessary in my eyes.

This comment has been minimized.

Copy link
@dzikoysk

dzikoysk Feb 5, 2024

Author Owner

We need a possibility to lock a resource as long as there's an open input stream that points to it:

Locking using regular locks is limited to the same thread, otherwise you'll raise an illegal monitor state exception trying to unlock this from some other thread (that called close()). The performance of the locking mechanism itself is not important here.

val updates: Int = 1
)

private val lockManager = Executors.newSingleThreadExecutor()
private val lockedLocations = HashMap<Location, LockedLocation>(16)

This comment has been minimized.

Copy link
@ivy-cst

ivy-cst Feb 5, 2024

Here I would use a 'java.util.concurrent.ConcurrentHashMap' and use the computeIfAbsent method to generate the LockedLocation for a locaton.

This comment has been minimized.

Copy link
@dzikoysk

dzikoysk Feb 5, 2024

Author Owner

As long as we're syncing this via the same thread, there's no need for a concurrent hashmap impl, because it's always called from the same thread. Also, having only computeIfAbsent without the update usage counter (handled by compute) method, we wouldn't be able to make thread-safe cleanups. I guess you made that assumption having in mind the ReentrantReadWriteLock where we can obtain read/write count, as far as I remember - if so, that's a fair point to reconsider this lock.


override fun shutdown() {
lockManager.shutdown()
}

private fun acquireFileAccessLock(location: Location): Closeable =
lockManager.submit<Closeable> {
val lockedLocation = lockedLocations.compute(location) { _, currentLock ->
currentLock?.copy(updates = currentLock.updates + 1) ?: LockedLocation(location)
}!!

lockedLocation.lock.lock()

Closeable {
lockManager.execute {
lockedLocation.lock.unlock()

lockedLocations.compute(location) { _, currentLock ->
when (currentLock!!.updates) {
1 -> null
else -> currentLock.copy(updates = currentLock.updates - 1)
}
}
}
}
}.get()

override fun putFile(location: Location, inputStream: InputStream): Result<Unit, ErrorResponse> =
inputStream.use { data ->
Expand All @@ -83,27 +117,31 @@ abstract class FileSystemStorageProvider protected constructor(
data.copyTo(destination)
}

do {
try {
Files.move(temporaryFile.toPath(), file, REPLACE_EXISTING)
} catch (e: NoSuchFileException) {
// Concurrent Files.move calls may throw
// ~ https://github.com/dzikoysk/reposilite/issues/1975
journalist.logger.debug("[FS][1] Cannot move file ${temporaryFile.absolutePath} to ${file.absolutePathString()}, retrying...")
Thread.sleep(1000) // probably good enough for now
} catch (e: FileAlreadyExistsException) {
// ~ https://github.com/dzikoysk/reposilite/issues/2027
journalist.logger.debug("[FS][2] Cannot move file ${temporaryFile.absolutePath} to ${file.absolutePathString()}, retrying...")
Thread.sleep(1000) // probably good enough for now
}
} while (Files.exists(temporaryFile.toPath()))
acquireFileAccessLock(location).use {
Files.move(temporaryFile.toPath(), file, StandardCopyOption.REPLACE_EXISTING)
Unit
}
}
}

override fun getFile(location: Location): Result<InputStream, ErrorResponse> =
location.resolveWithRootDirectory()
.exists()
.flatMap { it.inputStream() }
.flatMap {
val lock = acquireFileAccessLock(location)

it.inputStream()
.onError { lock.close() }
.map { inputStream ->
object : FilterInputStream(inputStream) {
override fun close() {
lock.use {
inputStream.close()
}
}
}
}
}

override fun getFileDetails(location: Location): Result<out FileDetails, ErrorResponse> =
location.resolveWithRootDirectory()
Expand Down

0 comments on commit 1b1cad0

Please sign in to comment.