diff --git a/.gitignore b/.gitignore index 61cb493..e9b00f4 100644 --- a/.gitignore +++ b/.gitignore @@ -7,8 +7,9 @@ Thumbs.db #* *# -# Build Files # +*.log +# Build Files # bin target build diff --git a/assets/03_hands-on.md b/assets/03_hands-on.md index bdf5676..e01e0d0 100644 --- a/assets/03_hands-on.md +++ b/assets/03_hands-on.md @@ -172,7 +172,9 @@ Select a trace and view the results. Notice that the trace now extends from the Let's take it one step further to see what happens when multiple consumers/streams process the same event. The Customer Stream Count can be found on the `solutions` branch so you'll first need to checkout that branch before running the Stream. -> Before continuing, double check that the `tracingEnabled` flag is `true` in [gradle.properties](https://github.com/schroedermatt/stream-processing-workshop/blob/main/gradle.properties#L2) +Before continuing, +* double check that the `tracingEnabled` flag is `true` in [gradle.properties](https://github.com/schroedermatt/stream-processing-workshop/blob/main/gradle.properties#L2) +* kill the "Top Customer Artists" process ```bash # run from the root of the stream-processing-workshop diff --git a/build.gradle b/build.gradle index c1c9e92..ff68b45 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ dockerCompose { tracing { useComposeFiles = [ - './observability/jaeger/docker-compose.yml' + './observability/tempo/docker-compose.yml' ] } } diff --git a/gradle.properties b/gradle.properties index 8e8e371..36bf93b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,10 @@ runtimeMode=kafka # options: true, false -tracingEnabled=false +tracingEnabled=true + +# options: true, false +fileLoggingEnabled=true # initial load volumes (only needed if running the mockdata-daemon) initialLoadCustomers=20 diff --git a/kafka/build.gradle b/kafka/build.gradle index 8f65848..81e38d2 100644 --- a/kafka/build.gradle +++ b/kafka/build.gradle @@ -12,8 +12,11 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-data-redis' implementation 'redis.clients:jedis' - implementation 'org.apache.kafka:kafka-clients:3.3.1' - implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.4.2' + implementation 'org.apache.kafka:kafka-clients:3.4.0' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2' + implementation 'net.logstash.logback:logstash-logback-encoder:7.3' + implementation 'org.apache.logging.log4j:log4j-api:2.20.0' + implementation 'org.apache.logging.log4j:log4j-core:2.20.0' } test { diff --git a/kafka/src/main/java/org/msse/demo/customer/KafkaCustomerService.java b/kafka/src/main/java/org/msse/demo/customer/KafkaCustomerService.java index bdaf4eb..7bf33d9 100644 --- a/kafka/src/main/java/org/msse/demo/customer/KafkaCustomerService.java +++ b/kafka/src/main/java/org/msse/demo/customer/KafkaCustomerService.java @@ -11,6 +11,8 @@ import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; +import static net.logstash.logback.argument.StructuredArguments.v; + @Slf4j @Service @Profile("kafka") @@ -46,16 +48,16 @@ public long customerCount() { @SneakyThrows private void produceCustomer(FullCustomer customer) { - log.info("Producing Customer ({}) to Kafka", customer.customer().id()); + log.info("Producing Customer ({}) to Kafka", v("customer_id", customer.customer().id())); kafkaProducer.send(new ProducerRecord<>(topics.customers(), customer.customer().id(), customer.customer())).get(); - log.info("Producing Address ({}) for Customer ({}) to Kafka", customer.address().id(), customer.customer().id()); + log.info("Producing Address ({}) for Customer ({}) to Kafka", v("address_id", customer.address().id()), v("customer_id", customer.customer().id())); kafkaProducer.send(new ProducerRecord<>(topics.addresses(), customer.address().id(), customer.address())).get(); - log.info("Producing Phone ({}) for Customer ({}) to Kafka", customer.phone().id(), customer.customer().id()); + log.info("Producing Phone ({}) for Customer ({}) to Kafka", v("phone_id", customer.phone().id()), v("customer_id", customer.customer().id())); kafkaProducer.send(new ProducerRecord<>(topics.phones(), customer.phone().id(), customer.phone())).get(); - log.info("Producing Email ({}) for Customer ({}) to Kafka", customer.email().id(), customer.customer().id()); + log.info("Producing Email ({}) for Customer ({}) to Kafka", v("email_id", customer.email().id()), v("customer_id", customer.customer().id())); kafkaProducer.send(new ProducerRecord<>(topics.emails(), customer.email().id(), customer.email())).get(); } } diff --git a/kafka/src/main/java/org/msse/demo/music/KafkaMusicService.java b/kafka/src/main/java/org/msse/demo/music/KafkaMusicService.java index d41e773..d850b7a 100644 --- a/kafka/src/main/java/org/msse/demo/music/KafkaMusicService.java +++ b/kafka/src/main/java/org/msse/demo/music/KafkaMusicService.java @@ -17,6 +17,8 @@ import java.util.Optional; +import static net.logstash.logback.argument.StructuredArguments.v; + @Slf4j @Service @Profile("kafka") @@ -32,7 +34,7 @@ public class KafkaMusicService implements MusicService { public Artist createArtist() { Artist artist = musicCache.createArtist(); - log.info("Producing Artist ({}) to Kafka", artist.id()); + log.info("Producing Artist ({}) to Kafka", v("artist_id", artist.id())); send(topics.artists(), artist.id(), artist); return artist; @@ -42,7 +44,7 @@ public Artist createArtist() { public Artist createArtist(String artistId) { Artist artist = musicCache.createArtist(artistId); - log.info("Producing Artist ({}) to Kafka", artist.id()); + log.info("Producing Artist ({}) to Kafka", v("artist_id", artist.id())); send(topics.artists(), artist.id(), artist); return artist; @@ -58,7 +60,9 @@ public Optional createVenue() { Optional venue = musicCache.createVenue(); venue.ifPresent(value -> { - log.info("Producing Venue ({}) at Address ({}) to Kafka", value.id(), value.addressid()); + log.info("Producing Venue ({}) at Address ({}) to Kafka", + v("venue_id", value.id()), + v("address_id", value.addressid())); send(topics.venues(), value.id(), value); }); @@ -70,7 +74,9 @@ public Optional createVenue(String addressId) { Optional venue = musicCache.createVenue(addressId); venue.ifPresent(value -> { - log.info("Producing Venue ({}) at Address ({}) to Kafka", value.id(), value.addressid()); + log.info("Producing Venue ({}) at Address ({}) to Kafka", + v("venue_id", value.id()), + v("address_id", value.addressid())); send(topics.venues(), value.id(), value); }); @@ -87,7 +93,9 @@ public Optional createEvent() { Optional event = musicCache.createEvent(); event.ifPresent(value -> { - log.info("Producing Event ({}) at Venue ({}) for Artist ({}) to Kafka", value.id(), value.venueid(), value.artistid()); + log.info("Producing Event ({}) at Venue ({}) for Artist ({}) to Kafka", + v("event_id", value.id()), + v("venue_id", value.venueid()), value.artistid()); send(topics.events(), value.id(), value); }); @@ -99,7 +107,10 @@ public Optional createEvent(String artistId, String venueId) { Optional event = musicCache.createEvent(artistId, venueId); event.ifPresent(value -> { - log.info("Producing Event ({}) at Venue ({}) for Artist ({}) to Kafka", value.id(), value.venueid(), value.artistid()); + log.info("Producing Event ({}) at Venue ({}) for Artist ({}) to Kafka", + v("event_id", value.id()), + v("venue_id", value.venueid()), + v("artist_id", value.artistid())); send(topics.events(), value.id(), value); }); @@ -116,7 +127,10 @@ public Optional bookTicket() { Optional ticket = musicCache.bookTicket(); ticket.ifPresent(value -> { - log.info("Producing Ticket ({}) for Customer ({}) to Event ({}) to Kafka", value.id(), value.customerid(), value.eventid()); + log.info("Producing Ticket ({}) for Customer ({}) to Event ({}) to Kafka", + v("ticket_id", value.id()), + v("customer_id", value.customerid()), + v("event_id", value.eventid())); send(topics.tickets(), value.id(), value); }); @@ -128,7 +142,10 @@ public Optional bookTicket(String eventId, String customerId) { Optional ticket = musicCache.bookTicket(eventId, customerId); ticket.ifPresent(value -> { - log.info("Producing Ticket ({}) for Customer ({}) to Event ({}) to Kafka", value.id(), value.customerid(), value.eventid()); + log.info("Producing Ticket ({}) for Customer ({}) to Event ({}) to Kafka", + v("ticket_id", value.id()), + v("customer_id", value.customerid()), + v("event_id", value.eventid())); send(topics.tickets(), value.id(), value); }); @@ -145,7 +162,10 @@ public Optional streamArtist() { Optional stream = musicCache.streamArtist(); stream.ifPresent(value -> { - log.info("Producing Stream ({}) for Artist ({}) from Customer ({}) to Kafka", value.id(), value.artistid(), value.customerid()); + log.info("Producing Stream ({}) for Artist ({}) from Customer ({}) to Kafka", + v("stream_id", value.id()), + v("artist_id", value.artistid()), + v("customer_id", value.customerid())); send(topics.streams(), value.id(), value); }); @@ -157,7 +177,10 @@ public Optional streamArtist(String artistId, String customerId) { Optional stream = musicCache.streamArtist(customerId, artistId); stream.ifPresent(value -> { - log.info("Producing Stream ({}) for Artist ({}) from Customer ({}) to Kafka", value.id(), value.artistid(), value.customerid()); + log.info("Producing Stream ({}) for Artist ({}) from Customer ({}) to Kafka", + v("stream_id", value.id()), + v("artist_id", value.artistid()), + v("customer_id", value.customerid())); send(topics.streams(), value.id(), value); }); diff --git a/mockdata-api/build.gradle b/mockdata-api/build.gradle index df24ec8..0b397de 100644 --- a/mockdata-api/build.gradle +++ b/mockdata-api/build.gradle @@ -13,7 +13,10 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter-web' + implementation 'net.logstash.logback:logstash-logback-encoder:7.3' + // conditional processing in the logback file + implementation 'org.codehaus.janino:janino:3.1.9' testImplementation 'org.springframework.boot:spring-boot-starter-test' } @@ -24,6 +27,7 @@ task bootRunAPI(type: BootRun, dependsOn: 'build') { mainClass = project.mainClassName doFirst() { + if ("postgres" == project.runtimeMode) { systemProperty 'spring.profiles.active', "postgres" } else if ("kafka" == project.runtimeMode) { @@ -32,6 +36,10 @@ task bootRunAPI(type: BootRun, dependsOn: 'build') { systemProperty 'spring.profiles.active', "kafka,ccloud" } + if ("true" == project.fileLoggingEnabled) { + systemProperty "logback.file-appender.enabled", "true" + } + // configure tracing properties if enabled if ("true" == project.tracingEnabled) { systemProperty "otel.traces.exporter", 'otlp' diff --git a/mockdata-api/src/main/resources/logback.xml b/mockdata-api/src/main/resources/logback.xml new file mode 100644 index 0000000..bb95fd5 --- /dev/null +++ b/mockdata-api/src/main/resources/logback.xml @@ -0,0 +1,31 @@ + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%.-4thread] %-5level %logger{6} [%X{trace_id}-%X{span_id}-%X{trace_flags}] %msg%n + + + + + + + + ../data-demo.log + + {"service":"data-demo"} + + + + + + + + + + + + + + + + + diff --git a/mockdata-daemon/build.gradle b/mockdata-daemon/build.gradle index 4bd9eaf..cf2f213 100644 --- a/mockdata-daemon/build.gradle +++ b/mockdata-daemon/build.gradle @@ -12,6 +12,13 @@ dependencies { implementation project(':kafka') implementation 'org.springframework.boot:spring-boot-starter' + + implementation 'org.springframework.boot:spring-boot-starter-actuator' + implementation 'org.springframework.boot:spring-boot-starter-web' + implementation 'net.logstash.logback:logstash-logback-encoder:7.3' + + // conditional processing in the logback file + implementation 'org.codehaus.janino:janino:3.1.9' } task bootRunDaemon(type: BootRun, dependsOn: 'build') { @@ -36,6 +43,10 @@ task bootRunDaemon(type: BootRun, dependsOn: 'build') { systemProperty 'initial-load.tickets', project.initialLoadTickets systemProperty 'initial-load.streams', project.initialLoadStreams + if ("true" == project.fileLoggingEnabled) { + systemProperty "logback.file-appender.enabled", "true" + } + // configure tracing properties if enabled if ("true" == project.tracingEnabled) { systemProperty "otel.traces.exporter", 'otlp' diff --git a/mockdata-daemon/src/main/resources/logback.xml b/mockdata-daemon/src/main/resources/logback.xml new file mode 100644 index 0000000..20868d1 --- /dev/null +++ b/mockdata-daemon/src/main/resources/logback.xml @@ -0,0 +1,31 @@ + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%.-4thread] %-5level %logger{6} [%X{trace_id}-%X{span_id}-%X{trace_flags}] %msg%n + + + + + + + + ../file.log + + {"service":"data-demo"} + + + + + + + + + + + + + + + + + diff --git a/observability/tempo/docker-compose.yml b/observability/tempo/docker-compose.yml index fccf5fa..8337bde 100644 --- a/observability/tempo/docker-compose.yml +++ b/observability/tempo/docker-compose.yml @@ -6,14 +6,18 @@ networks: name: dev-local services: +################### +# TEMPO - TRACING # +################### + # Uncomment k6-tracing to generate fake traces... - k6-tracing: - image: ghcr.io/grafana/xk6-client-tracing:v0.0.2 - environment: - - ENDPOINT=otel-collector:4317 - restart: always - depends_on: - - otel-collector +# k6-tracing: +# image: ghcr.io/grafana/xk6-client-tracing:v0.0.2 +# environment: +# - ENDPOINT=otel-collector:4317 +# restart: always +# depends_on: +# - otel-collector # And put them in an OTEL collector pipeline... # https://opentelemetry.io/docs/collector/configuration @@ -39,6 +43,10 @@ services: - "4318" # otlp http - "9411" # zipkin +######################## +# PROMETHEUS - METRICS # +######################## + prometheus: image: prom/prometheus:latest command: @@ -50,15 +58,51 @@ services: ports: - "19090:9090" +############### +# LOKI - LOGS # +############### + + loki: + image: grafana/loki:2.8.0 + ports: + - "3100:3100" + command: -config.file=/etc/loki/local-config.yaml + + promtail: + image: grafana/promtail:2.8.0 + volumes: + - ../../:/var/log + command: -config.file=/etc/promtail/config.yml + grafana: image: grafana/grafana:9.4.3 volumes: - ./grafana-datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml environment: + #- GF_PATHS_PROVISIONING=/etc/grafana/provisioning - GF_AUTH_ANONYMOUS_ENABLED=true - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin - GF_AUTH_DISABLE_LOGIN_FORM=true - GF_FEATURE_TOGGLES_ENABLE=traceqlEditor + entrypoint: + - sh + - -euc + - | + mkdir -p /etc/grafana/provisioning/datasources + cat < /etc/grafana/provisioning/datasources/ds.yaml + apiVersion: 1 + datasources: + - name: Loki + type: loki + access: proxy + orgId: 1 + url: http://loki:3100 + basicAuth: false + isDefault: false + version: 1 + editable: false + EOF + /run.sh ports: - "13000:3000"