Skip to content

Commit

Permalink
Speed-up republish artifacts
Browse files Browse the repository at this point in the history
  • Loading branch information
adpi2 committed Aug 21, 2024
1 parent 9ebf572 commit d63478c
Show file tree
Hide file tree
Showing 14 changed files with 102 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ trait SchedulerDatabase extends WebDatabase {
def updateArtifacts(artifacts: Seq[Artifact], newRef: Project.Reference): Future[Int]
def updateArtifactReleaseDate(reference: MavenReference, releaseDate: Instant): Future[Int]
def getAllGroupIds(): Future[Seq[Artifact.GroupId]]
def getAllArtifactIds(ref: Project.Reference): Future[Seq[(Artifact.GroupId, String)]]
def getArtifactIds(ref: Project.Reference): Future[Seq[(Artifact.GroupId, String)]]
def getAllMavenReferences(): Future[Seq[Artifact.MavenReference]]
def getMavenReferences(ref: Project.Reference): Future[Seq[Artifact.MavenReference]]
def getDependencies(projectRef: Project.Reference): Future[Seq[ArtifactDependency]]
def updateLatestVersion(ref: MavenReference): Future[Unit]
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class InMemoryDatabase extends SchedulerDatabase {
override def updateArtifacts(allArtifacts: Seq[Artifact], newRef: Project.Reference): Future[Int] = ???
override def getAllGroupIds(): Future[Seq[Artifact.GroupId]] = ???
override def getAllMavenReferences(): Future[Seq[Artifact.MavenReference]] = ???

override def getMavenReferences(ref: Project.Reference): Future[Seq[Artifact.MavenReference]] = ???
override def insertUser(userId: UUID, userInfo: UserInfo): Future[Unit] = ???
override def updateUser(userId: UUID, userInfo: UserState): Future[Unit] = ???
override def getUser(userId: UUID): Future[Option[UserState]] = ???
Expand Down Expand Up @@ -208,7 +210,7 @@ class InMemoryDatabase extends SchedulerDatabase {
Future.successful(res)
}

override def getAllArtifactIds(ref: Project.Reference): Future[Seq[(Artifact.GroupId, String)]] =
override def getArtifactIds(ref: Project.Reference): Future[Seq[(Artifact.GroupId, String)]] =
Future.successful(allArtifacts(ref).map(a => (a.groupId, a.artifactId)).toSeq)

override def updateLatestVersion(ref: Artifact.MavenReference): Future[Unit] =
Expand Down
20 changes: 8 additions & 12 deletions modules/data/src/main/scala/scaladex/data/maven/PomsReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,12 @@ class PomsReader(resolver: PomResolver) extends LazyLogging {
private val jdk = new Properties
jdk.setProperty("java.version", "1.8") // << ???

def loadOne(path: Path): Try[(ArtifactModel, String)] = {
val sha1 = path.getFileName.toString.stripSuffix(".pom")

Try {
val request = (new DefaultModelBuildingRequest)
.setModelResolver(modelResolver)
.setSystemProperties(jdk)
.setPomFile(path.toFile)

builder.build(request).getEffectiveModel
}.map(pom => (PomConvert(pom), sha1))
}
/**
 * Builds the effective Maven model of the POM file at `path` and converts it with `PomConvert`.
 *
 * @param path location of the POM file to read
 * @return the converted model, or a `Failure` holding any non-fatal exception thrown while
 *         building the request, resolving the effective model or converting it
 */
def loadOne(path: Path): Try[ArtifactModel] =
  Try {
    val buildingRequest = (new DefaultModelBuildingRequest)
      .setModelResolver(modelResolver)
      .setSystemProperties(jdk)
      .setPomFile(path.toFile)
    val effectiveModel = builder.build(buildingRequest).getEffectiveModel
    // Converting inside the same Try keeps conversion errors in the returned Failure.
    PomConvert(effectiveModel)
  }
}
6 changes: 4 additions & 2 deletions modules/infra/src/main/scala/scaladex/infra/SqlDatabase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,14 @@ class SqlDatabase(datasource: HikariDataSource, xa: doobie.Transactor[IO]) exten
override def getAllGroupIds(): Future[Seq[Artifact.GroupId]] =
run(ArtifactTable.selectGroupIds.to[Seq])

override def getAllArtifactIds(ref: Project.Reference): Future[Seq[(Artifact.GroupId, String)]] =
override def getArtifactIds(ref: Project.Reference): Future[Seq[(Artifact.GroupId, String)]] =
run(ArtifactTable.selectArtifactIds.to[Seq](ref))

override def getAllMavenReferences(): Future[Seq[Artifact.MavenReference]] =
run(ArtifactTable.selectMavenReference.to[Seq])
run(ArtifactTable.selectAllMavenReferences.to[Seq])

override def getMavenReferences(ref: Project.Reference): Future[Seq[Artifact.MavenReference]] =
run(ArtifactTable.selectMavenReferences.to[Seq](ref))
override def insertUser(userId: UUID, userInfo: UserInfo): Future[Unit] =
run(UserSessionsTable.insert.run((userId, userInfo)).map(_ => ()))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ object ArtifactTable {
val selectArtifactIds: Query[Project.Reference, (Artifact.GroupId, String)] =
selectRequest(table, Seq("DISTINCT group_id", "artifact_id"), keys = projectReferenceFields)

val selectMavenReference: Query0[Artifact.MavenReference] =
val selectAllMavenReferences: Query0[Artifact.MavenReference] =
selectRequest(table, Seq("DISTINCT group_id", "artifact_id", "\"version\""))

val selectMavenReferences: Query[Project.Reference, Artifact.MavenReference] =
selectRequest(table, Seq("DISTINCT group_id", "artifact_id", "\"version\""), keys = projectReferenceFields)

val selectMavenReferenceWithNoReleaseDate: Query0[Artifact.MavenReference] =
selectRequest(table, Seq("group_id", "artifact_id", "\"version\""), where = Seq("release_date is NULL"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class ArtifactTableTests extends AnyFunSpec with BaseDatabaseSuite with Matchers
it("check selectOldestByProject")(check(selectOldestByProject))
it("check updateProjectRef")(check(updateProjectRef))
it("check selectGroupIds")(check(selectGroupIds))
it("check selectMavenReference")(check(selectMavenReference))
it("check selectAllMavenReferences")(check(selectAllMavenReferences))
it("check selectMavenReferences")(check(selectMavenReferences))
it("check updateReleaseDate")(check(updateReleaseDate))
it("check selectByMavenReference")(check(selectByMavenReference))
it("check countVersionsByProject")(check(countVersionsByProject))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ class AdminPage(env: Env, adminService: AdminService) {
}
} ~
post {
path("tasks" / Task.updateMavenArtifacts.name) {
adminService.updateMavenArtifacts(user)
path("tasks" / Task.republishArtifacts.name) {
adminService.republishArtifacts(user)
redirect(Uri("/admin"), StatusCodes.SeeOther)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ class AdminService(
tasks = tasks :+ task
}

def updateMavenArtifacts(user: UserState): Unit = {
val task = TaskRunner.run(Task.updateMavenArtifacts, user.info.login, input = Seq.empty) { () =>
sonatypeSynchronizer.updateAllArtifacts()
/**
 * Submits the `Task.republishArtifacts` task through `TaskRunner.run`, attributed to the login
 * of `user`, and appends the returned task to `tasks`.
 *
 * @param user the user who requested the task
 */
def republishArtifacts(user: UserState): Unit = {
  val login = user.info.login
  val republishTask =
    TaskRunner.run(Task.republishArtifacts, login, input = Seq.empty)(() => sonatypeSynchronizer.republishArtifacts())
  tasks = tasks :+ republishTask
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ArtifactService(database: SchedulerDatabase)(implicit ec: ExecutionContext

def updateLatestVersions(ref: Project.Reference, preferStableVersion: Boolean): Future[Int] =
for {
artifactIds <- database.getAllArtifactIds(ref)
artifactIds <- database.getArtifactIds(ref)
_ <- artifactIds.mapSync {
case (groupId, artifactId) => updateLatestVersion(groupId, artifactId, preferStableVersion)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class GithubUpdater(database: WebDatabase, github: GithubClient)(implicit ec: Ex

case GithubResponse.MovedPermanently((destination, info)) =>
val status = GithubStatus.Moved(now, destination)
logger.info(s"$repo moved to $status")
logger.info(s"$repo moved to $destination")
database.moveProject(repo, info, status).map(_ => status)

case GithubResponse.Failed(code, reason) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Try

import com.typesafe.scalalogging.LazyLogging
import org.apache.pekko.actor.ActorSystem
import scaladex.core.model.Artifact
import scaladex.core.model.Env
import scaladex.core.model.Project
import scaladex.core.model.Sha1
Expand Down Expand Up @@ -49,17 +51,12 @@ class PublishProcess(
userState: Option[UserState]
): Future[PublishResult] = {
logger.info(s"Publishing POM $path")
val sha1 = Sha1(data)
val tempFile = filesystem.createTempFile(data, sha1, ".pom")
val future =
Future(pomsReader.loadOne(tempFile).get)
.flatMap { case (pom, _) => publishPom(pom, creationDate, userState) }
.recover { cause =>
logger.error(s"Invalid POM $path", cause)
PublishResult.InvalidPom
}
future.onComplete(_ => filesystem.deleteTempFile(tempFile))
future
Future(loadPom(data).get)
.flatMap(publishPom(_, creationDate, userState))
.recover { cause =>
logger.error(s"Invalid POM $path", cause)
PublishResult.InvalidPom
}
}

private def publishPom(
Expand All @@ -74,7 +71,7 @@ class PublishProcess(
Future.successful(PublishResult.NoGithubRepo)
case Some(repo) =>
// userState can be empty when the request of publish is done through the scheduler
if (userState.isEmpty || userState.get.hasPublishingAuthority(env) || userState.get.repos.contains(repo)) {
if (userState.forall(userState => userState.hasPublishingAuthority(env) || userState.repos.contains(repo))) {
converter.convert(pom, repo, creationDate) match {
case Some((artifact, deps)) =>
for {
Expand All @@ -98,6 +95,38 @@ class PublishProcess(
}
}
}

/**
 * Re-inserts an already known artifact from its POM content.
 *
 * The POM is loaded with `loadPom`, converted for project `repo`, and the resulting artifact is
 * inserted only when its Maven reference equals `ref`. The dependencies returned by the converter
 * are ignored.
 *
 * @param repo         project the artifact belongs to
 * @param ref          Maven reference the POM is expected to describe
 * @param data         raw content of the POM file
 * @param creationDate creation date passed to the converter
 * @return `PublishResult.Success` once the artifact is inserted; `PublishResult.InvalidPom` when
 *         the POM cannot be converted, describes another reference, or any step of the future
 *         fails (the cause is logged)
 */
def republishPom(
    repo: Project.Reference,
    ref: Artifact.MavenReference,
    data: String,
    creationDate: Instant
): Future[PublishResult] = {
  // Converts the loaded POM and inserts it when it matches the expected reference.
  def insertConverted(pom: ArtifactModel): Future[PublishResult] =
    converter.convert(pom, repo, creationDate) match {
      case Some((artifact, _)) if artifact.mavenReference == ref =>
        database.insertArtifact(artifact).map(_ => PublishResult.Success)
      case Some((artifact, _)) =>
        logger.error(s"Unexpected ref ${artifact.mavenReference}")
        Future.successful(PublishResult.InvalidPom)
      case None =>
        logger.warn(s"Cannot convert $ref to valid Scala artifact.")
        Future.successful(PublishResult.InvalidPom)
    }

  Future(loadPom(data).get)
    .flatMap(insertConverted)
    .recover {
      case cause =>
        logger.warn(s"Invalid POM $ref", cause)
        PublishResult.InvalidPom
    }
}

/**
 * Parses POM content by writing it to a temporary file and reading that file with
 * `pomsReader.loadOne`.
 *
 * The temporary file is named after the SHA-1 of `data` and is always deleted before returning,
 * including when `loadOne` throws instead of returning a `Failure`.
 *
 * @param data raw content of the POM file
 * @return the result of `pomsReader.loadOne` on the temporary file
 */
private def loadPom(data: String): Try[ArtifactModel] = {
  val sha1 = Sha1(data)
  val tempFile = filesystem.createTempFile(data, sha1, ".pom")
  // `Try` only captures non-fatal throwables, so the clean-up must not rely on loadOne returning.
  try pomsReader.loadOne(tempFile)
  finally filesystem.deleteTempFile(tempFile)
}
}

object PublishProcess {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import scala.concurrent.Future
import com.typesafe.scalalogging.LazyLogging
import scaladex.core.model.Artifact
import scaladex.core.model.Artifact._
import scaladex.core.model.Project
import scaladex.core.service.SchedulerDatabase
import scaladex.core.service.SonatypeClient
import scaladex.core.util.ScalaExtensions._
Expand Down Expand Up @@ -84,19 +85,31 @@ class SonatypeService(
result <- findAndIndexMissingArtifacts(groupId, artifactNameOpt, mavenReferenceFromDatabase.toSet)
} yield s"Inserted $result poms"

def updateAllArtifacts(): Future[String] =
def republishArtifacts(): Future[String] =
for {
mavenReferences <- database.getAllMavenReferences()
_ = logger.info(s"${mavenReferences.size} artifacts will be synced for new metadata.")
publishResult <- mavenReferences.mapSync(updateArtifact)
successCount = publishResult.count(_ == PublishResult.Success)
failedCount = publishResult.size - successCount
} yield s"Synced $successCount poms, while $failedCount poms failed to update."
projectStatuses <- database.getAllProjectsStatuses()
refs = projectStatuses.collect { case (ref, status) if status.isOk || status.isUnknown || status.isFailed => ref }
counts <- refs.mapSync(republishArtifacts)
} yield {
val successes = counts.map(_._1).sum
val failures = counts.map(_._2).sum
s"Re-published $successes artifacts ($failures failures)."
}

/**
 * Re-publishes every Maven reference that the database returns for `projectRef`.
 *
 * @param projectRef project whose artifacts are re-published
 * @return a pair `(successes, failures)`: the number of `PublishResult.Success` results and the
 *         number of all other results
 */
private def republishArtifacts(projectRef: Project.Reference): Future[(Int, Int)] =
  database.getMavenReferences(projectRef).flatMap { refs =>
    refs.mapSync(republishArtifact(projectRef, _)).map { results =>
      val successes = results.count(_ == PublishResult.Success)
      val failures = results.size - successes
      logger.info(s"Re-published $successes artifacts of $projectRef ($failures failures)")
      (successes, failures)
    }
  }

private def updateArtifact(ref: MavenReference): Future[PublishResult] =
sonatypeService.getPomFile(ref).map(ref -> _).flatMap {
case (mavenRef, Some((pomFile, creationDate))) =>
publishProcess.publishPom(mavenRef.toString(), pomFile, creationDate, None)
case _ => Future.successful(PublishResult.InvalidPom)
/**
 * Fetches the POM file of `ref` from `sonatypeService` and re-publishes it for `projectRef`.
 *
 * @param projectRef project the artifact belongs to
 * @param ref        Maven reference of the artifact to re-publish
 * @return the result of `publishProcess.republishPom`, or `PublishResult.InvalidPom` when no POM
 *         file is returned for `ref`
 */
private def republishArtifact(projectRef: Project.Reference, ref: MavenReference): Future[PublishResult] =
  for {
    pomOpt <- sonatypeService.getPomFile(ref)
    result <- pomOpt match {
      case Some((pomFile, creationDate)) => publishProcess.republishPom(projectRef, ref, pomFile, creationDate)
      case _                             => Future.successful(PublishResult.InvalidPom)
    }
  } yield result
}
6 changes: 3 additions & 3 deletions modules/template/src/main/scala/scaladex/view/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ object Task {
"Update the Github info of an existing project"
)

val updateMavenArtifacts: Task = Task(
"update-maven-artifact",
"Download all pom files to update existing artifacts with new fields"
val republishArtifacts: Task = Task(
"republish-maven-artifact",
"Re-download pom files of known artifacts to extract new fields"
)

case class Status(name: String, user: String, start: Instant, input: Seq[(String, String)], state: State) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ <h5>@Task.updateGithubInfo.name</h5>
</tr>
<tr>
<td>
<h5>@Task.updateMavenArtifacts.name</h5>
<p>@Task.updateMavenArtifacts.description</p>
<h5>@Task.republishArtifacts.name</h5>
<p>@Task.republishArtifacts.description</p>
</td>
<td></td>
<form action="/admin/tasks/@Task.updateMavenArtifacts.name" method="POST">
<form action="/admin/tasks/@Task.republishArtifacts.name" method="POST">
<td><button type="submit" class="btn btn-primary">Submit</button></td>
</form>
</tr>
Expand Down

0 comments on commit d63478c

Please sign in to comment.