Skip to content

Commit

Permalink
Add notifications related to currently displayed scenario (#7184)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgoworko authored and lciolecki committed Dec 10, 2024
1 parent eb3ccd5 commit 51d7e47
Show file tree
Hide file tree
Showing 22 changed files with 408 additions and 71 deletions.
4 changes: 2 additions & 2 deletions designer/client/cypress/e2e/connectionError.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ describe("Connection error", () => {
cy.get("[model-id='filter']").dblclick();
cy.wait("@validation");

cy.intercept("/api/notifications", { statusCode: 502 });
cy.intercept("/api/notifications?scenarioName=*", { statusCode: 502 });
cy.contains(/Backend connection issue/).should("be.visible");
cy.get("body").matchImage({
screenshotConfig,
});

cy.intercept("/api/notifications", (req) => {
cy.intercept("/api/notifications?scenarioName=*", (req) => {
req.continue();
});

Expand Down
6 changes: 2 additions & 4 deletions designer/client/src/containers/Notifications.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ const handleRefresh =
}
toRefresh.forEach((data) => {
switch (data) {
case "versions":
return dispatch(getScenarioActivities(scenarioName));
case "activity":
return dispatch(getScenarioActivities(scenarioName));
case "state":
Expand Down Expand Up @@ -90,7 +88,7 @@ export function Notifications(): JSX.Element {
const currentScenarioName = useSelector(getProcessName);

const refresh = useCallback(() => {
HttpService.loadBackendNotifications()
HttpService.loadBackendNotifications(currentScenarioName)
.then((notifications) => {
handleChangeConnectionError(null);
dispatch(updateBackendNotifications(notifications.map(({ id }) => id)));
Expand Down Expand Up @@ -124,7 +122,7 @@ export function Notifications(): JSX.Element {

type NotificationType = "info" | "error" | "success";

type DataToRefresh = "versions" | "activity" | "state";
type DataToRefresh = "activity" | "state";

export type BackendNotification = {
id: string;
Expand Down
5 changes: 3 additions & 2 deletions designer/client/src/http/HttpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,9 @@ class HttpService {
this.#notificationActions = na;
}

loadBackendNotifications(): Promise<BackendNotification[]> {
return api.get<BackendNotification[]>("/notifications").then((d) => {
loadBackendNotifications(scenarioName: string | undefined): Promise<BackendNotification[]> {
const path = scenarioName !== undefined ? `/notifications?scenarioName=${scenarioName}` : `/notifications`;
return api.get<BackendNotification[]>(path).then((d) => {
return d.data;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.touk.nussknacker.ui.api
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.ui.api.description.NotificationApiEndpoints
import pl.touk.nussknacker.ui.notifications.NotificationService
import pl.touk.nussknacker.ui.notifications.NotificationService.NotificationsScope
import pl.touk.nussknacker.ui.security.api.AuthManager

import scala.concurrent.ExecutionContext
Expand All @@ -21,8 +22,15 @@ class NotificationApiHttpService(
expose {
notificationApiEndpoints.notificationEndpoint
.serverSecurityLogic(authorizeKnownUser[Unit])
.serverLogic { implicit loggedUser => _ =>
notificationService.notifications
.serverLogic { implicit loggedUser => processNameOpt =>
val scope = processNameOpt match {
case Some(processName) =>
NotificationsScope.NotificationsForLoggedUserAndScenario(loggedUser, processName)
case None =>
NotificationsScope.NotificationsForLoggedUser(loggedUser)
}
notificationService
.notifications(scope)
.map { notificationList => success(notificationList) }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.ScenarioActi
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.{Comment => _, _}
import pl.touk.nussknacker.ui.api.description.scenarioActivity.{Dtos, Endpoints}
import pl.touk.nussknacker.ui.process.ProcessService.GetScenarioWithDetailsOptions
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository
import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository.{
CommentModificationMetadata,
DeleteAttachmentError,
ModifyCommentError
}
import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, DeploymentComment}
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.process.{ProcessService, ScenarioAttachmentService}
import pl.touk.nussknacker.ui.security.api.{AuthManager, LoggedUser}
import pl.touk.nussknacker.ui.server.HeadersSupport.ContentDisposition
Expand All @@ -39,7 +39,7 @@ import scala.concurrent.{ExecutionContext, Future}

class ScenarioActivityApiHttpService(
authManager: AuthManager,
deploymentManagerDispatcher: DeploymentManagerDispatcher,
fetchScenarioActivityService: FetchScenarioActivityService,
scenarioActivityRepository: ScenarioActivityRepository,
scenarioService: ProcessService,
scenarioAuthorizer: AuthorizeProcess,
Expand Down Expand Up @@ -240,18 +240,10 @@ class ScenarioActivityApiHttpService(

private def fetchActivities(
processIdWithName: ProcessIdWithName
)(implicit loggedUser: LoggedUser): EitherT[Future, ScenarioActivityError, List[Dtos.ScenarioActivity]] =
)(implicit loggedUser: LoggedUser): EitherT[Future, ScenarioActivityError, List[Dtos.ScenarioActivity]] = {
EitherT.right {
for {
generalActivities <- dbioActionRunner.run(scenarioActivityRepository.findActivities(processIdWithName.id))
deploymentManager <- deploymentManagerDispatcher.deploymentManager(processIdWithName)
deploymentManagerSpecificActivities <- deploymentManager match {
case Some(manager: ManagerSpecificScenarioActivitiesStoredByManager) =>
manager.managerSpecificScenarioActivities(processIdWithName)
case Some(_) | None =>
Future.successful(List.empty)
}
combinedActivities = generalActivities ++ deploymentManagerSpecificActivities
combinedActivities <- fetchScenarioActivityService.fetchActivities(processIdWithName, after = None)
// The API endpoint returning scenario activities does not yet have support for filtering. We made a decision to:
// - for activities not related to deployments: always display them on FE
// - for activities related to batch deployments: always display them on FE
Expand All @@ -268,6 +260,7 @@ class ScenarioActivityApiHttpService(
sortedResult = combinedSuccessfulActivities.map(toDto).toList.sortBy(_.date)
} yield sortedResult
}
}

private def toDto(scenarioComment: ScenarioComment): Dtos.ScenarioActivityComment = scenarioComment match {
case ScenarioComment.WithContent(comment, _, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import pl.touk.nussknacker.ui.notifications.{DataToRefresh, Notification}
import sttp.model.StatusCode.Ok
import sttp.tapir.EndpointIO.Example
import sttp.tapir.json.circe.jsonBody
import sttp.tapir.{EndpointInput, statusCode}
import sttp.tapir.{Codec, CodecFormat, EndpointInput, query, statusCode}

class NotificationApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseEndpointDefinitions {

lazy val notificationEndpoint: SecuredEndpoint[Unit, Unit, List[Notification], Any] =
lazy val notificationEndpoint: SecuredEndpoint[Option[ProcessName], Unit, List[Notification], Any] =
baseNuApiEndpoint
.summary("Endpoint to display notifications")
.tag("Notifications")
.get
.in("notifications")
.in(query[Option[ProcessName]]("scenarioName"))
.out(
statusCode(Ok).and(
jsonBody[List[Notification]].example(
Expand All @@ -30,7 +31,6 @@ class NotificationApiEndpoints(auth: EndpointInput[AuthCredentials]) extends Bas
message = "Deployment finished",
`type` = None,
toRefresh = List(
DataToRefresh(DataToRefresh.versions.id),
DataToRefresh(DataToRefresh.activity.id),
DataToRefresh(DataToRefresh.state.id)
)
Expand All @@ -42,4 +42,8 @@ class NotificationApiEndpoints(auth: EndpointInput[AuthCredentials]) extends Bas
)
.withSecurity(auth)

private implicit val processNameCodec: Codec[String, ProcessName, CodecFormat.TextPlain] = {
Codec.string.map(str => ProcessName(str))(_.value)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object Notification {
Some(name),
s"${displayableActionName(actionName)} finished",
None,
List(DataToRefresh.versions, DataToRefresh.activity, DataToRefresh.state)
List(DataToRefresh.activity, DataToRefresh.state)
)
}

Expand All @@ -64,7 +64,22 @@ object Notification {
Some(name),
s"${displayableActionName(actionName)} execution finished",
None,
List(DataToRefresh.versions, DataToRefresh.activity, DataToRefresh.state)
List(DataToRefresh.activity, DataToRefresh.state)
)
}

def scenarioStateUpdateNotification(
id: String,
activityName: String,
name: ProcessName
): Notification = {
// We don't want to display this notification, because it causes the activities toolbar to refresh
Notification(
id = id,
scenarioName = Some(name),
message = activityName,
`type` = None,
toRefresh = List(DataToRefresh.activity, DataToRefresh.state)
)
}

Expand Down Expand Up @@ -92,5 +107,5 @@ object DataToRefresh extends Enumeration {
implicit val typeDecoder: Decoder[DataToRefresh.Value] = Decoder.decodeEnumeration(DataToRefresh)

type DataToRefresh = Value
val versions, activity, state = Value
val activity, state = Value
}
Original file line number Diff line number Diff line change
@@ -1,45 +1,85 @@
package pl.touk.nussknacker.ui.notifications

import pl.touk.nussknacker.engine.api.deployment.{ProcessActionState, ScenarioActionName}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.ui.notifications.NotificationService.NotificationsScope
import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, ScenarioActionRepository}
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.util.ScenarioActivityUtils.ScenarioActivityOps

import java.time.Clock
import java.time.{Clock, Instant}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

final case class NotificationConfig(duration: FiniteDuration)

trait NotificationService {

def notifications(implicit user: LoggedUser, ec: ExecutionContext): Future[List[Notification]]
def notifications(scope: NotificationsScope)(
implicit ec: ExecutionContext
): Future[List[Notification]]

}

object NotificationService {

sealed trait NotificationsScope

object NotificationsScope {

final case class NotificationsForLoggedUser(
user: LoggedUser,
) extends NotificationsScope

final case class NotificationsForLoggedUserAndScenario(
user: LoggedUser,
processName: ProcessName,
) extends NotificationsScope

}

}

class NotificationServiceImpl(
fetchScenarioActivityService: FetchScenarioActivityService,
scenarioActionRepository: ScenarioActionRepository,
dbioRunner: DBIOActionRunner,
config: NotificationConfig,
clock: Clock = Clock.systemUTC()
) extends NotificationService {

override def notifications(implicit user: LoggedUser, ec: ExecutionContext): Future[List[Notification]] = {
override def notifications(
scope: NotificationsScope,
)(implicit ec: ExecutionContext): Future[List[Notification]] = {
val now = clock.instant()
val limit = now.minusMillis(config.duration.toMillis)
dbioRunner
.run(
scenarioActionRepository.getUserActionsAfter(
user,
Set(ScenarioActionName.Deploy, ScenarioActionName.Cancel),
ProcessActionState.FinishedStates + ProcessActionState.Failed,
limit
)
scope match {
case NotificationsScope.NotificationsForLoggedUser(user) =>
notificationsForUserActions(user, limit)
case NotificationsScope.NotificationsForLoggedUserAndScenario(user, processName) =>
for {
notificationsForUserActions <- notificationsForUserActions(user, limit)
notificationsForScenarioActivities <- notificationsForScenarioActivities(user, processName, limit)
} yield notificationsForUserActions ++ notificationsForScenarioActivities
}

}

private def notificationsForUserActions(user: LoggedUser, limit: Instant)(
implicit ec: ExecutionContext
): Future[List[Notification]] = dbioRunner.run {
scenarioActionRepository
.getUserActionsAfter(
user,
Set(ScenarioActionName.Deploy, ScenarioActionName.Cancel),
ProcessActionState.FinishedStates + ProcessActionState.Failed,
limit
)
.map(_.map { case (action, scenarioName) =>
action.state match {
case ProcessActionState.Finished =>
Notification
.actionFinishedNotification(action.id.toString, action.actionName, scenarioName)
Notification.actionFinishedNotification(action.id.toString, action.actionName, scenarioName)
case ProcessActionState.Failed =>
Notification
.actionFailedNotification(action.id.toString, action.actionName, scenarioName, action.failureMessage)
Expand All @@ -54,4 +94,22 @@ class NotificationServiceImpl(
})
}

private def notificationsForScenarioActivities(user: LoggedUser, processName: ProcessName, limit: Instant)(
implicit ec: ExecutionContext
): Future[List[Notification]] = {
for {
allActivities <- fetchScenarioActivityService.fetchActivities(processName, Some(limit))(user).value.map {
case Right(activities) => activities
case Left(_) => List.empty
}
notificationsForScenarioActivities = allActivities.map { activity =>
Notification.scenarioStateUpdateNotification(
s"${activity.scenarioActivityId.value.toString}_${activity.lastModifiedAt.toEpochMilli}",
activity.activityType.entryName,
processName
)
}
} yield notificationsForScenarioActivities
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,

def findActivities(
scenarioId: ProcessId,
after: Option[Instant],
): DB[Seq[ScenarioActivity]] = {
doFindActivities(scenarioId).map(_.map(_._2))
doFindActivities(scenarioId, after).map(_.map(_._2))
}

def addActivity(
Expand Down Expand Up @@ -199,7 +200,7 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,
for {
attachmentEntities <- findAttachments(processId)
attachments = attachmentEntities.map(toDto)
activities <- doFindActivities(processId)
activities <- doFindActivities(processId, after = None)
comments = activities.flatMap { case (id, activity) => toComment(id, activity) }
} yield Legacy.ProcessActivity(
comments = comments.toList,
Expand All @@ -221,9 +222,11 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,

private def doFindActivities(
scenarioId: ProcessId,
after: Option[Instant],
): DB[Seq[(Long, ScenarioActivity)]] = {
scenarioActivityTable
.filter(_.scenarioId === scenarioId)
.filterOpt(after)((table, after) => table.createdAt > Timestamp.from(after))
// ScenarioActivity in domain represents a single, immutable event, so we interpret only finished operations as ScenarioActivities
.filter { entity =>
entity.state.isEmpty ||
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.ui.process.repository.activities

import db.util.DBIOActionInstances.DB
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.{ProcessId, VersionId}
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.Legacy
Expand All @@ -15,14 +14,15 @@ import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepo
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.util.LoggedUserUtils.Ops

import java.time.Clock
import java.time.{Clock, Instant}

trait ScenarioActivityRepository {

def clock: Clock

def findActivities(
scenarioId: ProcessId,
after: Option[Instant] = None,
): DB[Seq[ScenarioActivity]]

def addActivity(
Expand Down
Loading

0 comments on commit 51d7e47

Please sign in to comment.