diff --git a/designer/client/src/components/graph/node-modal/editors/expression/AceWithSettings.tsx b/designer/client/src/components/graph/node-modal/editors/expression/AceWithSettings.tsx
index 374e97835df..51d7b19a96a 100644
--- a/designer/client/src/components/graph/node-modal/editors/expression/AceWithSettings.tsx
+++ b/designer/client/src/components/graph/node-modal/editors/expression/AceWithSettings.tsx
@@ -41,23 +41,19 @@ export default forwardRef(function AceWithSettings(
const editor = editorRef.current?.editor;
const selection = editor?.session.selection;
- // before setting cursor position ensure all position calculations are actual
- const prepare = () => editor?.renderer.updateFull(true);
-
const scrollToView = throttle(
() => {
- document.activeElement.scrollIntoView({ block: "nearest", inline: "nearest" });
+                // before setting the cursor position, ensure all position calculations are up to date
+ editor?.renderer.updateFull(true);
+                const activeElement = editor?.container.querySelector(".ace_cursor") || document.activeElement;
+ activeElement.scrollIntoView({ block: "nearest", inline: "nearest" });
},
150,
{ leading: false },
);
- editor?.addEventListener("mousedown", prepare);
- editor?.addEventListener("mouseup", scrollToView);
selection?.on("changeCursor", scrollToView);
return () => {
- editor?.removeEventListener("mousedown", prepare);
- editor?.removeEventListener("mouseup", scrollToView);
selection?.off("changeCursor", scrollToView);
};
}, []);
diff --git a/designer/client/src/components/toolbars/activities/ActivitiesPanelRow.tsx b/designer/client/src/components/toolbars/activities/ActivitiesPanelRow.tsx
index 3f956ee9bc6..d6b53011f02 100644
--- a/designer/client/src/components/toolbars/activities/ActivitiesPanelRow.tsx
+++ b/designer/client/src/components/toolbars/activities/ActivitiesPanelRow.tsx
@@ -27,7 +27,8 @@ export const ActivitiesPanelRow = memo(({ index, style, setRowHeight, handleShow
() => activities.findIndex((activeItem) => activeItem.uiType === "item" && activeItem.type === "SCENARIO_DEPLOYED"),
[activities],
);
- const isRunning = firstDeployedIndex === index && scenarioState.status.name === "RUNNING";
+    const activeDeploymentStatuses = ["RUNNING", "SCHEDULED"];
+    const isDeploymentActive = firstDeployedIndex === index && activeDeploymentStatuses.includes(scenarioState.status.name);
const isFirstDateItem = activities.findIndex((activeItem) => activeItem.uiType === "date") === index;
useEffect(() => {
@@ -41,7 +42,7 @@ export const ActivitiesPanelRow = memo(({ index, style, setRowHeight, handleShow
case "item": {
return (
-                        <ActivityItem activity={activity} isRunning={isRunning} searchQuery={searchQuery} />
+                        <ActivityItem activity={activity} isDeploymentActive={isDeploymentActive} searchQuery={searchQuery} />
);
}
@@ -69,7 +70,7 @@ export const ActivitiesPanelRow = memo(({ index, style, setRowHeight, handleShow
return null;
}
}
- }, [activity, handleHideRows, handleShowRows, isRunning, isFirstDateItem, searchQuery, t]);
+ }, [activity, handleHideRows, handleShowRows, isDeploymentActive, isFirstDateItem, searchQuery, t]);
return (
diff --git a/designer/client/src/components/toolbars/activities/ActivityPanelRowItem/ActivityItem.tsx b/designer/client/src/components/toolbars/activities/ActivityPanelRowItem/ActivityItem.tsx
index 9ca4a896f1a..1508436cf31 100644
--- a/designer/client/src/components/toolbars/activities/ActivityPanelRowItem/ActivityItem.tsx
+++ b/designer/client/src/components/toolbars/activities/ActivityPanelRowItem/ActivityItem.tsx
@@ -28,7 +28,7 @@ const StyledActivityBody = styled("div")(({ theme }) => ({
export const ActivityItem = forwardRef(
(
- { activity, isRunning, searchQuery }: { activity: ItemActivity; isRunning: boolean; searchQuery: string },
+ { activity, isDeploymentActive, searchQuery }: { activity: ItemActivity; isDeploymentActive: boolean; searchQuery: string },
ref: ForwardedRef
,
) => {
const { t } = useTranslation();
@@ -55,7 +55,7 @@ export const ActivityItem = forwardRef(
>
({
gap: theme.spacing(0.5),
}));
-const StyledActivityItemHeader = styled("div")<{ isHighlighted: boolean; isRunning: boolean; isActiveFound: boolean }>(
- ({ theme, isHighlighted, isRunning, isActiveFound }) => ({
+const StyledActivityItemHeader = styled("div")<{ isHighlighted: boolean; isDeploymentActive: boolean; isActiveFound: boolean }>(
+ ({ theme, isHighlighted, isDeploymentActive, isActiveFound }) => ({
display: "flex",
alignItems: "center",
padding: theme.spacing(0.5, 0.5, 0.5, 0.75),
borderRadius: theme.spacing(0.5),
- ...getHeaderColors(theme, isHighlighted, isRunning, isActiveFound),
+ ...getHeaderColors(theme, isHighlighted, isDeploymentActive, isActiveFound),
}),
);
@@ -167,7 +167,7 @@ const HeaderActivity = ({
interface Props {
activity: ItemActivity;
- isRunning: boolean;
+ isDeploymentActive: boolean;
isActiveFound: boolean;
isFound: boolean;
searchQuery: string;
@@ -230,7 +230,7 @@ const WithOpenVersion = ({
);
};
-const ActivityItemHeader = ({ activity, isRunning, isFound, isActiveFound, searchQuery }: Props) => {
+const ActivityItemHeader = ({ activity, isDeploymentActive, isFound, isActiveFound, searchQuery }: Props) => {
const scenario = useSelector(getScenario);
const { processVersionId } = scenario || {};
@@ -279,7 +279,7 @@ const ActivityItemHeader = ({ activity, isRunning, isFound, isActiveFound, searc
]);
return (
-
+
{getHeaderTitle}
diff --git a/designer/client/src/components/toolbars/activities/helpers/activityItemColors.ts b/designer/client/src/components/toolbars/activities/helpers/activityItemColors.ts
index d0192a16818..0afdb0b454c 100644
--- a/designer/client/src/components/toolbars/activities/helpers/activityItemColors.ts
+++ b/designer/client/src/components/toolbars/activities/helpers/activityItemColors.ts
@@ -12,8 +12,8 @@ const runningHeaderBackground = (theme: Theme) => blend(theme.palette.background
const activeFoundItemBackground = (theme: Theme) => blend(theme.palette.background.paper, theme.palette.primary.main, 0.2);
const foundItemBackground = (theme: Theme) => blend(theme.palette.background.paper, theme.palette.primary.main, 0.08);
-export const getHeaderColors = (theme: Theme, isHighlighted: boolean, isRunning: boolean, isActiveFound: boolean) => {
- if (isRunning && isActiveFound) {
+export const getHeaderColors = (theme: Theme, isHighlighted: boolean, isDeploymentActive: boolean, isActiveFound: boolean) => {
+ if (isDeploymentActive && isActiveFound) {
return {
backgroundColor: runningActiveFoundHeaderBackground(theme),
border: activeBorder(theme),
@@ -27,7 +27,7 @@ export const getHeaderColors = (theme: Theme, isHighlighted: boolean, isRunning:
};
}
- if (isRunning) {
+ if (isDeploymentActive) {
return {
backgroundColor: runningHeaderBackground(theme),
border: defaultBorder(theme),
diff --git a/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/definition/package.scala b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/definition/package.scala
index f28942f62d6..11dcd56935f 100644
--- a/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/definition/package.scala
+++ b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/definition/package.scala
@@ -50,8 +50,10 @@ package object definition {
branchParam: Boolean,
hintText: Option[String],
label: String,
- // This attribute is used only by external project
- requiredParam: Boolean,
+      // This attribute is used only by an external project; it was introduced in version 1.18
+      // It is an Option so that the decoder stays backward compatible and can decode responses from older versions
+      // The Option can be removed in a future release
+ requiredParam: Option[Boolean],
)
@JsonCodec(encodeOnly = true) final case class UIComponentDefinition(
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/definition/DefinitionsService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/definition/DefinitionsService.scala
index d9cd34c4017..1bb5980f09f 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/definition/DefinitionsService.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/definition/DefinitionsService.scala
@@ -170,7 +170,7 @@ object DefinitionsService {
branchParam = parameter.branchParam,
hintText = parameter.hintText,
label = parameter.label,
- requiredParam = !parameter.isOptional,
+ requiredParam = Some(!parameter.isOptional),
)
}
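
For context on the requiredParam change above, a minimal, hedged sketch of why wrapping the field in Option keeps decoding backward compatible. The UIParameterStub case class and the sample JSON are illustrative assumptions, not the project's real model or codec setup; the point is only that a circe-derived decoder yields None for a declared Option[Boolean] field that is absent from a response produced by an older (pre-1.18) Designer.

    import io.circe.Decoder
    import io.circe.generic.semiauto.deriveDecoder
    import io.circe.parser.decode

    // Illustrative stand-in for the restmodel parameter class; only two fields are kept.
    final case class UIParameterStub(label: String, requiredParam: Option[Boolean])

    object RequiredParamCompatDemo extends App {
      implicit val decoder: Decoder[UIParameterStub] = deriveDecoder

      // Response from a 1.18+ server: the field is present and decodes to Some(...).
      println(decode[UIParameterStub]("""{ "label": "amount", "requiredParam": true }"""))
      // Right(UIParameterStub(amount,Some(true)))

      // Response from an older server: the field is missing, decoding still succeeds with None.
      println(decode[UIParameterStub]("""{ "label": "amount" }"""))
      // Right(UIParameterStub(amount,None))
    }
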
diff --git a/docs-internal/api/nu-designer-openapi.yaml b/docs-internal/api/nu-designer-openapi.yaml
index 215692d8583..e98d3188696 100644
--- a/docs-internal/api/nu-designer-openapi.yaml
+++ b/docs-internal/api/nu-designer-openapi.yaml
@@ -7002,7 +7002,6 @@ components:
- additionalVariables
- branchParam
- label
- - requiredParam
properties:
name:
type: string
@@ -7036,7 +7035,9 @@ components:
label:
type: string
requiredParam:
- type: boolean
+ type:
+ - boolean
+ - 'null'
UIValueParameterDto:
title: UIValueParameterDto
type: object
diff --git a/docs/Changelog.md b/docs/Changelog.md
index b323ce6b6e6..7e2e73657b8 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -22,6 +22,11 @@
* [#7207](https://github.com/TouK/nussknacker/pull/7207) Fixed minor clipboard, keyboard and focus related bugs
* [#7237](https://github.com/TouK/nussknacker/pull/7237) Fix: ToJsonEncoder keeps order fields during encoding map
+* [#7240](https://github.com/TouK/nussknacker/pull/7240) Fixed race condition problem during SpEL expression evaluation
+* [#7269](https://github.com/TouK/nussknacker/pull/7269) Fixed focus scrolling in expression editor
+* [#7270](https://github.com/TouK/nussknacker/pull/7270) Fixed savepoint deserialization: some savepoints (e.g. for scenarios with async enrichers) could not be deserialized, which led to errors during redeployment on Flink
+* [#7279](https://github.com/TouK/nussknacker/pull/7279) Fixed Flink TaskManager and Designer containers restarts in installation example
+* [#7283](https://github.com/TouK/nussknacker/pull/7283) Fixed the deployment status indicator for periodic scenario types
### 1.18.0 (22 November 2024)
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala
index 131137fd1bd..7d89fb427cd 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.process.typeinformation
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
-import org.apache.flink.api.common.typeutils.{CompositeTypeSerializerUtil, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.api.java.typeutils.{ListTypeInfo, MapTypeInfo, MultisetTypeInfo, RowTypeInfo}
import org.apache.flink.types.Row
import pl.touk.nussknacker.engine.api.context.ValidationContext
@@ -99,20 +98,10 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection
}
private def createScalaMapTypeInformation(typingResult: TypedObjectTypingResult) =
- TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult)
+ TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType))
private def createJavaMapTypeInformation(typingResult: TypedObjectTypingResult) =
- TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult)
-
- protected def constructIntermediateCompatibilityResult(
- newNestedSerializers: Array[TypeSerializer[_]],
- oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
- ): CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] = {
- CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(
- newNestedSerializers.map(_.snapshotConfiguration()),
- oldNestedSerializerSnapshots
- )
- }
+ TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType))
def forValueWithContext[T](
validationContext: ValidationContext,
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala
index c2f7fd96633..df03b2c5c15 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala
@@ -3,42 +3,36 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject
import java.{util => jutil}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
-import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult
case class TypedJavaMapTypeInformation(
- informations: Map[String, TypeInformation[_]],
- buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
+ informations: Map[String, TypeInformation[_]]
) extends TypedObjectBasedTypeInformation[jutil.Map[String, AnyRef]](informations) {
override def createSerializer(
serializers: Array[(String, TypeSerializer[_])]
): TypeSerializer[jutil.Map[String, AnyRef]] =
- TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
+ TypedJavaMapSerializer(serializers)
}
@SerialVersionUID(1L)
case class TypedJavaMapSerializer(
- override val serializers: Array[(String, TypeSerializer[_])],
- override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
+ override val serializers: Array[(String, TypeSerializer[_])]
) extends TypedObjectBasedTypeSerializer[jutil.Map[String, AnyRef]](serializers)
with BaseJavaMapBasedSerializer[AnyRef, jutil.Map[String, AnyRef]] {
override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[jutil.Map[String, AnyRef]] =
- TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
+ TypedJavaMapSerializer(serializers)
override def createInstance(): jutil.Map[String, AnyRef] = new jutil.HashMap()
override def snapshotConfiguration(
snapshots: Array[(String, TypeSerializerSnapshot[_])]
- ): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots) {
- override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult =
- buildIntermediateSchemaCompatibilityResultFunction
- }
+ ): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots)
}
-abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] {
+final class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] {
def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
this()
@@ -48,6 +42,6 @@ abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializer
override protected def restoreSerializer(
restored: Array[(String, TypeSerializer[_])]
): TypeSerializer[jutil.Map[String, AnyRef]] =
- TypedJavaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult)
+ TypedJavaMapSerializer(restored)
}
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala
index 530cba98149..e7ca4c1e5a8 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala
@@ -4,6 +4,7 @@ import com.github.ghik.silencer.silent
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult
import org.apache.flink.api.common.typeutils.{
CompositeTypeSerializerUtil,
TypeSerializer,
@@ -11,7 +12,6 @@ import org.apache.flink.api.common.typeutils.{
TypeSerializerSnapshot
}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult
import scala.reflect.ClassTag
@@ -57,15 +57,6 @@ abstract class TypedObjectBasedTypeInformation[T: ClassTag](informations: Array[
def createSerializer(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T]
}
-object TypedObjectBasedTypeInformation {
-
- type BuildIntermediateSchemaCompatibilityResult = (
- Array[TypeSerializer[_]],
- Array[TypeSerializerSnapshot[_]]
- ) => CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing]
-
-}
-
//We use Array instead of List here, as we need access by index, which is faster for array
abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String, TypeSerializer[_])])
extends TypeSerializer[T]
@@ -132,17 +123,13 @@ abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String,
def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T]
- def buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
}
abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnapshot[T] with LazyLogging {
- protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _
+ private val constructIntermediateCompatibilityResultMethodName = "constructIntermediateCompatibilityResult"
- def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
- this()
- this.serializersSnapshots = serializers
- }
+ protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _
override def getCurrentVersion: Int = 1
@@ -182,10 +169,10 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps
val newKeys = newSerializers.map(_._1)
val commons = currentKeys.intersect(newKeys)
- val newSerializersToUse = newSerializers.filter(k => commons.contains(k._1))
- val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1))
+ val newSerializersToUse: Array[(String, TypeSerializer[_])] = newSerializers.filter(k => commons.contains(k._1))
+ val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1))
- val fieldsCompatibility = buildIntermediateSchemaCompatibilityResult(
+ val fieldsCompatibility = constructIntermediateCompatibilityResultProxied(
newSerializersToUse.map(_._2),
snapshotsToUse.map(_._2)
)
@@ -237,7 +224,33 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps
}
}
- val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult
+ private def constructIntermediateCompatibilityResultProxied(
+ newNestedSerializers: Array[TypeSerializer[_]],
+ nestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
+ ): IntermediateCompatibilityResult[_] = {
+    // The signature of CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult changed between Flink 1.18 and 1.19.
+    // Because of the serialization/deserialization contract of TypeSerializerSnapshot, it can't easily be provided via the TypeInformationDetection SPI mechanism,
+    // so we look the method up reflectively and fall back to the old signature when the new overload is not available.
+ try {
+ val newMethod = classOf[CompositeTypeSerializerUtil].getMethod(
+ constructIntermediateCompatibilityResultMethodName,
+ classOf[Array[TypeSerializerSnapshot[_]]],
+ classOf[Array[TypeSerializerSnapshot[_]]]
+ )
+ newMethod
+ .invoke(null, newNestedSerializers.map(_.snapshotConfiguration()), nestedSerializerSnapshots)
+ .asInstanceOf[IntermediateCompatibilityResult[_]]
+ } catch {
+ case _: NoSuchMethodException =>
+ val oldMethod = classOf[CompositeTypeSerializerUtil].getMethod(
+ constructIntermediateCompatibilityResultMethodName,
+ classOf[Array[TypeSerializer[_]]],
+ classOf[Array[TypeSerializerSnapshot[_]]]
+ )
+ oldMethod
+ .invoke(null, newNestedSerializers, nestedSerializerSnapshots)
+ .asInstanceOf[IntermediateCompatibilityResult[_]]
+ }
+ }
override def restoreSerializer(): TypeSerializer[T] = restoreSerializer(serializersSnapshots.map {
case (k, snapshot) => (k, snapshot.restoreSerializer())
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala
index b987369b534..4b204c0418a 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala
@@ -3,24 +3,21 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
-import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult
case class TypedScalaMapTypeInformation(
- informations: Map[String, TypeInformation[_]],
- buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
+ informations: Map[String, TypeInformation[_]]
) extends TypedObjectBasedTypeInformation[Map[String, _ <: AnyRef]](informations) {
override def createSerializer(
serializers: Array[(String, TypeSerializer[_])]
): TypeSerializer[Map[String, _ <: AnyRef]] =
- TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
+ TypedScalaMapSerializer(serializers)
}
@SerialVersionUID(1L)
case class TypedScalaMapSerializer(
- override val serializers: Array[(String, TypeSerializer[_])],
- override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
+ override val serializers: Array[(String, TypeSerializer[_])]
) extends TypedObjectBasedTypeSerializer[Map[String, _ <: AnyRef]](serializers)
with LazyLogging {
@@ -36,20 +33,17 @@ case class TypedScalaMapSerializer(
override def get(value: Map[String, _ <: AnyRef], k: String): AnyRef = value.getOrElse(k, null)
override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[Map[String, _ <: AnyRef]] =
- TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
+ TypedScalaMapSerializer(serializers)
override def createInstance(): Map[String, _ <: AnyRef] = Map.empty
override def snapshotConfiguration(
snapshots: Array[(String, TypeSerializerSnapshot[_])]
- ): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots) {
- override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult =
- buildIntermediateSchemaCompatibilityResultFunction
- }
+ ): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots)
}
-abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] {
+final class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] {
def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
this()
@@ -59,6 +53,6 @@ abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerialize
override protected def restoreSerializer(
restored: Array[(String, TypeSerializer[_])]
): TypeSerializer[Map[String, _ <: AnyRef]] =
- TypedScalaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult)
+ TypedScalaMapSerializer(restored)
}
diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala
index 8ed2795c1d6..f0b9b72f8c7 100644
--- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala
+++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala
@@ -264,20 +264,17 @@ class TypingResultAwareTypeInformationDetectionSpec
}
private def assertNested(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_] => Assertion)*): Unit = {
- inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) {
- case TypedScalaMapSerializer(array, _) =>
- array.zipAll(nested.toList, null, null).foreach {
- case ((name, serializer), (expectedName, expectedSerializer)) =>
- name shouldBe expectedName
- expectedSerializer(serializer)
- }
+ inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) =>
+ array.zipAll(nested.toList, null, null).foreach { case ((name, serializer), (expectedName, expectedSerializer)) =>
+ name shouldBe expectedName
+ expectedSerializer(serializer)
+ }
}
}
private def assertMapSerializers(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_])*) = {
- inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) {
- case TypedScalaMapSerializer(array, _) =>
- array.toList shouldBe nested.toList
+ inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) =>
+ array.toList shouldBe nested.toList
}
}
diff --git a/examples/installation/docker-compose.yml b/examples/installation/docker-compose.yml
index a8e977f117a..8bc5c3e84ad 100644
--- a/examples/installation/docker-compose.yml
+++ b/examples/installation/docker-compose.yml
@@ -40,7 +40,7 @@ services:
SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
INFLUXDB_URL: "http://influxdb:8086"
FLINK_REST_URL: "http://flink-jobmanager:8081"
- JDK_JAVA_OPTIONS: "-Xmx1024M"
+ JDK_JAVA_OPTIONS: "-Xmx400M -XX:MaxMetaspaceSize=300M -XX:MaxDirectMemorySize=100M"
USAGE_REPORTS_SOURCE: "example-installation-docker-compose"
depends_on:
postgres:
@@ -241,7 +241,7 @@ services:
deploy:
resources:
limits:
- memory: 1024M
+ memory: 1500M
telegraf:
image: telegraf:1.30.2
diff --git a/examples/installation/flink/flink-properties.yml b/examples/installation/flink/flink-properties.yml
index dd0b7fed688..c64ec68ca3e 100644
--- a/examples/installation/flink/flink-properties.yml
+++ b/examples/installation/flink/flink-properties.yml
@@ -1,4 +1,8 @@
taskmanager.numberOfTaskSlots: 8
+# Nu requires a bit more metaspace than Flink allocates by default based on the process size
+taskmanager.memory.process.size: 1500m
+taskmanager.memory.jvm-metaspace.size: 400m
+
state.backend.type: filesystem
state.checkpoints.dir: file:///opt/flink/data/checkpoints
state.savepoints.dir: file:///opt/flink/data/savepoints
diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala
index a521545c97f..40fd86d1d1f 100644
--- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala
+++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala
@@ -31,6 +31,7 @@ import pl.touk.nussknacker.engine.spel.SpelExpressionParseError.ExpressionCompil
import pl.touk.nussknacker.engine.spel.SpelExpressionParser.Flavour
import pl.touk.nussknacker.engine.spel.internal.EvaluationContextPreparer
+import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal
/**
@@ -51,11 +52,24 @@ final case class ParsedSpelExpression(
parser: () => ValidatedNel[ExpressionParseError, Expression],
initial: Expression
) extends LazyLogging {
- @volatile var parsed: Expression = initial
+ @volatile var parsed: Expression = initial
+ private val firstInterpretationFinished = new AtomicBoolean()
def getValue[T](context: EvaluationContext, desiredResultType: Class[_]): T = {
- def value(): T = parsed.getValue(context, desiredResultType).asInstanceOf[T]
-
+ def value(): T = {
+      // There is a bug in Spring's SpelExpression class: the interpretedCount variable is not synchronized with ReflectiveMethodExecutor.didArgumentConversionOccur.
+      // The latter method checks the argumentConversionOccurred flag, which may be false not because no conversion occurred but because method.invoke()
+      // hasn't finished yet. As a result, an expression that shouldn't be compiled might get compiled, which causes IllegalStateException errors in further evaluations of the expression.
+ if (!firstInterpretationFinished.get()) {
+ synchronized {
+ val valueToReturn = parsed.getValue(context, desiredResultType).asInstanceOf[T]
+ firstInterpretationFinished.set(true)
+ valueToReturn
+ }
+ } else {
+ parsed.getValue(context, desiredResultType).asInstanceOf[T]
+ }
+ }
try {
value()
} catch {
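
As a side note on the race-condition fix above, a minimal, generic sketch of the same guard pattern in isolation, under the assumption that only the very first evaluation is unsafe to run concurrently (the FirstCallSerialized name and the compute thunk are illustrative, not part of the codebase). Every caller is funneled through a single lock until one evaluation has completed; afterwards calls proceed without locking, mirroring what ParsedSpelExpression.getValue does with its firstInterpretationFinished flag.

    import java.util.concurrent.atomic.AtomicBoolean

    // Illustrative wrapper: serializes calls only until the first one completes.
    final class FirstCallSerialized[T](compute: () => T) {
      private val firstCallFinished = new AtomicBoolean(false)

      def get(): T = {
        if (!firstCallFinished.get()) {
          // Early callers all take this lock, so none of them can observe the
          // half-initialized state produced while the very first call is still running.
          synchronized {
            val result = compute()
            firstCallFinished.set(true)
            result
          }
        } else {
          // After the first completed call the computation is assumed to be safe
          // for concurrent use, so no further locking is applied.
          compute()
        }
      }
    }

    // Usage sketch: many threads may call guarded.get() concurrently.
    // val guarded = new FirstCallSerialized(() => expensiveAndRacyFirstEvaluation())
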
diff --git a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala
index 03a1c429cb2..d8ce5731bc1 100644
--- a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala
+++ b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala
@@ -11,6 +11,7 @@ import org.scalatest.OptionValues
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
import org.springframework.util.{NumberUtils, StringUtils}
import pl.touk.nussknacker.engine.api.context.ValidationContext
@@ -67,11 +68,14 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.time.chrono.{ChronoLocalDate, ChronoLocalDateTime}
import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneId, ZoneOffset}
import java.util
+import java.util.concurrent.Executors
import java.util.{Collections, Currency, List => JList, Locale, Map => JMap, Optional, UUID}
import scala.annotation.varargs
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.jdk.CollectionConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe._
+import scala.util.{Failure, Success}
class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesDetailedMessage with OptionValues {
@@ -2052,6 +2056,50 @@ class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesD
parsed.evaluateSync[Any](customCtx) shouldBe 11
}
+  // This test is ignored because it is non-deterministic and ugly, but it was used to verify the race condition on
+  // ParsedSpelExpression.getValue. Without the synchronized block inside that method, the test failed most of the time.
+ ignore(
+ "should not throw 'Failed to instantiate CompiledExpression' when getValue is called on ParsedSpelExpression by multiple threads"
+ ) {
+ val spelExpression =
+ parse[LocalDateTime]("T(java.time.LocalDateTime).now().minusDays(14)", ctx).validValue.expression
+ .asInstanceOf[SpelExpression]
+
+ val threadPool = Executors.newFixedThreadPool(1000)
+ implicit val customExecutionContext: ExecutionContext = ExecutionContext.fromExecutor(threadPool)
+
+ // A promise to signal when an exception occurs
+ val failurePromise = Promise[Unit]()
+
+ val tasks = (1 to 10000).map { _ =>
+ Future {
+ try {
+ Thread.sleep(100)
+ // evaluate calls getValue on problematic SpelExpression object
+ spelExpression.evaluate[LocalDateTime](Context("fooId"), Map.empty)
+ } catch {
+          // The real problematic exception is wrapped in a SpelExpressionEvaluationException by the evaluate method
+ case e: SpelExpressionEvaluationException =>
+ failurePromise.tryFailure(e.cause)
+ }
+ }
+ }
+ val firstFailureOrCompletion = Future.firstCompletedOf(Seq(Future.sequence(tasks), failurePromise.future))
+
+ firstFailureOrCompletion.onComplete {
+ case Success(_) =>
+ println("All tasks completed successfully.")
+ threadPool.shutdown()
+ case Failure(e: IllegalStateException) if e.getMessage == "Failed to instantiate CompiledExpression" =>
+ fail("Exception occurred due to race condition.", e)
+ threadPool.shutdown()
+ case Failure(e) =>
+ fail("Unknown exception occurred", e)
+ threadPool.shutdown()
+ }
+ Await.result(firstFailureOrCompletion, 15.seconds)
+ }
+
}
case class SampleObject(list: java.util.List[SampleValue])