I use MqttSourceConnector and would like to replicate a shared MQTT subscription to all workers.
connect.mqtt.clean=true
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=4
connect.mqtt.kcql=INSERT INTO SESSION_CHAT_DATA_TOPIC SELECT * FROM $share/g/chat
connect.mqtt.password=***********
connect.mqtt.share.replicate=true
As a result, only 1 worker is created. To work around this, I need to update the kcql string to the following:
connect.mqtt.kcql=INSERT INTO SESSION_CHAT_DATA_TOPIC SELECT * FROM $share/g/chat;INSERT INTO SESSION_CHAT_DATA_TOPIC SELECT * FROM $share/g/chat
That is, I just duplicate the kcql string with a ';' separator, which is not a good or proper way to do it.
Expected behaviour: I think that when connect.mqtt.share.replicate=true, a single KCQL statement such as INSERT INTO SESSION_CHAT_DATA_TOPIC SELECT * FROM $share/g/chat should be enough, and it should be replicated to all workers.
Going by @f1faust's description, there might be a bug in io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector.scala. Duplicating the KCQL statement seems to be required to reach the else branch of if (maxTasks == 1 || kcql.length == 1) in line 52 (4th line of the snippet), since kcql.length evaluates to 2 in that case:
override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
  val settings = MqttSourceSettings(MqttSourceConfig(configProps.asScala.toMap))
  val kcql = settings.kcql
  if (maxTasks == 1 || kcql.length == 1) {
    Collections.singletonList(configProps)
  } else {
    val groups = kcql.length / maxTasks + kcql.length % maxTasks
    // If the option is enabled, copy every KCQL instruction with a shared subscription to every tasks, otherwise
    // the shared subscriptions are distributed as every other instructions.
    val (replicated, distributed) =
      if (settings.replicateShared) kcql.partition(shouldReplicate) else (Array[String](), kcql)
    val configs = Array.fill(maxTasks)(replicated)
      .zipAll(distributed.grouped(groups).toList, Array[String](), Array[String]())
      .map(z => z._2 ++ z._1)
      .filter(_.nonEmpty)
      .zipWithIndex
      .map {
        case (p, index) =>
          val map = settings.copy(kcql = p, clientId = settings.clientId + "-" + index).asMap()
          configProps.asScala
            .filterNot { case (k, _) => map.containsKey(k) }
            .foreach { case (k, v) => map.put(k, v) }
          map
      }
    configs.toList.asJava
  }
}
Not sure I'll find the time to evaluate my suspicion or commit a fix in the short term, but felt like sharing nonetheless.
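To make the suspicion easier to check, here is a minimal, self-contained sketch that reduces the quoted branch logic to plain KCQL strings. It is not the connector's code: TaskSplitSketch, originalSplit, adjustedSplit and distribute are hypothetical names, the shouldReplicate shown here is an assumed stand-in (the real one is not quoted in this thread), and the client ID suffixing and config map handling are left out. adjustedSplit is only one possible way to change the condition, not a confirmed fix.

```scala
object TaskSplitSketch {
  // Assumption: a statement is "shared" if it mentions a $share/ topic filter.
  // The connector's real shouldReplicate may be implemented differently.
  def shouldReplicate(kcql: String): Boolean = kcql.contains("$share/")

  // Same branch structure as the quoted taskConfigs: a single KCQL statement
  // short-circuits to one task config, whatever maxTasks is.
  def originalSplit(kcql: Array[String], maxTasks: Int, replicateShared: Boolean): List[Array[String]] =
    if (maxTasks == 1 || kcql.length == 1) List(kcql)
    else distribute(kcql, maxTasks, replicateShared)

  // Hypothetical variant: take the single-config shortcut only when there is
  // no shared statement that should be replicated.
  def adjustedSplit(kcql: Array[String], maxTasks: Int, replicateShared: Boolean): List[Array[String]] = {
    val nothingToReplicate = !(replicateShared && kcql.exists(shouldReplicate))
    if (maxTasks == 1 || (kcql.length == 1 && nothingToReplicate)) List(kcql)
    else distribute(kcql, maxTasks, replicateShared)
  }

  // The else branch of the quoted code, returning the KCQL statements per task.
  private def distribute(kcql: Array[String], maxTasks: Int, replicateShared: Boolean): List[Array[String]] = {
    val groups = kcql.length / maxTasks + kcql.length % maxTasks
    val (replicated, distributed) =
      if (replicateShared) kcql.partition(shouldReplicate) else (Array[String](), kcql)
    Array.fill(maxTasks)(replicated)
      .zipAll(distributed.grouped(groups).toList, Array[String](), Array[String]())
      .map(z => z._2 ++ z._1)
      .filter(_.nonEmpty)
      .toList
  }
}
```

With the single statement from the report and maxTasks = 4, originalSplit yields one task, while adjustedSplit yields four tasks that each carry that statement. Passing the duplicated statement to originalSplit also yields four tasks, but each task then carries the statement twice, which is consistent with the workaround described above.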
Issue Guidelines
Please review these questions before submitting any issue?
What version of the Stream Reactor are you reporting this issue for?
3.0.1
Have you read the docs?
yes