From ff3b5447662fe758ede7516ffd0705b956dd3226 Mon Sep 17 00:00:00 2001 From: kongming Date: Tue, 30 Jul 2019 20:14:46 +0800 Subject: [PATCH 1/2] support Springboot style demo --- kafka-spring-demo/vpc-ssl/pom.xml | 115 ++++++++++-------- .../kafka/ons/KafkaSpringbootDemo.java | 62 ++++++++++ .../src/main/resources/application.yml | 30 +++++ 3 files changed, 158 insertions(+), 49 deletions(-) create mode 100644 kafka-spring-demo/vpc-ssl/src/main/java/com/aliyun/openservices/kafka/ons/KafkaSpringbootDemo.java create mode 100644 kafka-spring-demo/vpc-ssl/src/main/resources/application.yml diff --git a/kafka-spring-demo/vpc-ssl/pom.xml b/kafka-spring-demo/vpc-ssl/pom.xml index 84e8dc5..6fffa27 100644 --- a/kafka-spring-demo/vpc-ssl/pom.xml +++ b/kafka-spring-demo/vpc-ssl/pom.xml @@ -11,11 +11,18 @@ 4.3.2.RELEASE + + + org.springframework.boot + spring-boot-starter-parent + 1.5.2.RELEASE + + + org.springframework.kafka spring-kafka - 1.2.3.RELEASE @@ -25,66 +32,71 @@ + + + org.springframework.boot + spring-boot-starter-web + org.apache.kafka kafka-clients 0.10.0.0 - - - org.springframework - spring-core - ${spring.version} - + + + + + + - - org.springframework - spring-web - ${spring.version} - + + + + + - - org.springframework - spring-oxm - ${spring.version} - + + + + + - - org.springframework - spring-tx - ${spring.version} - + + + + + - - org.springframework - spring-jdbc - ${spring.version} - + + + + + - - org.springframework - spring-webmvc - ${spring.version} - + + + + + - - org.springframework - spring-aop - ${spring.version} - + + + + + - - org.springframework - spring-context-support - ${spring.version} - + + + + + - - org.springframework - spring-test - ${spring.version} - + + + + + org.slf4j @@ -104,8 +116,8 @@ maven-compiler-plugin 2.3.2 - 1.6 - 1.6 + 1.8 + 1.8 utf-8 @@ -136,6 +148,11 @@ + + org.springframework.boot + spring-boot-maven-plugin + + diff --git a/kafka-spring-demo/vpc-ssl/src/main/java/com/aliyun/openservices/kafka/ons/KafkaSpringbootDemo.java b/kafka-spring-demo/vpc-ssl/src/main/java/com/aliyun/openservices/kafka/ons/KafkaSpringbootDemo.java new file mode 100644 index 0000000..467a6ff --- /dev/null +++ b/kafka-spring-demo/vpc-ssl/src/main/java/com/aliyun/openservices/kafka/ons/KafkaSpringbootDemo.java @@ -0,0 +1,62 @@ +package com.aliyun.openservices.kafka.ons; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.ListenableFuture; + +import java.util.Optional; + +@SpringBootApplication +public class KafkaSpringbootDemo { + + public static void main(String[] args) throws Exception { + System.setProperty("java.security.auth.login.config", KafkaSpringbootDemo.class.getClassLoader().getResource("kafka_client_jaas.conf").getPath()); + ConfigurableApplicationContext context = SpringApplication.run(KafkaSpringbootDemo.class, args); + //给Consumer初始化一些时间,以便从latest开始消费 + Thread.sleep(6000); + + context.getBean(MessageSendService.class).sendOneMessage(); + + Thread.sleep(3000); + System.exit(0); + } + + @Component + public static class KafkaReceiveService { + + @KafkaListener(topics = {"kongming_test"}) + public void listen(ConsumerRecord record) { + + Optional kafkaMessage = Optional.ofNullable(record.value()); + + if (kafkaMessage.isPresent()) { + + Object message = kafkaMessage.get(); + + System.out.println("----------------- record =" + record); + System.out.println("------------------ message =" + message); + } + + } + } + + @Component + public static class MessageSendService { + + @Autowired + private KafkaTemplate kafkaTemplate; + + public void sendOneMessage() throws Exception { + ListenableFuture> result = kafkaTemplate.send("kongming_test", "hello aliKafka"); + kafkaTemplate.flush(); + System.out.println("producer send ok " + result.get().getProducerRecord()); + } + } +} \ No newline at end of file diff --git a/kafka-spring-demo/vpc-ssl/src/main/resources/application.yml b/kafka-spring-demo/vpc-ssl/src/main/resources/application.yml new file mode 100644 index 0000000..ac6336d --- /dev/null +++ b/kafka-spring-demo/vpc-ssl/src/main/resources/application.yml @@ -0,0 +1,30 @@ +spring: + kafka: + bootstrap-servers: 121.199.59.159:9093,47.97.205.122:9093,47.111.96.104:9093 + + properties: + ssl.truststore.location: /Users/kongming.lrq/sources/aliware-kafka-demos/kafka-java-demo/vpc-ssl/src/main/resources/kafka.client.truststore.jks + ssl.truststore.password: KafkaOnsClient + security.protocol: SASL_SSL + sasl.mechanism: PLAIN + + producer: + retries: 10 + batch-size: 16384 + buffer-memory: 33554432 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + properties: + linger.ms: 1 + + consumer: + group-id: kongming_test_cg + enable-auto-commit: false + auto-commit-interval: 100 + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + properties: + session.timeout.ms: 15000 + + + From caa98722fe72f19f00b36f04bf722a03288e9add Mon Sep 17 00:00:00 2001 From: kongming Date: Tue, 30 Jul 2019 20:18:24 +0800 Subject: [PATCH 2/2] remove local info with XXX --- .../aliyun/openservices/kafka/ons/KafkaSpringbootDemo.java | 4 ++-- .../vpc-ssl/src/main/resources/application.yml | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka-spring-demo/vpc-ssl/src/main/java/com/aliyun/openservices/kafka/ons/KafkaSpringbootDemo.java b/kafka-spring-demo/vpc-ssl/src/main/java/com/aliyun/openservices/kafka/ons/KafkaSpringbootDemo.java index 467a6ff..243bb2a 100644 --- a/kafka-spring-demo/vpc-ssl/src/main/java/com/aliyun/openservices/kafka/ons/KafkaSpringbootDemo.java +++ b/kafka-spring-demo/vpc-ssl/src/main/java/com/aliyun/openservices/kafka/ons/KafkaSpringbootDemo.java @@ -31,7 +31,7 @@ public static void main(String[] args) throws Exception { @Component public static class KafkaReceiveService { - @KafkaListener(topics = {"kongming_test"}) + @KafkaListener(topics = {"XXXX"}) public void listen(ConsumerRecord record) { Optional kafkaMessage = Optional.ofNullable(record.value()); @@ -54,7 +54,7 @@ public static class MessageSendService { private KafkaTemplate kafkaTemplate; public void sendOneMessage() throws Exception { - ListenableFuture> result = kafkaTemplate.send("kongming_test", "hello aliKafka"); + ListenableFuture> result = kafkaTemplate.send("XXXXX", "hello aliKafka"); kafkaTemplate.flush(); System.out.println("producer send ok " + result.get().getProducerRecord()); } diff --git a/kafka-spring-demo/vpc-ssl/src/main/resources/application.yml b/kafka-spring-demo/vpc-ssl/src/main/resources/application.yml index ac6336d..6b1fa26 100644 --- a/kafka-spring-demo/vpc-ssl/src/main/resources/application.yml +++ b/kafka-spring-demo/vpc-ssl/src/main/resources/application.yml @@ -1,9 +1,9 @@ spring: kafka: - bootstrap-servers: 121.199.59.159:9093,47.97.205.122:9093,47.111.96.104:9093 + bootstrap-servers: XXXXX properties: - ssl.truststore.location: /Users/kongming.lrq/sources/aliware-kafka-demos/kafka-java-demo/vpc-ssl/src/main/resources/kafka.client.truststore.jks + ssl.truststore.location: XXX/kafka.client.truststore.jks ssl.truststore.password: KafkaOnsClient security.protocol: SASL_SSL sasl.mechanism: PLAIN @@ -18,7 +18,7 @@ spring: linger.ms: 1 consumer: - group-id: kongming_test_cg + group-id: XXXX enable-auto-commit: false auto-commit-interval: 100 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer