diff --git a/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala b/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala index 9e229a316..61802ee51 100644 --- a/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala +++ b/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala @@ -69,7 +69,7 @@ class MqttManager( actualTopic.matches( subscribedTopic.replaceAll("\\$share/.*?/", "") .replaceAll("\\+", "[^/]+") - .replaceAll("#", ".+") + .replaceAll("#", ".*") .replace("$", ".+"), ) @@ -172,9 +172,12 @@ class MqttManager( } object MqttManager { - def replaceSlashes(kcql: Kcql, topic: String): String = + def replaceSlashes(kcql: Kcql, topic: String): String = { + val sanitizedTopic = topic.replaceFirst("^/", "").replaceAll("/+", "_").replaceAll("/", "_") kcql.getTarget match { - case "$" => topic.replaceFirst("^/", "").replaceAll("/+", "_").replaceAll("/", "_") - case other => other + case "$" => sanitizedTopic + case other if other.contains("$") => other.replace("$", sanitizedTopic) + case other => other } + } } diff --git a/kafka-connect-mqtt/src/test/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManagerTest.scala b/kafka-connect-mqtt/src/test/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManagerTest.scala index b0fa77e5f..b96858e82 100644 --- a/kafka-connect-mqtt/src/test/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManagerTest.scala +++ b/kafka-connect-mqtt/src/test/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManagerTest.scala @@ -51,6 +51,28 @@ class MqttManagerTest extends AnyWordSpec with Matchers { MqttManager.replaceSlashes(kcql, source) should equal(expected) } } + "replace $ placeholder in a prefixed target with the sanitized MQTT topic" in { + val sources = Array( + "topic/my-device", + "topic/living_room/sensor", + ) + + val kcqls = Kcql.parseMultiple( + "INSERT INTO `prefix_$` SELECT * FROM topic/my-device; INSERT INTO `prefix_$` SELECT * FROM topic/living_room/sensor;", + ) + + val expectedTargets = Array( + "prefix_topic_my-device", + "prefix_topic_living_room_sensor", + ) + + val cases = kcqls.toArray.lazyZip(sources).lazyZip(expectedTargets) + + cases.map { + case (kcql: Kcql, source: String, expected: String) => + MqttManager.replaceSlashes(kcql, source) should equal(expected) + } + } "do nothing in other cases and return the actual targeted topic" in { val sources = Array("xyz", "zyx") diff --git a/kafka-connect-sql-common/src/main/scala/io/lenses/streamreactor/connect/converters/source/JsonSimpleConverter.scala b/kafka-connect-sql-common/src/main/scala/io/lenses/streamreactor/connect/converters/source/JsonSimpleConverter.scala index 66d7d5acc..bb3924cf2 100644 --- a/kafka-connect-sql-common/src/main/scala/io/lenses/streamreactor/connect/converters/source/JsonSimpleConverter.scala +++ b/kafka-connect-sql-common/src/main/scala/io/lenses/streamreactor/connect/converters/source/JsonSimpleConverter.scala @@ -117,17 +117,20 @@ object JsonSimpleConverter extends StrictLogging { case JObject(values) => handleObject(name, values) case other => throw new IllegalStateException(s"$other Not handled in match") } - private def handleArray(name: String, arr: List[_root_.org.json4s.JsonAST.JValue]) = { - val values = new util.ArrayList[AnyRef]() - val sv = convert(name, arr.head) - values.add(sv.value()) - arr.tail.foreach { v => - values.add(convert(name, v).value()) + private def handleArray(name: String, arr: List[_root_.org.json4s.JsonAST.JValue]) = + arr match { + case Nil => + new SchemaAndValue(SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), new util.ArrayList[AnyRef]()) + case head :: tail => + val values = new util.ArrayList[AnyRef]() + val sv = convert(name, head) + values.add(sv.value()) + tail.foreach { v => + values.add(convert(name, v).value()) + } + val schema = SchemaBuilder.array(sv.schema()).optional().build() + new SchemaAndValue(schema, values) } - - val schema = SchemaBuilder.array(sv.schema()).optional().build() - new SchemaAndValue(schema, values) - } private def handleObject(name: String, values: List[(String, json4s.JValue)]) = { val builder = SchemaBuilder.struct().name(name.replace("/", "_")) val fields = values.map { diff --git a/kafka-connect-sql-common/src/test/scala/io/lenses/streamreactor/connect/converters/source/JsonSimpleConverterTest.scala b/kafka-connect-sql-common/src/test/scala/io/lenses/streamreactor/connect/converters/source/JsonSimpleConverterTest.scala index c3f1abe1c..2aa6c3179 100644 --- a/kafka-connect-sql-common/src/test/scala/io/lenses/streamreactor/connect/converters/source/JsonSimpleConverterTest.scala +++ b/kafka-connect-sql-common/src/test/scala/io/lenses/streamreactor/connect/converters/source/JsonSimpleConverterTest.scala @@ -16,7 +16,9 @@ package io.lenses.streamreactor.connect.converters.source import io.lenses.streamreactor.common.converters.MsgKey +import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.errors.ConnectException import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -28,7 +30,8 @@ class JsonSimpleConverterTest extends AnyWordSpec with Matchers { val sourceTopic = "source_topic" "JsonSimpleConverter" should { - "convert from json to the struct" in { + + "convert a flat JSON object to a Struct" in { val car = Car("LaFerrari", "Ferrari", 2015, 963, 0.0001) val json = JacksonJson.toJson(car) val converter = new JsonSimpleConverter @@ -52,5 +55,93 @@ class JsonSimpleConverterTest extends AnyWordSpec with Matchers { record.sourcePartition() shouldBe Collections.singletonMap(Converter.TopicKey, sourceTopic) record.sourceOffset() shouldBe null } + + "convert an empty JSON array without throwing" in { + val sv = JsonSimpleConverter.convert(sourceTopic, "[]") + sv.schema().`type`() shouldBe Schema.Type.ARRAY + sv.schema().valueSchema() shouldBe Schema.OPTIONAL_STRING_SCHEMA + sv.schema().isOptional shouldBe true + sv.value().asInstanceOf[java.util.List[_]] shouldBe empty + } + + "convert a non-empty JSON array of strings" in { + val sv = JsonSimpleConverter.convert(sourceTopic, """["a","b","c"]""") + val items = sv.value().asInstanceOf[java.util.List[_]].asScala + sv.schema().`type`() shouldBe Schema.Type.ARRAY + items shouldBe List("a", "b", "c") + } + + "convert a non-empty JSON array of objects" in { + val sv = JsonSimpleConverter.convert(sourceTopic, """[{"x":1},{"x":2}]""") + val items = sv.value().asInstanceOf[java.util.List[_]].asScala + sv.schema().`type`() shouldBe Schema.Type.ARRAY + items should have size 2 + items.head.asInstanceOf[Struct].get("x") shouldBe 1L + } + + "convert a JSON object with an empty array field without throwing" in { + val sv = JsonSimpleConverter.convert(sourceTopic, """{"name":"bridge","extensions":[]}""") + val struct = sv.value().asInstanceOf[Struct] + struct.get("name") shouldBe "bridge" + struct.getArray("extensions") shouldBe empty + } + + "convert a JSON object with a nested empty array field without throwing" in { + val json = """{"level1":{"items":[],"count":0}}""" + val sv = JsonSimpleConverter.convert(sourceTopic, json) + val l1 = sv.value().asInstanceOf[Struct].get("level1").asInstanceOf[Struct] + l1.getArray("items") shouldBe empty + l1.get("count") shouldBe 0L + } + + "convert scalar JSON types correctly via JValue overload" in { + import org.json4s._ + JsonSimpleConverter.convert(sourceTopic, JBool(true)).value() shouldBe true + JsonSimpleConverter.convert(sourceTopic, JBool(false)).value() shouldBe false + JsonSimpleConverter.convert(sourceTopic, JInt(42)).value() shouldBe 42L + JsonSimpleConverter.convert(sourceTopic, JDouble(3.14)).value() shouldBe 3.14 + JsonSimpleConverter.convert(sourceTopic, JString("hello")).value() shouldBe "hello" + JsonSimpleConverter.convert(sourceTopic, JNull).value() shouldBe null + } + + "convert a JSON object containing scalar fields of all types" in { + val json = """{"flag":true,"count":42,"ratio":3.14,"label":"hello","nothing":null}""" + val sv = JsonSimpleConverter.convert(sourceTopic, json) + val s = sv.value().asInstanceOf[Struct] + s.get("flag") shouldBe true + s.get("count") shouldBe 42L + s.get("ratio") shouldBe 3.14 + s.get("label") shouldBe "hello" + s.get("nothing") shouldBe null + } + + "throw ConnectException on invalid JSON" in { + val converter = new JsonSimpleConverter + intercept[ConnectException] { + converter.convert(topic, sourceTopic, "1", "not json {{{".getBytes) + } + } + + "throw ConnectException on null input" in { + val converter = new JsonSimpleConverter + intercept[ConnectException] { + converter.convert(topic, sourceTopic, "1", null) + } + } + + "use the source topic as the Kafka record key when no keys are specified" in { + val json = """{"id":"abc","value":1}""" + val converter = new JsonSimpleConverter + val record = converter.convert(topic, sourceTopic, "1", json.getBytes) + record.key() shouldBe MsgKey.getStruct(sourceTopic, "1") + } + + "extract a field value as the Kafka record key when keys are specified" in { + val json = """{"id":"abc","value":1}""" + val converter = new JsonSimpleConverter + val record = converter.convert(topic, sourceTopic, "1", json.getBytes, keys = Seq("id")) + record.key() shouldBe "abc" + record.keySchema() shouldBe Schema.STRING_SCHEMA + } } }