Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add notifications related to currently displayed scenario #7184

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.touk.nussknacker.ui.api
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.ui.api.description.NotificationApiEndpoints
import pl.touk.nussknacker.ui.notifications.NotificationService
import pl.touk.nussknacker.ui.notifications.NotificationService.NotificationsScope
import pl.touk.nussknacker.ui.security.api.AuthManager

import scala.concurrent.ExecutionContext
Expand All @@ -22,8 +23,14 @@ class NotificationApiHttpService(
notificationApiEndpoints.notificationEndpoint
.serverSecurityLogic(authorizeKnownUser[Unit])
.serverLogic { implicit loggedUser => processNameOpt =>
val scope = processNameOpt match {
case Some(processName) =>
NotificationsScope.NotificationsForLoggedUserAndScenario(loggedUser, processName)
case None =>
NotificationsScope.NotificationsForLoggedUser(loggedUser)
}
notificationService
.notifications(processNameOpt)
.notifications(scope)
.map { notificationList => success(notificationList) }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepo
ModifyCommentError
}
import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, DeploymentComment}
import pl.touk.nussknacker.ui.process.scenarioactivity.ScenarioActivityService
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.process.{ProcessService, ScenarioAttachmentService}
import pl.touk.nussknacker.ui.security.api.{AuthManager, LoggedUser}
import pl.touk.nussknacker.ui.server.HeadersSupport.ContentDisposition
Expand All @@ -39,7 +39,7 @@ import scala.concurrent.{ExecutionContext, Future}

class ScenarioActivityApiHttpService(
authManager: AuthManager,
scenarioActivityService: ScenarioActivityService,
scenarioActivityService: FetchScenarioActivityService,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's rename parameter as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

scenarioActivityRepository: ScenarioActivityRepository,
scenarioService: ProcessService,
scenarioAuthorizer: AuthorizeProcess,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package pl.touk.nussknacker.ui.notifications

import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.ui.notifications.NotificationService.NotificationsScope
import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, ScenarioActionRepository}
import pl.touk.nussknacker.ui.process.scenarioactivity.ScenarioActivityService
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.util.ScenarioActivityUtils.ScenarioActivityOps

Expand All @@ -15,41 +16,58 @@ final case class NotificationConfig(duration: FiniteDuration)

