From ef59f219c86028f18934863a70292ddf5f130ae1 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 24 Mar 2026 16:41:21 -0700 Subject: [PATCH 1/6] Improves a visibility about how many failed events went to DLQ. If error happens during the DLQ persist, shows the status, action and response for better visibility. --- lib/logstash/plugin_mixins/elasticsearch/common.rb | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 8e7f92408..add0e1455 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -238,7 +238,12 @@ def handle_dlq_response(message, action, status, response) if @dlq_writer # TODO: Change this to send a map with { :status => status, :action => action } in the future detailed_message = "#{message} status: #{status}, action: #{action_params}, response: #{response}" - @dlq_writer.write(event, "#{detailed_message}") + begin + @dlq_writer.write(event, "#{detailed_message}") + rescue => e + @logger.error("Failed to write event to DLQ", + error_message: e.message, exception: e.class, + status: status, action: action_params, response: response) else log_level = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' ? :error : :warn @@ -273,6 +278,7 @@ def submit(actions) end actions_to_retry = [] + dlq_count = 0 responses.each_with_index do |response,idx| action_type, action_props = response.first @@ -296,6 +302,7 @@ def submit(actions) elsif @dlq_codes.include?(status) handle_dlq_response("Could not index event to Elasticsearch.", action, status, response) @document_level_metrics.increment(:dlq_routed) + dlq_count += 1 next else # only log what the user whitelisted @@ -305,6 +312,10 @@ def submit(actions) end end + if @dlq_writer && dlq_count > 0 + @logger.warn("Events could not be indexed and routed to DLQ, count: #{dlq_count}") + end + actions_to_retry end From 2199b89e6b5e95389bc93eb16a0f7f00a1d258f6 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 24 Mar 2026 16:59:36 -0700 Subject: [PATCH 2/6] Oops, block closure wasn't committed. --- lib/logstash/plugin_mixins/elasticsearch/common.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index add0e1455..dff199f20 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -244,6 +244,7 @@ def handle_dlq_response(message, action, status, response) @logger.error("Failed to write event to DLQ", error_message: e.message, exception: e.class, status: status, action: action_params, response: response) + end else log_level = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' ? :error : :warn From bad8c0cddefdc094d00a99613d2ae7470f7b54a9 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 25 Mar 2026 14:07:39 -0700 Subject: [PATCH 3/6] Version bump, add changelog, put back the origin exception throw after testing. --- CHANGELOG.md | 3 +++ lib/logstash/plugin_mixins/elasticsearch/common.rb | 1 + logstash-output-elasticsearch.gemspec | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f63676add..a4a14ba43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 12.1.3 + - Improves the logging experience when DLQ used [#1253](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1253) + ## 12.1.2 - Fix: replace deprecated `File.exists?` with `File.exist?` for Ruby 3.4 (JRuby 10) compatibility [#1238](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1238) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index dff199f20..5463ef7ec 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -244,6 +244,7 @@ def handle_dlq_response(message, action, status, response) @logger.error("Failed to write event to DLQ", error_message: e.message, exception: e.class, status: status, action: action_params, response: response) + raise e end else log_level = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' ? :error : :warn diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index a9f5c6adf..8e59f19af 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '12.1.2' + s.version = '12.1.3' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From c72668bda37d7eb88e660a7787bc8dab6ff18e44 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 25 Mar 2026 14:26:54 -0700 Subject: [PATCH 4/6] Capture a sample event for each status to show what is happening with ES. --- lib/logstash/plugin_mixins/elasticsearch/common.rb | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 5463ef7ec..8d97e0fa3 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -280,7 +280,7 @@ def submit(actions) end actions_to_retry = [] - dlq_count = 0 + dlq_reouted_stats = {} responses.each_with_index do |response,idx| action_type, action_props = response.first @@ -304,7 +304,11 @@ def submit(actions) elsif @dlq_codes.include?(status) handle_dlq_response("Could not index event to Elasticsearch.", action, status, response) @document_level_metrics.increment(:dlq_routed) - dlq_count += 1 + if dlq_reouted_stats.key?(status) + dlq_reouted_stats[status][:count] += 1 + else + dlq_reouted_stats[status] = { :count => 1, :sample_event => { :action => action, :response => response } } + end next else # only log what the user whitelisted @@ -314,8 +318,9 @@ def submit(actions) end end - if @dlq_writer && dlq_count > 0 - @logger.warn("Events could not be indexed and routed to DLQ, count: #{dlq_count}") + if @dlq_writer && !dlq_reouted_stats.empty? + total = dlq_reouted_stats.values.sum { |entry| entry[:count] } + @logger.warn("Events could not be indexed and routing to DLQ", :count => total, :dlq_reouted_stats => dlq_reouted_stats) end actions_to_retry From a3304702be5ced59c7ced69b289c83d4c2f935bf Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 25 Mar 2026 14:34:26 -0700 Subject: [PATCH 5/6] Rollback the dlq writer begin-end statement which sample events logged just before handle_dlq_response --- lib/logstash/plugin_mixins/elasticsearch/common.rb | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 8d97e0fa3..0c6950d40 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -238,14 +238,7 @@ def handle_dlq_response(message, action, status, response) if @dlq_writer # TODO: Change this to send a map with { :status => status, :action => action } in the future detailed_message = "#{message} status: #{status}, action: #{action_params}, response: #{response}" - begin - @dlq_writer.write(event, "#{detailed_message}") - rescue => e - @logger.error("Failed to write event to DLQ", - error_message: e.message, exception: e.class, - status: status, action: action_params, response: response) - raise e - end + @dlq_writer.write(event, "#{detailed_message}") else log_level = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' ? :error : :warn From b306761fc74b221e7428cfb6573b6579ed9c6084 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 14 Apr 2026 11:25:36 -0700 Subject: [PATCH 6/6] Fix the typo wrong naming --- .../plugin_mixins/elasticsearch/common.rb | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 0c6950d40..5921e3ffb 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -241,7 +241,6 @@ def handle_dlq_response(message, action, status, response) @dlq_writer.write(event, "#{detailed_message}") else log_level = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' ? :error : :warn - @logger.public_send(log_level, message, status: status, action: action_params, response: response) end end @@ -273,7 +272,7 @@ def submit(actions) end actions_to_retry = [] - dlq_reouted_stats = {} + dlq_routed_stats = {} responses.each_with_index do |response,idx| action_type, action_props = response.first @@ -297,10 +296,10 @@ def submit(actions) elsif @dlq_codes.include?(status) handle_dlq_response("Could not index event to Elasticsearch.", action, status, response) @document_level_metrics.increment(:dlq_routed) - if dlq_reouted_stats.key?(status) - dlq_reouted_stats[status][:count] += 1 + if dlq_routed_stats.key?(status) + dlq_routed_stats[status][:count] += 1 else - dlq_reouted_stats[status] = { :count => 1, :sample_event => { :action => action, :response => response } } + dlq_routed_stats[status] = { :count => 1, :sample_event => { :action => action, :response => response } } end next else @@ -311,9 +310,9 @@ def submit(actions) end end - if @dlq_writer && !dlq_reouted_stats.empty? - total = dlq_reouted_stats.values.sum { |entry| entry[:count] } - @logger.warn("Events could not be indexed and routing to DLQ", :count => total, :dlq_reouted_stats => dlq_reouted_stats) + if @dlq_writer && !dlq_routed_stats.empty? + total = dlq_routed_stats.values.sum { |entry| entry[:count] } + @logger.warn("Events could not be indexed and routing to DLQ", :count => total, :dlq_routed_stats => dlq_routed_stats) end actions_to_retry