Skip to content

Commit

Permalink
Merge pull request #7281 from TouK/1.18-ports9
Browse files Browse the repository at this point in the history
1.18 ports9
  • Loading branch information
arkadius authored Dec 3, 2024
2 parents 5a4bbe5 + 4531af0 commit ed41530
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ package object definition {
branchParam: Boolean,
hintText: Option[String],
label: String,
// This attribute is used only by external project
requiredParam: Boolean,
// This attribute is in use only by the external project and was introduced in the 1.18 version
// The option is for decoder backward compatibility to decode responses from older versions
// The option can be removed in future releases
requiredParam: Option[Boolean],
)

@JsonCodec(encodeOnly = true) final case class UIComponentDefinition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ object DefinitionsService {
branchParam = parameter.branchParam,
hintText = parameter.hintText,
label = parameter.label,
requiredParam = !parameter.isOptional,
requiredParam = Some(!parameter.isOptional),
)
}

Expand Down
5 changes: 3 additions & 2 deletions docs-internal/api/nu-designer-openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6993,7 +6993,6 @@ components:
- additionalVariables
- branchParam
- label
- requiredParam
properties:
name:
type: string
Expand Down Expand Up @@ -7027,7 +7026,9 @@ components:
label:
type: string
requiredParam:
type: boolean
type:
- boolean
- 'null'
UIValueParameterDto:
title: UIValueParameterDto
type: object
Expand Down
14 changes: 9 additions & 5 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@
* Flink upgrade to 1.19.1. Note: it is possible to use Nussknacker with older versions of Flink, but it requires some extra steps. See [Migration guide](MigrationGuide.md) for details.
* Performance optimisations of the serialisation of events passing through Flink's `DataStream`s.

### 1.18.1 (Not released yet)

