Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class MqttManager(
actualTopic.matches(
subscribedTopic.replaceAll("\\$share/.*?/", "")
.replaceAll("\\+", "[^/]+")
.replaceAll("#", ".+")
.replaceAll("#", ".*")
.replace("$", ".+"),
)

Expand Down Expand Up @@ -172,9 +172,12 @@ class MqttManager(
}

object MqttManager {
def replaceSlashes(kcql: Kcql, topic: String): String =
def replaceSlashes(kcql: Kcql, topic: String): String = {
val sanitizedTopic = topic.replaceFirst("^/", "").replaceAll("/+", "_").replaceAll("/", "_")
kcql.getTarget match {
case "$" => topic.replaceFirst("^/", "").replaceAll("/+", "_").replaceAll("/", "_")
case other => other
case "$" => sanitizedTopic
case other if other.contains("$") => other.replace("$", sanitizedTopic)
case other => other
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,28 @@ class MqttManagerTest extends AnyWordSpec with Matchers {
MqttManager.replaceSlashes(kcql, source) should equal(expected)
}
}
"replace $ placeholder in a prefixed target with the sanitized MQTT topic" in {
val sources = Array(
"topic/my-device",
"topic/living_room/sensor",
)

val kcqls = Kcql.parseMultiple(
"INSERT INTO `prefix_$` SELECT * FROM topic/my-device; INSERT INTO `prefix_$` SELECT * FROM topic/living_room/sensor;",
)

val expectedTargets = Array(
"prefix_topic_my-device",
"prefix_topic_living_room_sensor",
)

val cases = kcqls.toArray.lazyZip(sources).lazyZip(expectedTargets)

cases.map {
case (kcql: Kcql, source: String, expected: String) =>
MqttManager.replaceSlashes(kcql, source) should equal(expected)
}
}
"do nothing in other cases and return the actual targeted topic" in {

val sources = Array("xyz", "zyx")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,20 @@ object JsonSimpleConverter extends StrictLogging {
case JObject(values) => handleObject(name, values)
case other => throw new IllegalStateException(s"$other Not handled in match")
}
private def handleArray(name: String, arr: List[_root_.org.json4s.JsonAST.JValue]) = {
val values = new util.ArrayList[AnyRef]()
val sv = convert(name, arr.head)
values.add(sv.value())
arr.tail.foreach { v =>
values.add(convert(name, v).value())
private def handleArray(name: String, arr: List[_root_.org.json4s.JsonAST.JValue]) =
arr match {
case Nil =>
new SchemaAndValue(SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), new util.ArrayList[AnyRef]())
case head :: tail =>
val values = new util.ArrayList[AnyRef]()
val sv = convert(name, head)
values.add(sv.value())
tail.foreach { v =>
values.add(convert(name, v).value())
}
val schema = SchemaBuilder.array(sv.schema()).optional().build()
new SchemaAndValue(schema, values)
}

val schema = SchemaBuilder.array(sv.schema()).optional().build()
new SchemaAndValue(schema, values)
}
private def handleObject(name: String, values: List[(String, json4s.JValue)]) = {
val builder = SchemaBuilder.struct().name(name.replace("/", "_"))
val fields = values.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package io.lenses.streamreactor.connect.converters.source

import io.lenses.streamreactor.common.converters.MsgKey
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.errors.ConnectException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

Expand All @@ -28,7 +30,8 @@ class JsonSimpleConverterTest extends AnyWordSpec with Matchers {
val sourceTopic = "source_topic"

"JsonSimpleConverter" should {
"convert from json to the struct" in {

"convert a flat JSON object to a Struct" in {
val car = Car("LaFerrari", "Ferrari", 2015, 963, 0.0001)
val json = JacksonJson.toJson(car)
val converter = new JsonSimpleConverter
Expand All @@ -52,5 +55,93 @@ class JsonSimpleConverterTest extends AnyWordSpec with Matchers {
record.sourcePartition() shouldBe Collections.singletonMap(Converter.TopicKey, sourceTopic)
record.sourceOffset() shouldBe null
}

"convert an empty JSON array without throwing" in {
val sv = JsonSimpleConverter.convert(sourceTopic, "[]")
sv.schema().`type`() shouldBe Schema.Type.ARRAY
sv.schema().valueSchema() shouldBe Schema.OPTIONAL_STRING_SCHEMA
sv.schema().isOptional shouldBe true
sv.value().asInstanceOf[java.util.List[_]] shouldBe empty
}

"convert a non-empty JSON array of strings" in {
val sv = JsonSimpleConverter.convert(sourceTopic, """["a","b","c"]""")
val items = sv.value().asInstanceOf[java.util.List[_]].asScala
sv.schema().`type`() shouldBe Schema.Type.ARRAY
items shouldBe List("a", "b", "c")
}

"convert a non-empty JSON array of objects" in {
val sv = JsonSimpleConverter.convert(sourceTopic, """[{"x":1},{"x":2}]""")
val items = sv.value().asInstanceOf[java.util.List[_]].asScala
sv.schema().`type`() shouldBe Schema.Type.ARRAY
items should have size 2
items.head.asInstanceOf[Struct].get("x") shouldBe 1L
}

"convert a JSON object with an empty array field without throwing" in {
val sv = JsonSimpleConverter.convert(sourceTopic, """{"name":"bridge","extensions":[]}""")
val struct = sv.value().asInstanceOf[Struct]
struct.get("name") shouldBe "bridge"
struct.getArray("extensions") shouldBe empty
}

"convert a JSON object with a nested empty array field without throwing" in {
val json = """{"level1":{"items":[],"count":0}}"""
val sv = JsonSimpleConverter.convert(sourceTopic, json)
val l1 = sv.value().asInstanceOf[Struct].get("level1").asInstanceOf[Struct]
l1.getArray("items") shouldBe empty
l1.get("count") shouldBe 0L
}

"convert scalar JSON types correctly via JValue overload" in {
import org.json4s._
JsonSimpleConverter.convert(sourceTopic, JBool(true)).value() shouldBe true
JsonSimpleConverter.convert(sourceTopic, JBool(false)).value() shouldBe false
JsonSimpleConverter.convert(sourceTopic, JInt(42)).value() shouldBe 42L
JsonSimpleConverter.convert(sourceTopic, JDouble(3.14)).value() shouldBe 3.14
JsonSimpleConverter.convert(sourceTopic, JString("hello")).value() shouldBe "hello"
JsonSimpleConverter.convert(sourceTopic, JNull).value() shouldBe null
}

"convert a JSON object containing scalar fields of all types" in {
val json = """{"flag":true,"count":42,"ratio":3.14,"label":"hello","nothing":null}"""
val sv = JsonSimpleConverter.convert(sourceTopic, json)
val s = sv.value().asInstanceOf[Struct]
s.get("flag") shouldBe true
s.get("count") shouldBe 42L
s.get("ratio") shouldBe 3.14
s.get("label") shouldBe "hello"
s.get("nothing") shouldBe null
}

"throw ConnectException on invalid JSON" in {
val converter = new JsonSimpleConverter
intercept[ConnectException] {
converter.convert(topic, sourceTopic, "1", "not json {{{".getBytes)
}
}

"throw ConnectException on null input" in {
val converter = new JsonSimpleConverter
intercept[ConnectException] {
converter.convert(topic, sourceTopic, "1", null)
}
}

"use the source topic as the Kafka record key when no keys are specified" in {
val json = """{"id":"abc","value":1}"""
val converter = new JsonSimpleConverter
val record = converter.convert(topic, sourceTopic, "1", json.getBytes)
record.key() shouldBe MsgKey.getStruct(sourceTopic, "1")
}

"extract a field value as the Kafka record key when keys are specified" in {
val json = """{"id":"abc","value":1}"""
val converter = new JsonSimpleConverter
val record = converter.convert(topic, sourceTopic, "1", json.getBytes, keys = Seq("id"))
record.key() shouldBe "abc"
record.keySchema() shouldBe Schema.STRING_SCHEMA
}
}
}