Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add notifications related to currently displayed scenario #7184

Merged
4 changes: 2 additions & 2 deletions designer/client/cypress/e2e/connectionError.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ describe("Connection error", () => {
cy.get("[model-id='filter']").dblclick();
cy.wait("@validation");

cy.intercept("/api/notifications", { statusCode: 502 });
cy.intercept("/api/notifications?scenarioName=*", { statusCode: 502 });
cy.contains(/Backend connection issue/).should("be.visible");
cy.get("body").matchImage({
screenshotConfig,
});

cy.intercept("/api/notifications", (req) => {
cy.intercept("/api/notifications?scenarioName=*", (req) => {
req.continue();
});

Expand Down
6 changes: 2 additions & 4 deletions designer/client/src/containers/Notifications.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ const handleRefresh =
}
toRefresh.forEach((data) => {
switch (data) {
case "versions":
return dispatch(getScenarioActivities(scenarioName));
case "activity":
return dispatch(getScenarioActivities(scenarioName));
case "state":
Expand Down Expand Up @@ -90,7 +88,7 @@ export function Notifications(): JSX.Element {
const currentScenarioName = useSelector(getProcessName);

const refresh = useCallback(() => {
HttpService.loadBackendNotifications()
HttpService.loadBackendNotifications(currentScenarioName)
.then((notifications) => {
handleChangeConnectionError(null);
dispatch(updateBackendNotifications(notifications.map(({ id }) => id)));
Expand Down Expand Up @@ -124,7 +122,7 @@ export function Notifications(): JSX.Element {

type NotificationType = "info" | "error" | "success";

type DataToRefresh = "versions" | "activity" | "state";
type DataToRefresh = "activity" | "state";

export type BackendNotification = {
id: string;
Expand Down
5 changes: 3 additions & 2 deletions designer/client/src/http/HttpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,9 @@ class HttpService {
this.#notificationActions = na;
}

loadBackendNotifications(): Promise<BackendNotification[]> {
return api.get<BackendNotification[]>("/notifications").then((d) => {
loadBackendNotifications(scenarioName: string | undefined): Promise<BackendNotification[]> {
Dzuming marked this conversation as resolved.
Show resolved Hide resolved
const path = scenarioName !== undefined ? `/notifications?scenarioName=${scenarioName}` : `/notifications`;
return api.get<BackendNotification[]>(path).then((d) => {
return d.data;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.touk.nussknacker.ui.api
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.ui.api.description.NotificationApiEndpoints
import pl.touk.nussknacker.ui.notifications.NotificationService
import pl.touk.nussknacker.ui.notifications.NotificationService.NotificationsScope
import pl.touk.nussknacker.ui.security.api.AuthManager

import scala.concurrent.ExecutionContext
Expand All @@ -21,8 +22,15 @@ class NotificationApiHttpService(
expose {
notificationApiEndpoints.notificationEndpoint
.serverSecurityLogic(authorizeKnownUser[Unit])
.serverLogic { implicit loggedUser => _ =>
notificationService.notifications
.serverLogic { implicit loggedUser => processNameOpt =>
val scope = processNameOpt match {
case Some(processName) =>
NotificationsScope.NotificationsForLoggedUserAndScenario(loggedUser, processName)
case None =>
NotificationsScope.NotificationsForLoggedUser(loggedUser)
}
notificationService
.notifications(scope)
.map { notificationList => success(notificationList) }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.ScenarioActi
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.{Comment => _, _}
import pl.touk.nussknacker.ui.api.description.scenarioActivity.{Dtos, Endpoints}
import pl.touk.nussknacker.ui.process.ProcessService.GetScenarioWithDetailsOptions
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository
import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository.{
CommentModificationMetadata,
DeleteAttachmentError,
ModifyCommentError
}
import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, DeploymentComment}
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.process.{ProcessService, ScenarioAttachmentService}
import pl.touk.nussknacker.ui.security.api.{AuthManager, LoggedUser}
import pl.touk.nussknacker.ui.server.HeadersSupport.ContentDisposition
Expand All @@ -39,7 +39,7 @@ import scala.concurrent.{ExecutionContext, Future}

class ScenarioActivityApiHttpService(
authManager: AuthManager,
deploymentManagerDispatcher: DeploymentManagerDispatcher,
fetchScenarioActivityService: FetchScenarioActivityService,
scenarioActivityRepository: ScenarioActivityRepository,
scenarioService: ProcessService,
scenarioAuthorizer: AuthorizeProcess,
Expand Down Expand Up @@ -240,18 +240,10 @@ class ScenarioActivityApiHttpService(

private def fetchActivities(
processIdWithName: ProcessIdWithName
)(implicit loggedUser: LoggedUser): EitherT[Future, ScenarioActivityError, List[Dtos.ScenarioActivity]] =
)(implicit loggedUser: LoggedUser): EitherT[Future, ScenarioActivityError, List[Dtos.ScenarioActivity]] = {
EitherT.right {
for {
generalActivities <- dbioActionRunner.run(scenarioActivityRepository.findActivities(processIdWithName.id))
deploymentManager <- deploymentManagerDispatcher.deploymentManager(processIdWithName)
deploymentManagerSpecificActivities <- deploymentManager match {
case Some(manager: ManagerSpecificScenarioActivitiesStoredByManager) =>
manager.managerSpecificScenarioActivities(processIdWithName)
case Some(_) | None =>
Future.successful(List.empty)
}
combinedActivities = generalActivities ++ deploymentManagerSpecificActivities
combinedActivities <- fetchScenarioActivityService.fetchActivities(processIdWithName, after = None)
// The API endpoint returning scenario activities does not yet have support for filtering. We made a decision to:
// - for activities not related to deployments: always display them on FE
// - for activities related to batch deployments: always display them on FE
Expand All @@ -268,6 +260,7 @@ class ScenarioActivityApiHttpService(
sortedResult = combinedSuccessfulActivities.map(toDto).toList.sortBy(_.date)
} yield sortedResult
}
}

private def toDto(scenarioComment: ScenarioComment): Dtos.ScenarioActivityComment = scenarioComment match {
case ScenarioComment.WithContent(comment, _, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import pl.touk.nussknacker.ui.notifications.{DataToRefresh, Notification}
import sttp.model.StatusCode.Ok
import sttp.tapir.EndpointIO.Example
import sttp.tapir.json.circe.jsonBody
import sttp.tapir.{EndpointInput, statusCode}
import sttp.tapir.{Codec, CodecFormat, EndpointInput, query, statusCode}

class NotificationApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseEndpointDefinitions {

lazy val notificationEndpoint: SecuredEndpoint[Unit, Unit, List[Notification], Any] =
lazy val notificationEndpoint: SecuredEndpoint[Option[ProcessName], Unit, List[Notification], Any] =
baseNuApiEndpoint
.summary("Endpoint to display notifications")
.tag("Notifications")
.get
.in("notifications")
.in(query[Option[ProcessName]]("scenarioName"))
.out(
statusCode(Ok).and(
jsonBody[List[Notification]].example(
Expand All @@ -30,7 +31,6 @@ class NotificationApiEndpoints(auth: EndpointInput[AuthCredentials]) extends Bas
message = "Deployment finished",
`type` = None,
toRefresh = List(
DataToRefresh(DataToRefresh.versions.id),
DataToRefresh(DataToRefresh.activity.id),
DataToRefresh(DataToRefresh.state.id)
)
Expand All @@ -42,4 +42,8 @@ class NotificationApiEndpoints(auth: EndpointInput[AuthCredentials]) extends Bas
)
.withSecurity(auth)

private implicit val processNameCodec: Codec[String, ProcessName, CodecFormat.TextPlain] = {
Codec.string.map(str => ProcessName(str))(_.value)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object Notification {
Some(name),
s"${displayableActionName(actionName)} finished",
None,
List(DataToRefresh.versions, DataToRefresh.activity, DataToRefresh.state)
List(DataToRefresh.activity, DataToRefresh.state)
)
}

Expand All @@ -64,7 +64,22 @@ object Notification {
Some(name),
s"${displayableActionName(actionName)} execution finished",
None,
List(DataToRefresh.versions, DataToRefresh.activity, DataToRefresh.state)
List(DataToRefresh.activity, DataToRefresh.state)
)
}

def scenarioStateUpdateNotification(
id: String,
activityName: String,
name: ProcessName
): Notification = {
// We don't want to display this notification, because it causes the activities toolbar to refresh
Notification(
id = id,
scenarioName = Some(name),
message = activityName,
`type` = None,
toRefresh = List(DataToRefresh.activity, DataToRefresh.state)
)
}

Expand Down Expand Up @@ -92,5 +107,5 @@ object DataToRefresh extends Enumeration {
implicit val typeDecoder: Decoder[DataToRefresh.Value] = Decoder.decodeEnumeration(DataToRefresh)

type DataToRefresh = Value
val versions, activity, state = Value
val activity, state = Value
}
Original file line number Diff line number Diff line change
@@ -1,45 +1,85 @@
package pl.touk.nussknacker.ui.notifications

import pl.touk.nussknacker.engine.api.deployment.{ProcessActionState, ScenarioActionName}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.ui.notifications.NotificationService.NotificationsScope
import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, ScenarioActionRepository}
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.util.ScenarioActivityUtils.ScenarioActivityOps

import java.time.Clock
import java.time.{Clock, Instant}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

final case class NotificationConfig(duration: FiniteDuration)

trait NotificationService {

def notifications(implicit user: LoggedUser, ec: ExecutionContext): Future[List[Notification]]
def notifications(scope: NotificationsScope)(
implicit ec: ExecutionContext
): Future[List[Notification]]

}

object NotificationService {

sealed trait NotificationsScope

object NotificationsScope {

final case class NotificationsForLoggedUser(
user: LoggedUser,
) extends NotificationsScope

final case class NotificationsForLoggedUserAndScenario(
user: LoggedUser,
processName: ProcessName,
) extends NotificationsScope

}

}

class NotificationServiceImpl(
fetchScenarioActivityService: FetchScenarioActivityService,
scenarioActionRepository: ScenarioActionRepository,
dbioRunner: DBIOActionRunner,
config: NotificationConfig,
clock: Clock = Clock.systemUTC()
) extends NotificationService {

override def notifications(implicit user: LoggedUser, ec: ExecutionContext): Future[List[Notification]] = {
override def notifications(
scope: NotificationsScope,
)(implicit ec: ExecutionContext): Future[List[Notification]] = {
val now = clock.instant()
val limit = now.minusMillis(config.duration.toMillis)
dbioRunner
.run(
scenarioActionRepository.getUserActionsAfter(
user,
Set(ScenarioActionName.Deploy, ScenarioActionName.Cancel),
ProcessActionState.FinishedStates + ProcessActionState.Failed,
limit
)
scope match {
case NotificationsScope.NotificationsForLoggedUser(user) =>
notificationsForUserActions(user, limit)
case NotificationsScope.NotificationsForLoggedUserAndScenario(user, processName) =>
for {
notificationsForUserActions <- notificationsForUserActions(user, limit)
notificationsForScenarioActivities <- notificationsForScenarioActivities(user, processName, limit)
} yield notificationsForUserActions ++ notificationsForScenarioActivities
}

}

private def notificationsForUserActions(user: LoggedUser, limit: Instant)(
implicit ec: ExecutionContext
): Future[List[Notification]] = dbioRunner.run {
scenarioActionRepository
.getUserActionsAfter(
user,
Set(ScenarioActionName.Deploy, ScenarioActionName.Cancel),
ProcessActionState.FinishedStates + ProcessActionState.Failed,
limit
)
.map(_.map { case (action, scenarioName) =>
action.state match {
case ProcessActionState.Finished =>
Notification
.actionFinishedNotification(action.id.toString, action.actionName, scenarioName)
Notification.actionFinishedNotification(action.id.toString, action.actionName, scenarioName)
case ProcessActionState.Failed =>
Notification
.actionFailedNotification(action.id.toString, action.actionName, scenarioName, action.failureMessage)
Expand All @@ -54,4 +94,22 @@ class NotificationServiceImpl(
})
}

private def notificationsForScenarioActivities(user: LoggedUser, processName: ProcessName, limit: Instant)(
implicit ec: ExecutionContext
): Future[List[Notification]] = {
for {
allActivities <- fetchScenarioActivityService.fetchActivities(processName, Some(limit))(user).value.map {
case Right(activities) => activities
case Left(_) => List.empty
}
notificationsForScenarioActivities = allActivities.map { activity =>
Notification.scenarioStateUpdateNotification(
s"${activity.scenarioActivityId.value.toString}_${activity.lastModifiedAt.toEpochMilli}",
activity.activityType.entryName,
processName
)
}
} yield notificationsForScenarioActivities
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,

def findActivities(
scenarioId: ProcessId,
after: Option[Instant],
): DB[Seq[ScenarioActivity]] = {
doFindActivities(scenarioId).map(_.map(_._2))
doFindActivities(scenarioId, after).map(_.map(_._2))
}

def addActivity(
Expand Down Expand Up @@ -199,7 +200,7 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,
for {
attachmentEntities <- findAttachments(processId)
attachments = attachmentEntities.map(toDto)
activities <- doFindActivities(processId)
activities <- doFindActivities(processId, after = None)
comments = activities.flatMap { case (id, activity) => toComment(id, activity) }
} yield Legacy.ProcessActivity(
comments = comments.toList,
Expand All @@ -221,9 +222,11 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,

private def doFindActivities(
scenarioId: ProcessId,
after: Option[Instant],
): DB[Seq[(Long, ScenarioActivity)]] = {
scenarioActivityTable
.filter(_.scenarioId === scenarioId)
.filterOpt(after)((table, after) => table.createdAt > Timestamp.from(after))
// ScenarioActivity in domain represents a single, immutable event, so we interpret only finished operations as ScenarioActivities
.filter { entity =>
entity.state.isEmpty ||
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.ui.process.repository.activities

import db.util.DBIOActionInstances.DB
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.{ProcessId, VersionId}
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.Legacy
Expand All @@ -15,14 +14,15 @@ import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepo
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.util.LoggedUserUtils.Ops

import java.time.Clock
import java.time.{Clock, Instant}

trait ScenarioActivityRepository {

def clock: Clock

def findActivities(
scenarioId: ProcessId,
after: Option[Instant] = None,
): DB[Seq[ScenarioActivity]]

def addActivity(
Expand Down
Loading