trait NotificationService {

def notifications(processName: Option[ProcessName])(
implicit user: LoggedUser,
ec: ExecutionContext
def notifications(scope: NotificationsScope)(
implicit ec: ExecutionContext
): Future[List[Notification]]

}

object NotificationService {

sealed trait NotificationsScope

object NotificationsScope {

final case class NotificationsForLoggedUser(
user: LoggedUser,
) extends NotificationsScope

final case class NotificationsForLoggedUserAndScenario(
user: LoggedUser,
processName: ProcessName,
) extends NotificationsScope

}

}

class NotificationServiceImpl(
scenarioActivityService: ScenarioActivityService,
scenarioActivityService: FetchScenarioActivityService,
scenarioActionRepository: ScenarioActionRepository,
dbioRunner: DBIOActionRunner,
config: NotificationConfig,
clock: Clock = Clock.systemUTC()
) extends NotificationService {

override def notifications(
processNameOpt: Option[ProcessName]
)(implicit user: LoggedUser, ec: ExecutionContext): Future[List[Notification]] = {
scope: NotificationsScope,
)(implicit ec: ExecutionContext): Future[List[Notification]] = {
val now = clock.instant()
val limit = now.minusMillis(config.duration.toMillis)
processNameOpt match {
case Some(processName) =>
scope match {
case NotificationsScope.NotificationsForLoggedUser(user) =>
notificationsForUserActions(user, limit)
case NotificationsScope.NotificationsForLoggedUserAndScenario(user, processName) =>
for {
notificationsForUserActions <- notificationsForUserActions(limit)
notificationsForScenarioActivities <- notificationsForScenarioActivities(processName, limit)
notificationsForUserActions <- notificationsForUserActions(user, limit)
notificationsForScenarioActivities <- notificationsForScenarioActivities(user, processName, limit)
} yield notificationsForUserActions ++ notificationsForScenarioActivities
case None =>
notificationsForUserActions(limit)
}

}

private def notificationsForUserActions(limit: Instant)(
implicit user: LoggedUser,
ec: ExecutionContext
private def notificationsForUserActions(user: LoggedUser, limit: Instant)(
implicit ec: ExecutionContext
): Future[List[Notification]] = dbioRunner.run {
scenarioActionRepository
.getUserActionsAfter(
Expand All @@ -76,12 +94,11 @@ class NotificationServiceImpl(
})
}

private def notificationsForScenarioActivities(processName: ProcessName, limit: Instant)(
implicit user: LoggedUser,
ec: ExecutionContext
private def notificationsForScenarioActivities(user: LoggedUser, processName: ProcessName, limit: Instant)(
implicit ec: ExecutionContext
): Future[List[Notification]] = {
for {
allActivities <- scenarioActivityService.fetchActivities(processName, Some(limit)).value.map {
allActivities <- scenarioActivityService.fetchActivities(processName, Some(limit))(user).value.map {
case Right(activities) => activities
case Left(_) => List.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@ import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository
import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, FetchingProcessRepository}
import pl.touk.nussknacker.ui.process.scenarioactivity.ScenarioActivityService.ScenarioActivityFetchError
import pl.touk.nussknacker.ui.process.scenarioactivity.ScenarioActivityService.ScenarioActivityFetchError.NoScenario
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService.ScenarioActivityFetchError
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService.ScenarioActivityFetchError.NoScenario
import pl.touk.nussknacker.ui.security.api.LoggedUser

import java.time.Instant
import scala.concurrent.{ExecutionContext, Future}
import scala.math.Ordered.orderingToOrdered

class ScenarioActivityService(
// TODO: In current implementation, we do not have a single ScenarioActivity-related service.
// - there is repository, that also encapsulates some very basic logic of validations/conversions
// - there is a logging decorator for that repository
// - the logic of fetching activities is used in multiple places, so it is encapsulated in this FetchScenarioActivityService
// - a complete ScenarioActivityService should be implemented, that would encapsulate logic from this service, and from repository and its decorator
class FetchScenarioActivityService(
deploymentManagerDispatcher: DeploymentManagerDispatcher,
scenarioActivityRepository: ScenarioActivityRepository,
fetchingProcessRepository: FetchingProcessRepository[Future],
Expand Down Expand Up @@ -58,7 +63,7 @@ class ScenarioActivityService(

}

object ScenarioActivityService {
object FetchScenarioActivityService {
sealed trait ScenarioActivityFetchError

object ScenarioActivityFetchError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import pl.touk.nussknacker.ui.process.processingtype.loader.ProcessingTypeDataLo
import pl.touk.nussknacker.ui.process.processingtype.provider.ReloadableProcessingTypeDataProvider
import pl.touk.nussknacker.ui.process.repository._
import pl.touk.nussknacker.ui.process.repository.activities.{DbScenarioActivityRepository, ScenarioActivityRepository}
import pl.touk.nussknacker.ui.process.scenarioactivity.ScenarioActivityService
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.process.test.{PreliminaryScenarioTestDataSerDe, ScenarioTestService}
import pl.touk.nussknacker.ui.process.version.{ScenarioGraphVersionRepository, ScenarioGraphVersionService}
import pl.touk.nussknacker.ui.processreport.ProcessCounter
Expand Down Expand Up @@ -314,7 +314,7 @@ class AkkaHttpBasedRouteProvider(
processService,
fragmentRepository
)
val scenarioActivityService = new ScenarioActivityService(
val scenarioActivityService = new FetchScenarioActivityService(
dmDispatcher,
scenarioActivityRepository,
futureProcessRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory}
import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues
import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, PatientScalaFutures}
import pl.touk.nussknacker.ui.listener.ProcessChangeListener
import pl.touk.nussknacker.ui.notifications.NotificationService.NotificationsScope
import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions._
import pl.touk.nussknacker.ui.process.deployment._
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
Expand All @@ -38,7 +39,7 @@ import pl.touk.nussknacker.ui.process.repository.{
DbScenarioActionRepository,
ScenarioWithDetailsEntity
}
import pl.touk.nussknacker.ui.process.scenarioactivity.ScenarioActivityService
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.validation.UIProcessValidator

Expand Down Expand Up @@ -75,7 +76,7 @@ class NotificationServiceTest
processRepository,
)

private val scenarioActivityService = new ScenarioActivityService(
private val scenarioActivityService = new FetchScenarioActivityService(
deploymentManagerDispatcher = dmDispatcher,
scenarioActivityRepository = scenarioActivityRepository,
fetchingProcessRepository = processRepository,
Expand All @@ -100,7 +101,9 @@ class NotificationServiceTest
val (deploymentService, notificationService) = createServices(deploymentManager)

def notificationsFor(user: LoggedUser): List[Notification] =
notificationService.notifications(None)(user, global).futureValue
notificationService
.notifications(NotificationsScope.NotificationsForLoggedUser(user))(global)
.futureValue

def deployProcess(
givenDeployResult: Try[Option[ExternalDeploymentId]],
Expand Down Expand Up @@ -152,7 +155,9 @@ class NotificationServiceTest
val (deploymentService, notificationService) = createServices(deploymentManager)

def notificationsFor(user: LoggedUser): List[Notification] =
notificationService.notifications(Some(processName))(user, global).futureValue
notificationService
.notifications(NotificationsScope.NotificationsForLoggedUserAndScenario(user, processName))(global)
.futureValue

def deployProcess(
givenDeployResult: Try[Option[ExternalDeploymentId]],
Expand Down Expand Up @@ -249,14 +254,21 @@ class NotificationServiceTest

val user = TestFactory.adminUser("fooUser", "fooUser")
deployProcess(Success(None), user)
val notificationsAfterDeploy = notificationService.notifications(None)(user, global).futureValue
val notificationsAfterDeploy =
notificationService
.notifications(NotificationsScope.NotificationsForLoggedUser(user))(global)
.futureValue

notificationsAfterDeploy should have length 1
val deployNotificationId = notificationsAfterDeploy.head.id

deploymentService
.markActionExecutionFinished("Streaming", passedDeploymentId.value.toActionIdOpt.value)
.futureValue
val notificationAfterExecutionFinished = notificationService.notifications(None)(user, global).futureValue
val notificationAfterExecutionFinished =
notificationService
.notifications(NotificationsScope.NotificationsForLoggedUser(user))(global)
.futureValue
// old notification about deployment is replaced by notification about deployment execution finished which has other id
notificationAfterExecutionFinished should have length 1
notificationAfterExecutionFinished.head.id should not equal deployNotificationId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesReposi
import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus
import pl.touk.nussknacker.engine.management.periodic.model._
import pl.touk.nussknacker.engine.management.periodic.service._
import pl.touk.nussknacker.engine.management.periodic.util.DeterministicUUIDFromLong
import pl.touk.nussknacker.engine.management.periodic.util.DeterministicUUIDFromLong.longUUID
import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor

Expand Down Expand Up @@ -76,7 +77,11 @@ class PeriodicProcessService(
activities = deploymentsWithStatuses.map { case (deployment, metadata) =>
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(processIdWithName.id.value),
scenarioActivityId = ScenarioActivityId(longUUID(deployment.id.value)),
// The periodic process executions are stored in the PeriodicProcessService datasource, with ids of type Long
// We need the ScenarioActivityId to be a unique UUID, generated in an idempotent way from Long id.
// It is important, because if the ScenarioActivityId would change, the activity may be treated as a new one,
// and, for example, GUI may have to refresh more often than necessary .
scenarioActivityId = ScenarioActivityId(DeterministicUUIDFromLong.longUUID(deployment.id.value)),
user = ScenarioUser.internalNuUser,
date = metadata.dateDeployed.getOrElse(metadata.dateFinished),
scenarioVersionId = Some(ScenarioVersionId.from(deployment.periodicProcess.processVersion.versionId)),
Expand Down