diff --git a/src/main/scala/net/heartsavior/spark/KafkaOffsetCommitterListener.scala b/src/main/scala/net/heartsavior/spark/KafkaOffsetCommitterListener.scala index 752fe39..305f529 100644 --- a/src/main/scala/net/heartsavior/spark/KafkaOffsetCommitterListener.scala +++ b/src/main/scala/net/heartsavior/spark/KafkaOffsetCommitterListener.scala @@ -38,8 +38,8 @@ class KafkaOffsetCommitterListener extends StreamingQueryListener with Logging { val query = SparkSession.active.streams.get(event.progress.id) if (query != null) { val exec = query match { - case query: StreamingQueryWrapper => Some(query.streamingQuery.lastExecution) - case query: StreamExecution => Some(query.lastExecution) + case query: StreamingQueryWrapper => Option(query.streamingQuery.lastExecution) + case query: StreamExecution => Option(query.lastExecution) case _ => logWarning(s"Unexpected type of streaming query: ${query.getClass}") None