* [#7207](https://github.com/TouK/nussknacker/pull/7207) Fixed minor clipboard, keyboard and focus related bugs
* [#7237](https://github.com/TouK/nussknacker/pull/7237) Fix: ToJsonEncoder keeps order fields during encoding map
* [#7240](https://github.com/TouK/nussknacker/pull/7240) Fixed race condition problem during SpEL expression evaluation
* [#7269](https://github.com/TouK/nussknacker/pull/7269) Fixed focus scrolling in expression editor
* [#7270](https://github.com/TouK/nussknacker/pull/7270) Savepoint deserialization fixup - some taken savepoints (e.g. for scenarios with async enrichers) were not deserializable which led to errors during redeployments on Flink
* [#7279](https://github.com/TouK/nussknacker/pull/7279) Fixed Flink TaskManager and Designer containers restarts in installation example

### 1.18.0 (22 November 2024)

* [#6944](https://github.com/TouK/nussknacker/pull/6944) [#7166](https://github.com/TouK/nussknacker/pull/7166) Changes around adhoc testing feature
Expand Down Expand Up @@ -132,11 +141,6 @@
* [#7190](https://github.com/TouK/nussknacker/pull/7190) Fix "Failed to get node validation" when opening fragment node details for referencing non-existing fragment
* [#7215](https://github.com/TouK/nussknacker/pull/7215) Change typing text to spinner during validation and provide delayed adding on enter until validation finishes in a scenario labels and fragment input

### 1.18.1 (Not released yet)

* [#7207](https://github.com/TouK/nussknacker/pull/7207) Fixed minor clipboard, keyboard and focus related bugs
* [#7270](https://github.com/TouK/nussknacker/pull/7270) Savepoint deserialization fixup - some taken savepoints (e.g. for scenarios with async enrichers) were not deserializable which led to errors during redeployments on Flink

## 1.17

#### Highlights
Expand Down
4 changes: 2 additions & 2 deletions examples/installation/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ services:
SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
INFLUXDB_URL: "http://influxdb:8086"
FLINK_REST_URL: "http://flink-jobmanager:8081"
JDK_JAVA_OPTIONS: "-Xmx1024M"
JDK_JAVA_OPTIONS: "-Xmx400M -XX:MaxMetaspaceSize=300M -XX:MaxDirectMemorySize=100M"
USAGE_REPORTS_SOURCE: "example-installation-docker-compose"
depends_on:
postgres:
Expand Down Expand Up @@ -241,7 +241,7 @@ services:
deploy:
resources:
limits:
memory: 1024M
memory: 1500M

telegraf:
image: telegraf:1.30.2
Expand Down
4 changes: 4 additions & 0 deletions examples/installation/flink/flink-properties.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
taskmanager.numberOfTaskSlots: 8
# Nu requires a little bit more metaspace than Flink default allocate based on process size
taskmanager.memory.process.size: 1500m
taskmanager.memory.jvm-metaspace.size: 400m

state.backend.type: filesystem
state.checkpoints.dir: file:///opt/flink/data/checkpoints
state.savepoints.dir: file:///opt/flink/data/savepoints
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import pl.touk.nussknacker.engine.spel.SpelExpressionParseError.ExpressionCompil
import pl.touk.nussknacker.engine.spel.SpelExpressionParser.Flavour
import pl.touk.nussknacker.engine.spel.internal.EvaluationContextPreparer

import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

/**
Expand All @@ -52,11 +53,24 @@ final case class ParsedSpelExpression(
parser: () => ValidatedNel[ExpressionParseError, Expression],
initial: Expression
) extends LazyLogging {
@volatile var parsed: Expression = initial
@volatile var parsed: Expression = initial
private val firstInterpretationFinished = new AtomicBoolean()

def getValue[T](context: EvaluationContext, desiredResultType: Class[_]): T = {
def value(): T = parsed.getValue(context, desiredResultType).asInstanceOf[T]

def value(): T = {
// There is a bug in Spring's SpelExpression class: interpretedCount variable is not synchronized with ReflectiveMethodExecutor.didArgumentConversionOccur.
// The latter mentioned method check argumentConversionOccurred Boolean which could be false not because conversion not occurred but because method.invoke()
// isn't finished yet. Due to this problem an expression that shouldn't be compiled might be compiled. It generates IllegalStateException errors in further evaluations of the expression.
if (!firstInterpretationFinished.get()) {
synchronized {
val valueToReturn = parsed.getValue(context, desiredResultType).asInstanceOf[T]
firstInterpretationFinished.set(true)
valueToReturn
}
} else {
parsed.getValue(context, desiredResultType).asInstanceOf[T]
}
}
try {
value()
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.scalatest.OptionValues
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
import org.springframework.util.{NumberUtils, StringUtils}
import pl.touk.nussknacker.engine.api.context.ValidationContext
Expand Down Expand Up @@ -67,11 +68,14 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.time.chrono.{ChronoLocalDate, ChronoLocalDateTime}
import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneId, ZoneOffset}
import java.util
import java.util.concurrent.Executors
import java.util.{Collections, Currency, List => JList, Locale, Map => JMap, Optional, UUID}
import scala.annotation.varargs
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.jdk.CollectionConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe._
import scala.util.{Failure, Success}

class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesDetailedMessage with OptionValues {

Expand Down Expand Up @@ -2057,6 +2061,50 @@ class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesD
parsed.evaluateSync[Any](customCtx) shouldBe 11
}

// This test is ignored as it was indeterministic and ugly, but it was used to verify race condition problems on
// ParsedSpelExpression.getValue. Without the synchronized block inside its method the test would fail the majority of times
ignore(
"should not throw 'Failed to instantiate CompiledExpression' when getValue is called on ParsedSpelExpression by multiple threads"
) {
val spelExpression =
parse[LocalDateTime]("T(java.time.LocalDateTime).now().minusDays(14)", ctx).validValue.expression
.asInstanceOf[SpelExpression]

val threadPool = Executors.newFixedThreadPool(1000)
implicit val customExecutionContext: ExecutionContext = ExecutionContext.fromExecutor(threadPool)

// A promise to signal when an exception occurs
val failurePromise = Promise[Unit]()

val tasks = (1 to 10000).map { _ =>
Future {
try {
Thread.sleep(100)
// evaluate calls getValue on problematic SpelExpression object
spelExpression.evaluate[LocalDateTime](Context("fooId"), Map.empty)
} catch {
// The real problematic exception is wrapped in SpelExpressionEvaluationException by evaluate method
case e: SpelExpressionEvaluationException =>
failurePromise.tryFailure(e.cause)
}
}
}
val firstFailureOrCompletion = Future.firstCompletedOf(Seq(Future.sequence(tasks), failurePromise.future))

firstFailureOrCompletion.onComplete {
case Success(_) =>
println("All tasks completed successfully.")
threadPool.shutdown()
case Failure(e: IllegalStateException) if e.getMessage == "Failed to instantiate CompiledExpression" =>
fail("Exception occurred due to race condition.", e)
threadPool.shutdown()
case Failure(e) =>
fail("Unknown exception occurred", e)
threadPool.shutdown()
}
Await.result(firstFailureOrCompletion, 15.seconds)
}

}

case class SampleObject(list: java.util.List[SampleValue])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ import java.time.format.DateTimeFormatter
import io.circe.{Encoder, Json}
import io.circe.Json._
import pl.touk.nussknacker.engine.api.DisplayJson
import pl.touk.nussknacker.engine.util.Implicits._

import java.util.ServiceLoader

import java.util.UUID
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -89,9 +86,10 @@ case class ToJsonEncoder(
// toString on keys.
private def encodeMap(map: Map[_, _]) = {
val mapWithStringKeys = map.view.map { case (k, v) =>
k.toString -> v
}.toMap
fromFields(mapWithStringKeys.mapValuesNow(encode))
k.toString -> encode(v)
}

fromFields(mapWithStringKeys)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,22 @@ class ToJsonEncoderSpec extends AnyFunSpec with Matchers {

}

it("should convert map to json and keep order of keys") {
val map = ListMap(
"intNumber" -> 42,
"floatNumber" -> 42.42,
"someTimestamp" -> 1496930555793L,
"someString" -> "hello",
"booleanValue" -> true
)

val expectedJson =
"""{"intNumber":42,"floatNumber":42.42,"someTimestamp":1496930555793,"someString":"hello","booleanValue":true}"""

// We compare string because we want to check the order
encoder.encode(map).noSpaces shouldBe expectedJson
}

}

class CustomJsonEncoderCustomisation1 extends ToJsonEncoderCustomisation {
Expand Down

0 comments on commit ed41530

Please sign in to comment.