Skip to content

Commit

Permalink
Migrate from Akka to Pekko (#26563)
Browse files Browse the repository at this point in the history
* migrate ActorSystem to pekko

* Health mob

Create PekkoActorSystem

Co-Authored-By: Jamie B <[email protected]>
Co-Authored-By: Charlotte Emms <[email protected]>
Co-Authored-By: Ioanna Kokkini <[email protected]>
Co-Authored-By: Ashleigh Carr <[email protected]>
Co-Authored-By: Parisa Tork <[email protected]>

* compiling

* scalafmt

* migrate other usages

* tidy, refactor

* standardise naming

* naming

---------

Co-authored-by: Jamie B <[email protected]>
Co-authored-by: Charlotte Emms <[email protected]>
Co-authored-by: Ioanna Kokkini <[email protected]>
Co-authored-by: Ashleigh Carr <[email protected]>
Co-authored-by: Parisa Tork <[email protected]>
  • Loading branch information
6 people authored Sep 13, 2023
1 parent fa46f7a commit b991611
Show file tree
Hide file tree
Showing 43 changed files with 134 additions and 130 deletions.
6 changes: 3 additions & 3 deletions admin/app/AppLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import conf.switches.SwitchboardLifecycle
import conf.CachedHealthCheckLifeCycle
import controllers.{AdminControllers, HealthCheck}
import _root_.dfp.DfpDataCacheLifecycle
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import concurrent.BlockingOperations
import contentapi.{CapiHttpClient, ContentApiClient, HttpClient}
import http.{AdminFilters, AdminHttpErrorHandler, CommonGzipFilter}
Expand Down Expand Up @@ -36,7 +36,7 @@ class AppLoader extends FrontendApplicationLoader {
trait AdminServices extends I18nComponents {
def wsClient: WSClient
def akkaAsync: AkkaAsync
def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
implicit val executionContext: ExecutionContext
lazy val capiHttpClient: HttpClient = wire[CapiHttpClient]
lazy val contentApiClient = wire[ContentApiClient]
Expand Down Expand Up @@ -101,7 +101,7 @@ trait AppComponents extends FrontendComponents with AdminControllers with AdminS
DfpApiMetrics.DfpApiErrors,
)

def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem

override lazy val httpErrorHandler: HttpErrorHandler = wire[AdminHttpErrorHandler]
override lazy val httpFilters: Seq[EssentialFilter] = wire[CommonGzipFilter].filters ++ wire[AdminFilters].filters
Expand Down
2 changes: 1 addition & 1 deletion admin/test/dfp/DfpApiValidationTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import concurrent.BlockingOperations
import common.dfp.{GuAdUnit, GuLineItem, GuTargeting, Sponsorship}
import com.google.api.ads.admanager.axis.v202308._
import org.joda.time.DateTime
import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

Expand Down
6 changes: 3 additions & 3 deletions admin/test/services/ParameterStoreServiceTest.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package services

import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import concurrent.BlockingOperations
import org.scalatest.concurrent.ScalaFutures
import org.mockito.Mockito._
Expand All @@ -11,8 +11,8 @@ import org.scalatestplus.mockito.MockitoSugar
class ParameterStoreServiceTest extends AnyFlatSpec with ScalaFutures with Matchers with MockitoSugar {

"findParameterBySubstring" should "retrieve a parameter from the parameter store by substring" in {
val actorSystem = ActorSystem()
val blockingOperations = new BlockingOperations(actorSystem)
val pekkoActorSystem = PekkoActorSystem()
val blockingOperations = new BlockingOperations(pekkoActorSystem)

val parameterStore = mock[ParameterStore]
when(parameterStore.getPath("/frontend", isRecursiveSearch = true)) thenReturn Map(
Expand Down
4 changes: 2 additions & 2 deletions applications/app/AppLoader.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import app.{FrontendApplicationLoader, FrontendBuildInfo, FrontendComponents}
import com.softwaremill.macwire._
import common.dfp.DfpAgentLifecycle
Expand Down Expand Up @@ -97,5 +97,5 @@ trait AppComponents extends FrontendComponents with ApplicationsControllers with
override lazy val httpFilters: Seq[EssentialFilter] = wire[CommonFilters].filters
override lazy val httpRequestHandler: HttpRequestHandler = wire[DevParametersHttpRequestHandler]

def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
}
5 changes: 2 additions & 3 deletions archive/app/AppLoader.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import http.{CommonFilters, CorsHttpErrorHandler}
import app.{FrontendApplicationLoader, FrontendBuildInfo, FrontendComponents}
import com.softwaremill.macwire._
Expand Down Expand Up @@ -49,6 +49,5 @@ trait AppComponents extends FrontendComponents {
override lazy val httpErrorHandler: HttpErrorHandler = wire[CorsHttpErrorHandler]
override lazy val httpFilters: Seq[EssentialFilter] = wire[CommonFilters].filters
override lazy val httpRequestHandler: HttpRequestHandler = wire[DevParametersHttpRequestHandler]

def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
}
4 changes: 2 additions & 2 deletions article/app/AppLoader.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import _root_.commercial.targeting.TargetingLifecycle
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import app.{FrontendApplicationLoader, FrontendBuildInfo, FrontendComponents}
import com.softwaremill.macwire._
import common.Assets.DiscussionExternalAssetsLifecycle
Expand Down Expand Up @@ -98,5 +98,5 @@ trait AppComponents extends FrontendComponents with ArticleControllers with Topi
override lazy val httpFilters: Seq[EssentialFilter] = wire[CommonFilters].filters
override lazy val httpRequestHandler: HttpRequestHandler = wire[DevParametersHttpRequestHandler]

def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package services

import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import app.LifecycleComponent
import common.AutoRefresh
import model.{TagDefinition, TagIndexListings}
Expand All @@ -12,7 +12,7 @@ import scala.language.postfixOps
class NewspaperBooksAndSectionsAutoRefresh(
newspaperBookSectionTagAgent: NewspaperBookSectionTagAgent,
newspaperBookTagAgent: NewspaperBookTagAgent,
)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext)
)(implicit pekkoActorSystem: PekkoActorSystem, executionContext: ExecutionContext)
extends LifecycleComponent {
override def start(): Unit = {
newspaperBookTagAgent.start()
Expand Down
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ val common = library("common")
identityModel,
capiAws,
okhttp,
pekkoActor,
pekkoStream,
) ++ jackson,
TestAssets / mappings ~= filterAssets,
)
Expand Down
6 changes: 3 additions & 3 deletions commercial/app/AppLoader.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import app.{FrontendApplicationLoader, FrontendBuildInfo, FrontendComponents}
import com.softwaremill.macwire._
import commercial.CommercialLifecycle
Expand Down Expand Up @@ -35,7 +35,7 @@ class AppLoader extends FrontendApplicationLoader {

trait CommercialServices {
def wsClient: WSClient
def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
implicit val executionContext: ExecutionContext

lazy val magentoService = wire[MagentoService]
Expand Down Expand Up @@ -79,5 +79,5 @@ trait AppComponents extends FrontendComponents with CommercialControllers with C
override lazy val httpFilters: Seq[EssentialFilter] = wire[CommonFilters].filters
override lazy val httpRequestHandler: HttpRequestHandler = wire[DevParametersHttpRequestHandler]

def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
}
16 changes: 8 additions & 8 deletions commercial/app/model/merchandise/books/BookFinder.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package commercial.model.merchandise.books

import akka.actor.ActorSystem
import akka.pattern.CircuitBreaker
import akka.util.Timeout
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import org.apache.pekko.pattern.CircuitBreaker
import org.apache.pekko.util.Timeout
import commercial.model.feeds.{FeedParseException, FeedReadException, FeedReader, FeedRequest}
import commercial.model.merchandise.Book
import common.{Box, GuLogging}
Expand All @@ -15,10 +15,10 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class BookFinder(actorSystem: ActorSystem, magentoService: MagentoService) extends GuLogging {
class BookFinder(pekkoActorSystem: PekkoActorSystem, magentoService: MagentoService) extends GuLogging {

private implicit val bookActorExecutionContext: ExecutionContext =
actorSystem.dispatchers.lookup("akka.actor.book-lookup")
pekkoActorSystem.dispatchers.lookup("akka.actor.book-lookup")
private implicit val bookActorTimeout: Timeout = 0.2.seconds
private implicit val magentoServiceImplicit = magentoService

Expand Down Expand Up @@ -48,7 +48,7 @@ object BookAgent extends GuLogging {
}
}

class MagentoService(actorSystem: ActorSystem, wsClient: WSClient) extends GuLogging {
class MagentoService(pekkoActorSystem: PekkoActorSystem, wsClient: WSClient) extends GuLogging {

private case class MagentoProperties(oauth: WSSignatureCalculator, urlPrefix: String)

Expand All @@ -72,10 +72,10 @@ class MagentoService(actorSystem: ActorSystem, wsClient: WSClient) extends GuLog
}

private implicit val bookLookupExecutionContext: ExecutionContext =
actorSystem.dispatchers.lookup("akka.actor.book-lookup")
pekkoActorSystem.dispatchers.lookup("akka.actor.book-lookup")

private final val circuitBreaker = new CircuitBreaker(
scheduler = actorSystem.scheduler,
scheduler = pekkoActorSystem.scheduler,
maxFailures = 5,
callTimeout = 3.seconds,
resetTimeout = 5.minutes,
Expand Down
8 changes: 6 additions & 2 deletions common/app/app/FrontendApplicationLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import play.api.routing.Router
import play.filters.csrf.CSRFComponents
import controllers.AssetsComponents
import play.api.{Application, ApplicationLoader, BuiltInComponents, LoggerConfigurator, OptionalDevContext}
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}

trait FrontendApplicationLoader extends ApplicationLoader {

Expand All @@ -36,10 +37,13 @@ trait FrontendComponents

lazy val prefix = "/"

implicit lazy val as = actorSystem
implicit val pekkoActorSystem: PekkoActorSystem = PekkoActorSystem.create()
applicationLifecycle.addStopHook(() => {
pekkoActorSystem.terminate()
})

lazy val jobScheduler = new JobScheduler(appContext)
lazy val akkaAsync = new AkkaAsync(environment, actorSystem)
lazy val akkaAsync = new AkkaAsync(environment, pekkoActorSystem)
lazy val appMetrics = ApplicationMetrics()
lazy val guardianConf = new GuardianConfiguration
lazy val mode = environment.mode
Expand Down
6 changes: 3 additions & 3 deletions common/app/common/AutoRefresh.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package common

import scala.concurrent.duration.FiniteDuration
import akka.actor.{ActorSystem, Cancellable}
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem, Cancellable}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -35,9 +35,9 @@ abstract class AutoRefresh[A](initialDelay: FiniteDuration, interval: FiniteDura
}
}

final def start()(implicit actorSystem: ActorSystem, executionContext: ExecutionContext): Unit = {
final def start()(implicit pekkoActorSystem: PekkoActorSystem, executionContext: ExecutionContext): Unit = {
log.info(s"Starting refresh cycle after $initialDelay repeatedly over $interval delay")
val cancellable = actorSystem.scheduler.scheduleWithFixedDelay(initialDelay, interval) { new Task() }
val cancellable = pekkoActorSystem.scheduler.scheduleWithFixedDelay(initialDelay, interval) { new Task() }
subscription = Some(cancellable)
}

Expand Down
12 changes: 6 additions & 6 deletions common/app/common/Logback/KinesisAdapter.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package common.Logback

import java.util.concurrent.ThreadPoolExecutor
import akka.actor.ActorSystem
import akka.dispatch.MessageDispatcher
import akka.pattern.CircuitBreaker
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import org.apache.pekko.dispatch.MessageDispatcher
import org.apache.pekko.pattern.CircuitBreaker
import ch.qos.logback.classic.spi.ILoggingEvent
import com.amazonaws.ClientConfiguration
import com.amazonaws.auth.AWSCredentialsProvider
Expand All @@ -16,16 +16,16 @@ import scala.concurrent.Future
import scala.annotation.nowarn

// LogbackOperationsPool must be wired as a singleton
class LogbackOperationsPool(val actorSystem: ActorSystem) {
val logbackOperations: MessageDispatcher = actorSystem.dispatchers.lookup("akka.logback-operations")
class LogbackOperationsPool(val pekkoActorSystem: PekkoActorSystem) {
val logbackOperations: MessageDispatcher = pekkoActorSystem.dispatchers.lookup("akka.logback-operations")
}

// The KinesisAppender[ILoggingEvent] blocks logging operations on putMessage. This overrides the KinesisAppender api, executing putMessage in an
// independent threadpool
class SafeBlockingKinesisAppender(logbackOperations: LogbackOperationsPool) extends KinesisAppender[ILoggingEvent] {

private val breaker = new CircuitBreaker(
logbackOperations.actorSystem.scheduler,
logbackOperations.pekkoActorSystem.scheduler,
maxFailures = 1,
callTimeout = 1.seconds,
resetTimeout = 10.seconds,
Expand Down
8 changes: 4 additions & 4 deletions common/app/common/akka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import scala.concurrent.duration._
import play.api.{Environment => PlayEnv, Mode}

import scala.concurrent.ExecutionContext
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}

class AkkaAsync(env: PlayEnv, actorSystem: ActorSystem) {
implicit val ec: ExecutionContext = actorSystem.dispatcher
class AkkaAsync(env: PlayEnv, pekkoActorSystem: PekkoActorSystem) {
implicit val ec: ExecutionContext = pekkoActorSystem.dispatcher

// "apply" isn't expressive and doesn't explain what it does.
// If you were considering using that function, use after1s instead as it doesn't leave any ambiguity.
Expand All @@ -20,6 +20,6 @@ class AkkaAsync(env: PlayEnv, actorSystem: ActorSystem) {
// want to check in
def after(delay: FiniteDuration)(body: => Unit): Unit =
if (env.mode != Mode.Test) {
actorSystem.scheduler.scheduleOnce(delay)(body)
pekkoActorSystem.scheduler.scheduleOnce(delay)(body)
}
}
2 changes: 1 addition & 1 deletion common/app/common/package.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package common

import java.util.concurrent.TimeoutException
import akka.pattern.CircuitBreakerOpenException
import com.gu.contentapi.client.model.ContentApiError
import com.gu.contentapi.client.model.v1.ErrorResponse
import conf.switches.Switch
Expand All @@ -17,6 +16,7 @@ import play.twirl.api.Html
import model.ApplicationContext
import http.ResultWithPreconnectPreload
import http.HttpPreconnections
import org.apache.pekko.pattern.CircuitBreakerOpenException
import renderers.{DCRLocalConnectException, DCRTimeoutException}

object `package`
Expand Down
8 changes: 4 additions & 4 deletions common/app/concurrent/BlockingOperations.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package concurrent

import akka.actor.ActorSystem
import akka.dispatch.MessageDispatcher
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import org.apache.pekko.dispatch.MessageDispatcher

import scala.concurrent.Future

class BlockingOperations(actorSystem: ActorSystem) {
private val blockingOperations: MessageDispatcher = actorSystem.dispatchers.lookup("akka.blocking-operations")
class BlockingOperations(pekkoActorSystem: PekkoActorSystem) {
private val blockingOperations: MessageDispatcher = pekkoActorSystem.dispatchers.lookup("akka.blocking-operations")

def executeBlocking[T](block: => T): Future[T] = {
Future(block)(blockingOperations)
Expand Down
6 changes: 3 additions & 3 deletions common/app/concurrent/CircuitBreakerRegistry.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package concurrent

import akka.actor.ActorSystem
import akka.pattern.CircuitBreaker
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import org.apache.pekko.pattern.CircuitBreaker
import common.GuLogging

import scala.concurrent.ExecutionContext.Implicits.global
Expand All @@ -11,7 +11,7 @@ object CircuitBreakerRegistry extends GuLogging {

def withConfig(
name: String,
system: ActorSystem,
system: PekkoActorSystem,
maxFailures: Int,
callTimeout: FiniteDuration,
resetTimeout: FiniteDuration,
Expand Down
4 changes: 2 additions & 2 deletions common/app/contentapi/ContentApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package contentapi

import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import com.github.nscala_time.time.Implicits._
import com.gu.contentapi.client.model._
import com.gu.contentapi.client.model.v1.{Edition => _, _}
Expand Down Expand Up @@ -136,7 +136,7 @@ final case class CircuitBreakingContentApiClient(

private[this] val circuitBreaker = CircuitBreakerRegistry.withConfig(
name = "content-api-client",
system = ActorSystem("content-api-client-circuit-breaker"),
system = PekkoActorSystem("content-api-client-circuit-breaker"),
maxFailures = contentApi.circuitBreakerErrorThreshold,
callTimeout = contentApi.timeout + Duration
.create(400, MILLISECONDS), // +400 to differentiate between circuit breaker and capi timeouts
Expand Down
4 changes: 2 additions & 2 deletions common/app/renderers/DotcomRenderingService.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package renderers

import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import com.gu.contentapi.client.model.v1.{Block, Blocks, Content}
import common.{DCRMetrics, GuLogging}
import concurrent.CircuitBreakerRegistry
Expand Down Expand Up @@ -49,7 +49,7 @@ class DotcomRenderingService extends GuLogging with ResultWithPreconnectPreload

private[this] val circuitBreaker = CircuitBreakerRegistry.withConfig(
name = "dotcom-rendering-client",
system = ActorSystem("dotcom-rendering-client-circuit-breaker"),
system = PekkoActorSystem("dotcom-rendering-client-circuit-breaker"),
maxFailures = Configuration.rendering.circuitBreakerMaxFailures,
callTimeout = Configuration.rendering.timeout.plus(200.millis),
resetTimeout = Configuration.rendering.timeout * 4,
Expand Down
2 changes: 1 addition & 1 deletion common/app/services/ConfigAgentTrait.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package services

import akka.util.Timeout
import app.LifecycleComponent
import com.gu.facia.api.models.{Front, _}
import com.gu.facia.client.ApiClient
Expand All @@ -11,6 +10,7 @@ import conf.Configuration
import fronts.FrontsApi
import model.pressed.CollectionConfig
import model.{ApplicationContext, FrontProperties, SeoDataJson}
import org.apache.pekko.util.Timeout
import play.api.inject.ApplicationLifecycle
import play.api.libs.json.Json

Expand Down
Loading

0 comments on commit b991611

Please sign in to comment.