Merged
Commits
19 commits
834859e
Exposed HdrHistogram memory occupation and aggregated at pipeline level
andsel Mar 30, 2026
d1a129b
[Test] added test coverage of the computation when batch metrics is e…
andsel Mar 30, 2026
e14381a
Added logging of estimated space for each pipeline
andsel Mar 30, 2026
2ac6354
Minor, expanded * import
andsel Mar 30, 2026
de0ab9c
Fix logger format string
andsel Mar 31, 2026
e575d84
Commented lifetime part to be enabled after #18911
andsel Mar 31, 2026
5b90df1
[Test] Spread the disabling of batch sampling mode in Pipeline's Acti…
andsel Mar 31, 2026
ad19aa1
[Test] Spread the disabling of batch sampling mode for pipeline and p…
andsel Mar 31, 2026
3faf8fe
Disable batch structure monitoring for internal '.monitoring-logstash…
andsel Apr 2, 2026
df29ae5
Adds a check on the collector being enabled before logging batch size…
andsel Apr 2, 2026
b0f27a7
[Minor] cleaned commented test code
andsel Apr 7, 2026
1ac40d5
[Test] Fixed tests after addition of lifecycle policy
andsel Apr 7, 2026
b42faa4
Changed count of lifetime policy datapoints to count also for the sta…
andsel Apr 7, 2026
6c91d6a
Fixed datapoints sumup for policies considering also the staging snap…
andsel Apr 7, 2026
85c4f1e
Fixed naming, avoid 'occupation' and use 'memory consumption'
andsel Apr 9, 2026
0a513db
Fixed typo
andsel Apr 9, 2026
92c1627
Better wording to comments
andsel Apr 9, 2026
d5c462d
[Docs] Updates Logstash tuning guide to describe that more memory is …
andsel Apr 13, 2026
34c4590
[Doc] reworded the note
andsel Apr 14, 2026
15 changes: 15 additions & 0 deletions docs/reference/tuning-logstash.md
@@ -61,6 +61,21 @@
* Threads in Java have names and you can use the `jstack`, `top`, and the VisualVM graphical tools to figure out which resources a given thread uses.
* On Linux platforms, Logstash labels its threads with descriptive names. For example, inputs show up as `[base]<inputname`, and pipeline workers show up as `[base]>workerN`, where N is an integer. Where possible, other threads are also labeled to help you identify their purpose.

::::{note}
Enabling `pipeline.batch.metrics.sampling_mode` increases heap consumption for each pipeline. You can see the estimated consumption in the messages emitted by the `org.logstash.execution.AbstractPipelineExt` logger, by searching the logs for a line like:

```shell
[2026-03-30T17:28:45,791][INFO ][org.logstash.execution.AbstractPipelineExt] Pipeline `main` batch metrics estimated memory consumption
```

The system estimates and logs the memory consumed by tracking structural metrics for each pipeline batch.

You can globally disable batch metrics sampling by setting `pipeline.batch.metrics.sampling_mode` to `disabled` in [logstash.yml](/reference/logstash-settings-file.md). To enable it for specific pipelines, configure this setting per pipeline in [pipelines.yml](/reference/multiple-pipelines.md).

::::
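
A minimal configuration sketch of the per-pipeline override described above; the pipeline id `special-pipeline` and the `full` value are illustrative examples, not prescriptions:

```yaml
# pipelines.yml: enable batch metrics sampling for one hypothetical pipeline.
# Setting the same pipeline.batch.metrics.sampling_mode key in logstash.yml
# instead applies the choice globally.
- pipeline.id: special-pipeline
  pipeline.batch.metrics.sampling_mode: full
```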
## Optimizing batch sizes [batch-size-optimization]
{applies_to}`stack: preview 9.4.0`
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
@@ -144,6 +144,8 @@ def start
@finished_execution.make_false
@finished_run.make_false

log_batch_metrics_memory_consumption

@thread = Thread.new do
error_log_params = ->(e) {
exception_logging_keys(e,
80 changes: 68 additions & 12 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
@@ -25,6 +25,7 @@
require "stud/try"
require 'timeout'
require "thread"
require 'java'

class DummyInput < LogStash::Inputs::Base
config_name "dummyinput"
@@ -219,16 +220,21 @@ def flush(options)
let(:pipeline_settings) do
{
"dead_letter_queue.enable" => dead_letter_queue_enabled,
"path.dead_letter_queue" => dead_letter_queue_path
"path.dead_letter_queue" => dead_letter_queue_path,
"pipeline.batch.metrics.sampling_mode" => batch_sampling_mode,
}
end
let(:batch_sampling_mode) { "disabled" }
let(:max_retry) {10} #times
let(:timeout) {120} #seconds

before :each do
pipeline_workers_setting = LogStash::SETTINGS.get_setting("pipeline.workers")
allow(pipeline_workers_setting).to receive(:default).and_return(worker_thread_count)

pipeline_workers_setting = pipeline_settings_obj.get_setting("pipeline.workers")
allow(pipeline_workers_setting).to receive(:default).and_return(worker_thread_count)

pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
end

@@ -390,7 +396,7 @@ def flush(options)
mutate { add_tag => "miss" }
}
}
CONFIG
CONFIG

context "raise an error when it's evaluated, should cancel the event execution and log the error" do
context "when type of evaluation doesn't have same type" do
@@ -429,7 +435,7 @@ def flush(options)
}
}
}
CONFIG
CONFIG

sample_one( [{ "path" => {"to" => {"value" => "101"}}}] ) do
expect(subject).to be nil
@@ -688,7 +694,7 @@ def flush(options)
context "when there is no command line -w N set" do
it "starts one filter thread" do
msg = "Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads"
pipeline = mock_java_pipeline_from_string(test_config_with_filters)
pipeline = mock_java_pipeline_from_string(test_config_with_filters, pipeline_settings_obj)
expect(pipeline.logger).to receive(:warn).with(msg,
hash_including({:count_was => worker_thread_count, :filters => ["dummyfilter"]}))
pipeline.start
@@ -772,7 +778,7 @@ def flush(options)
}

context "input and output close" do
let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers) }
let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) }
let(:output) { pipeline.outputs.first }
let(:input) { pipeline.inputs.first }

@@ -824,7 +830,7 @@ def flush(options)
let(:config) { "input { dummyinput {} } output { dummyoutput {} }"}

it "should start the flusher thread only after the pipeline is running" do
pipeline = mock_java_pipeline_from_string(config)
pipeline = mock_java_pipeline_from_string(config, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode))

expect(pipeline).to receive(:transition_to_running).ordered.and_call_original
expect(pipeline).to receive(:start_flusher).ordered.and_call_original
@@ -1049,7 +1055,7 @@ def flush(options)
context "metrics" do
config = "input { } filter { } output { }"

let(:settings) { LogStash::SETTINGS.clone }
let(:settings) { mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode) }
subject { mock_java_pipeline_from_string(config, settings, metric) }

after :each do
@@ -1132,6 +1138,56 @@ def flush(options)
expect(subject.metric.collector).to be(collector)
end
end

context "batch structure metric estimation" do
let(:collector) { ::LogStash::Instrument::Collector.new }
let(:metric) { ::LogStash::Instrument::Metric.new(collector) }

let(:sample_occupation) do
java_import 'org.HdrHistogram.Recorder'
# HistogramMetric uses HdrHistogram with 3 digits precision, so create a sample
# to get a rough estimate of a single histogram's memory consumption.
sample = Recorder.new(3).interval_histogram

sample.estimated_footprint_in_bytes
end

# BatchStructureMetric has 4 policies.
# Each window also contains a staging datapoint, so one has to be added
# to the bare count of retention / resolution periods.
# Note that the calculations correspond to the resolutions per 60 seconds multiplied by the number
# of minutes. For example, for the 5-minute retention policy:
# 60 / 15 (resolution per 60 seconds) * 5 (minutes). The + 1 corresponds to the staging datapoint.
let(:last_1_minute_datapoints) { 60 / 3 + 1 }
let(:last_5_minutes_datapoints) { 5 * 60 / 15 + 1}
let(:last_15_minutes_datapoints) { 15 * 60 / 30 + 1 }
let(:lifetime_datapoints) { 2 }
let(:single_batch_metric_datapoints) do
last_1_minute_datapoints + last_5_minutes_datapoints + last_15_minutes_datapoints + lifetime_datapoints
end

# byte size and event count batch metrics has single_batch_metric_datapoints each plus
# other 3 datapoints used by each lifetime histogram metric
let(:total_datapoints) { 2 * single_batch_metric_datapoints + 2 * 3 }
let(:expected_occupation) { sample_occupation * total_datapoints }

context "when enabled" do
# enable batch sampling else the batch metrics are not initialized
let(:batch_sampling_mode) { "full" }

it "should report the expected result" do
subject.initialize_flow_metrics
expect(subject.estimate_batch_metrics_occupation).to be(expected_occupation)
end
end

context "when disabled" do
it "must return nil" do
subject.initialize_flow_metrics
expect(subject.estimate_batch_metrics_occupation).to be_nil
end
end
end
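
As a cross-check of the comments in the spec above, the datapoint arithmetic can be reproduced in plain Ruby. The 3 s / 15 s / 30 s resolutions are taken from this spec's comments, not from the production code:

```ruby
# Datapoints per retention window:
#   (window seconds / resolution seconds) + 1 staging datapoint.
last_1_minute   = 60 / 3 + 1        # 1 min window, 3 s resolution  => 21
last_5_minutes  = 5 * 60 / 15 + 1   # 5 min window, 15 s resolution => 21
last_15_minutes = 15 * 60 / 30 + 1  # 15 min window, 30 s resolution => 31
lifetime        = 2                 # one committed plus one staged capture

# Datapoints tracked by a single batch metric.
single_metric = last_1_minute + last_5_minutes + last_15_minutes + lifetime # => 75

# Two batch metrics (event count and byte size), each with 3 extra
# datapoints used by its lifetime histogram metric.
total = 2 * single_metric + 2 * 3 # => 156
```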
end
end

@@ -1190,13 +1246,13 @@ def flush(options)
end
end
context "Pipeline created with too many filters" do
# create pipeline with 2000 filters
# 2000 filters is more than a thread stack of size 2MB can handle
# create pipeline with 2500 filters
# 2500 filters is more than a thread stack of size 2MB can handle
let(:config) do
<<-EOS
input { dummy_input {} }
filter {
#{" nil_flushing_filter {}\n" * 2000}
#{" nil_flushing_filter {}\n" * 2500}
Member Author (andsel), note for reviewer: 2000 filters produced flaky tests when run one at a time, so the limit was raised slightly.

}
output { dummy_output {} }
EOS
@@ -1368,7 +1424,7 @@ def flush(options)
EOS
end

subject { mock_java_pipeline_from_string(config) }
subject { mock_java_pipeline_from_string(config, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) }

context "when the pipeline is not started" do
after :each do
@@ -1395,7 +1451,7 @@ def flush(options)
}
EOS
end
subject { mock_java_pipeline_from_string(config) }
subject { mock_java_pipeline_from_string(config, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) }

context "when the pipeline is not started" do
after :each do
5 changes: 4 additions & 1 deletion logstash-core/spec/logstash/pipeline_action/delete_spec.rb
@@ -26,7 +26,10 @@
describe LogStash::PipelineAction::Delete do
let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" }
let(:pipeline_id) { :main }
let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) }
let(:pipeline) do
# Disable batch metrics mode to avoid having to initialize the entire flow metrics part as well
mock_java_pipeline_from_string(pipeline_config, mock_settings("pipeline.batch.metrics.sampling_mode" => "disabled"))
end
let(:pipelines) do
LogStash::PipelinesRegistry.new.tap do |chm|
chm.create_pipeline(pipeline_id, pipeline) { true }
5 changes: 4 additions & 1 deletion logstash-core/spec/logstash/pipeline_action/reload_spec.rb
@@ -26,7 +26,10 @@
let(:pipeline_id) { :main }
let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { dummyblockinginput { id => 'new' } } output { null {} }", { "pipeline.reloadable" => true}) }
let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" }
let(:pipeline) { mock_java_pipeline_from_string(pipeline_config, mock_settings("pipeline.reloadable" => true)) }
let(:pipeline) do
settings = mock_settings({"pipeline.reloadable" => true, "pipeline.batch.metrics.sampling_mode" => "disabled"})
mock_java_pipeline_from_string(pipeline_config, settings)
end
let(:pipelines) { r = LogStash::PipelinesRegistry.new; r.create_pipeline(pipeline_id, pipeline) { true }; r }
let(:agent) { double("agent") }

@@ -26,7 +26,10 @@
describe LogStash::PipelineAction::StopAndDelete do
let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" }
let(:pipeline_id) { :main }
let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) }
let(:pipeline) do
# Disable batch metrics mode to avoid having to initialize the entire flow metrics part as well
mock_java_pipeline_from_string(pipeline_config, mock_settings("pipeline.batch.metrics.sampling_mode" => "disabled"))
end
let(:pipelines) do
LogStash::PipelinesRegistry.new.tap do |chm|
chm.create_pipeline(pipeline_id, pipeline) { true }
5 changes: 4 additions & 1 deletion logstash-core/spec/logstash/pipeline_action/stop_spec.rb
@@ -23,7 +23,10 @@
describe LogStash::PipelineAction::Stop do
let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" }
let(:pipeline_id) { :main }
let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) }
let(:pipeline) do
# Disable batch metrics mode to avoid having to initialize the entire flow metrics part as well
mock_java_pipeline_from_string(pipeline_config, mock_settings("pipeline.batch.metrics.sampling_mode" => "disabled"))
end
let(:pipelines) { chm = LogStash::PipelinesRegistry.new; chm.create_pipeline(pipeline_id, pipeline) { true }; chm }
let(:agent) { double("agent") }

3 changes: 2 additions & 1 deletion logstash-core/spec/logstash/pipeline_reporter_spec.rb
@@ -26,7 +26,8 @@
let(:config) do
"input { generator { count => #{generator_count} } } output { dummyoutput {} } "
end
let(:pipeline) { Kernel.send(pipeline_setup, config)}
# Disable batch metrics mode to avoid having to initialize the entire flow metrics part as well
let(:pipeline) { Kernel.send(pipeline_setup, config, mock_settings("pipeline.batch.metrics.sampling_mode" => "disabled"))}
let(:reporter) { pipeline.reporter }

let(:do_setup_plugin_registry) do
@@ -355,6 +355,43 @@ public final IRubyObject openQueue(final ThreadContext context) {
return context.nil;
}

/**
* Needs to be invoked after initialize_flow_metrics
*
* @return the occupation of internal structures in bytes or nil if batch metrics are disabled
* */
@JRubyMethod(name = "estimate_batch_metrics_occupation")
public final IRubyObject estimateBatchMetricsInternalStructures(final ThreadContext context) {
if (!isBatchMetricsEnabled(context)) {
return context.nil;
}

return context.runtime.newFixnum(getEstimateBatchMetricsInternalStructures());
}

private int getEstimateBatchMetricsInternalStructures() {
if (batchStructureMetric == null) {
throw new IllegalStateException("Batch metrics estimation invoked before the internal data structures were instantiated");
}
int eventCountFootprint = batchEventCountStructureMetric.estimateBatchMetricsFootprintInBytes();
int byteSizeFootprint = batchStructureMetric.estimateBatchMetricsFootprintInBytes();
int histogramCollectorFootprint = filterQueueClient.estimateBatchMetricsFootprintInBytes();
return eventCountFootprint + byteSizeFootprint + histogramCollectorFootprint;
}

@JRubyMethod(name = "log_batch_metrics_memory_consumption")
public final IRubyObject logEstimatedBatchMetricMemoryConsumption(final ThreadContext context) {
if (metric.collector(context).isNil() || !getSetting(context, "metric.collect").isTrue()) {
LOGGER.debug("Metrics collection is disabled, skipping batch metrics logging");
Contributor (review comment): If metrics collection is disabled, should we still log that metrics logging will be skipped? Or would a user expect that if metrics collection is disabled, no logging related to metrics is done at all?

Member Author (andsel): Metric collection can only be disabled programmatically, as in the monitoring pipeline, and in that case no batch metrics log line is expected. This debug log records why no batch memory log appears; it could eventually be removed in favor of simply returning nil.

return context.nil;
}

if (isBatchMetricsEnabled(context)) {
LOGGER.info("Pipeline `{}` batch metrics estimated memory consumption: {} bytes", pipelineId, getEstimateBatchMetricsInternalStructures());
}
return context.nil;
}

@JRubyMethod(name = "process_events_namespace_metric")
public final IRubyObject processEventsNamespaceMetric(final ThreadContext context) {
return metric.namespace(context, EVENTS_METRIC_NAMESPACE);
@@ -40,4 +40,6 @@ public interface QueueReadClient {
public <E extends Exception> void executeWithTimers(final TimerMetric.ExceptionalRunnable<E> runnable) throws E;

boolean isEmpty();

int estimateBatchMetricsFootprintInBytes();
}
@@ -23,7 +23,6 @@
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
@@ -110,6 +109,14 @@ public IRubyObject setPipelineMetric(final IRubyObject metric) {
return this;
}

/**
* Must be invoked after setPipelineMetric, which configures the batch metric.
* */
@Override
public final int estimateBatchMetricsFootprintInBytes() {
return batchMetrics.estimateBatchMetricsFootprintInBytes();
}

@JRubyMethod(name = "set_batch_dimensions")
public IRubyObject rubySetBatchDimensions(final IRubyObject batchSize,
final IRubyObject waitForMillis) {
@@ -92,4 +92,12 @@ private void updateBatchSizeMetric(QueueBatch batch) {
LOG.error("Failed to calculate batch byte size for metrics", e);
}
}

public int estimateBatchMetricsFootprintInBytes() {
if (batchMetricMode != QueueFactoryExt.BatchMetricMode.DISABLED) {
return pipelineMetricBatchByteSizeFlowHistogram.estimateBatchMetricsFootprintInBytes() +
pipelineMetricBatchEventCountFlowHistogram.estimateBatchMetricsFootprintInBytes();
}
return 0;
}
}
@@ -104,6 +104,15 @@ public Map<String, HistogramMetricData> getValue() {
return Collections.unmodifiableMap(rates);
}

public int estimateBatchMetricsFootprintInBytes() {
int singleHistogramFootprint = batchHistogram.estimateSingleHistogramFootprintInBytes();
int totalDatapointsCount = 0;
for (FlowMetricRetentionPolicy supportedPolicy : SUPPORTED_POLICIES) {
totalDatapointsCount += supportedPolicy.datapointsCount();
}
return singleHistogramFootprint * totalDatapointsCount;
}
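
The method above multiplies one histogram's footprint by the total datapoint count across the supported retention policies. A minimal Ruby sketch of the same arithmetic, assuming the four policies and per-policy datapoint counts (21, 21, 31, 2) described in the pipeline spec, and a purely illustrative per-histogram footprint:

```ruby
# Illustrative footprint of a single HdrHistogram snapshot, in bytes (not measured).
single_histogram_footprint = 2_048

# Datapoint counts for the 1-minute, 5-minute, 15-minute, and lifetime policies,
# as computed in the java_pipeline spec.
policy_datapoints = [21, 21, 31, 2]

# Mirrors estimateBatchMetricsFootprintInBytes: one footprint times the total
# number of datapoints kept across all retention policies.
estimate = single_histogram_footprint * policy_datapoints.sum
# => 153600
```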

/**
* Presentation class to render histogram data, leverage its JSON serializability.
* */
@@ -73,6 +73,13 @@ public long commitBarrierNanos(long referenceNanos) {
public long maxAgeBarrierNanos(long referenceNanos) {
return Long.MIN_VALUE;
}

@Override
public int datapointsCount() {
// The lifetime retention window keeps one committed capture and, once updated,
// one staged capture prior to compaction/reporting.
return 2;
}
};

private static final Set<FlowMetricRetentionPolicy> ALL;