diff --git a/docs/reference/tuning-logstash.md b/docs/reference/tuning-logstash.md index 61602ab9c53..14578c228d3 100644 --- a/docs/reference/tuning-logstash.md +++ b/docs/reference/tuning-logstash.md @@ -61,6 +61,21 @@ If you plan to modify the default pipeline settings, take into account the follo * Threads in Java have names and you can use the `jstack`, `top`, and the VisualVM graphical tools to figure out which resources a given thread uses. * On Linux platforms, Logstash labels its threads with descriptive names. For example, inputs show up as `[base]workerN`, where N is an integer. Where possible, other threads are also labeled to help you identify their purpose. +::::{note} +Enabling `pipeline.batch.metrics.sampling_mode` leads to increased heap consumption for each pipeline. You can identify this increased consumption by searching the output of the `org.logstash.execution.AbstractPipelineExt` logger (emitted at `INFO` level) for a specific string like: + +```shell +[2026-03-30T17:28:45,791][INFO ][org.logstash.execution.AbstractPipelineExt] Pipeline `main` batch metrics estimated memory consumption: 512000 bytes +``` + +The system estimates and logs the memory consumed by tracking structural metrics for each pipeline batch. + +You can globally disable batch metrics sampling by setting `pipeline.batch.metrics.sampling_mode` to `disabled` in [logstash.yml](/reference/logstash-settings-file.md). To enable it for specific pipelines, configure this setting per pipeline in [pipelines.yml](/reference/multiple-pipelines.md). 
+:::: + + + + ## Optimizing batch sizes [batch-size-optimization] {applies_to}`stack: preview 9.4.0` diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index d3005b2075e..71dfd5bc50f 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -144,6 +144,8 @@ def start @finished_execution.make_false @finished_run.make_false + log_batch_metrics_memory_consumption + @thread = Thread.new do error_log_params = ->(e) { exception_logging_keys(e, diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index ce7cee03862..571e9cb8068 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -25,6 +25,7 @@ require "stud/try" require 'timeout' require "thread" +require 'java' class DummyInput < LogStash::Inputs::Base config_name "dummyinput" @@ -219,9 +220,11 @@ def flush(options) let(:pipeline_settings) do { "dead_letter_queue.enable" => dead_letter_queue_enabled, - "path.dead_letter_queue" => dead_letter_queue_path + "path.dead_letter_queue" => dead_letter_queue_path, + "pipeline.batch.metrics.sampling_mode" => batch_sampling_mode, } end + let(:batch_sampling_mode) { "disabled" } let(:max_retry) {10} #times let(:timeout) {120} #seconds @@ -229,6 +232,9 @@ def flush(options) pipeline_workers_setting = LogStash::SETTINGS.get_setting("pipeline.workers") allow(pipeline_workers_setting).to receive(:default).and_return(worker_thread_count) + pipeline_workers_setting = pipeline_settings_obj.get_setting("pipeline.workers") + allow(pipeline_workers_setting).to receive(:default).and_return(worker_thread_count) + pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) } end @@ -390,7 +396,7 @@ def flush(options) mutate { add_tag => "miss" } } } - CONFIG + CONFIG context "raise an error when it's evaluated, should cancel the event execution and log the 
error" do context "when type of evaluation doesn't have same type" do @@ -429,7 +435,7 @@ def flush(options) } } } - CONFIG + CONFIG sample_one( [{ "path" => {"to" => {"value" => "101"}}}] ) do expect(subject).to be nil @@ -688,7 +694,7 @@ def flush(options) context "when there is no command line -w N set" do it "starts one filter thread" do msg = "Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads" - pipeline = mock_java_pipeline_from_string(test_config_with_filters) + pipeline = mock_java_pipeline_from_string(test_config_with_filters, pipeline_settings_obj) expect(pipeline.logger).to receive(:warn).with(msg, hash_including({:count_was => worker_thread_count, :filters => ["dummyfilter"]})) pipeline.start @@ -772,7 +778,7 @@ def flush(options) } context "input and output close" do - let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers) } + let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) } let(:output) { pipeline.outputs.first } let(:input) { pipeline.inputs.first } @@ -824,7 +830,7 @@ def flush(options) let(:config) { "input { dummyinput {} } output { dummyoutput {} }"} it "should start the flusher thread only after the pipeline is running" do - pipeline = mock_java_pipeline_from_string(config) + pipeline = mock_java_pipeline_from_string(config, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) expect(pipeline).to receive(:transition_to_running).ordered.and_call_original expect(pipeline).to receive(:start_flusher).ordered.and_call_original @@ -1049,7 +1055,7 @@ def flush(options) context "metrics" do config = "input { } filter { } output { }" - let(:settings) { LogStash::SETTINGS.clone } + let(:settings) { mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode) } subject { 
mock_java_pipeline_from_string(config, settings, metric) } after :each do @@ -1132,6 +1138,56 @@ def flush(options) expect(subject.metric.collector).to be(collector) end end + + context "batch structure metric estimation" do + let(:collector) { ::LogStash::Instrument::Collector.new } + let(:metric) { ::LogStash::Instrument::Metric.new(collector) } + + let(:sample_occupation) do + java_import 'org.HdrHistogram.Recorder' + # HistogramMetric uses HdrHistogram with 3 digits precision, so create a sample to have + # the rough histogram memory consumption. + sample = Recorder.new(3).interval_histogram + + sample.estimated_footprint_in_bytes + end + + # BatchStructureMetric has 4 policies + # Each window also contains the staging datapoint, so + # it has to be added to the bare count of retention / resolution periods. + # Note that the calculations correspond to the resolutions per 60 seconds multiplied by the number + # of minutes. For example, for the 5 minute retention policy: + # 60 / 15 (resolution per 60 seconds) * 5 (minutes). The + 1 corresponds to the staging datapoint. 
+ let(:last_1_minute_datapoints) { 60 / 3 + 1 } + let(:last_5_minutes_datapoints) { 5 * 60 / 15 + 1} + let(:last_15_minutes_datapoints) { 15 * 60 / 30 + 1 } + let(:lifetime_datapoints) { 2 } + let(:single_batch_metric_datapoints) do + last_1_minute_datapoints + last_5_minutes_datapoints + last_15_minutes_datapoints + lifetime_datapoints + end + + # byte size and event count batch metrics has single_batch_metric_datapoints each plus + # other 3 datapoints used by each lifetime histogram metric + let(:total_datapoints) { 2 * single_batch_metric_datapoints + 2 * 3 } + let(:expected_occupation) { sample_occupation * total_datapoints } + + context "when enabled" do + # enable batch sampling else the batch metrics are not initialized + let(:batch_sampling_mode) { "full" } + + it "should report the expected result" do + subject.initialize_flow_metrics + expect(subject.estimate_batch_metrics_occupation).to be(expected_occupation) + end + end + + context "when disabled" do + it "must return nil" do + subject.initialize_flow_metrics + expect(subject.estimate_batch_metrics_occupation).to be_nil + end + end + end end end @@ -1190,13 +1246,13 @@ def flush(options) end end context "Pipeline created with too many filters" do - # create pipeline with 2000 filters - # 2000 filters is more than a thread stack of size 2MB can handle + # create pipeline with 2500 filters + # 2500 filters is more than a thread stack of size 2MB can handle let(:config) do <<-EOS input { dummy_input {} } filter { - #{" nil_flushing_filter {}\n" * 2000} + #{" nil_flushing_filter {}\n" * 2500} } output { dummy_output {} } EOS @@ -1368,7 +1424,7 @@ def flush(options) EOS end - subject { mock_java_pipeline_from_string(config) } + subject { mock_java_pipeline_from_string(config, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) } context "when the pipeline is not started" do after :each do @@ -1395,7 +1451,7 @@ def flush(options) } EOS end - subject { 
mock_java_pipeline_from_string(config) } + subject { mock_java_pipeline_from_string(config, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) } context "when the pipeline is not started" do after :each do diff --git a/logstash-core/spec/logstash/pipeline_action/delete_spec.rb b/logstash-core/spec/logstash/pipeline_action/delete_spec.rb index 73193389ae0..369ff14c898 100644 --- a/logstash-core/spec/logstash/pipeline_action/delete_spec.rb +++ b/logstash-core/spec/logstash/pipeline_action/delete_spec.rb @@ -26,7 +26,10 @@ describe LogStash::PipelineAction::Delete do let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" } let(:pipeline_id) { :main } - let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) } + let(:pipeline) do + # Disable batch metrics mode to avoid having to initialize the entire flow metrics part as well + mock_java_pipeline_from_string(pipeline_config, mock_settings("pipeline.batch.metrics.sampling_mode" => "disabled")) + end let(:pipelines) do LogStash::PipelinesRegistry.new.tap do |chm| chm.create_pipeline(pipeline_id, pipeline) { true } diff --git a/logstash-core/spec/logstash/pipeline_action/reload_spec.rb b/logstash-core/spec/logstash/pipeline_action/reload_spec.rb index 5e0f9e171fd..e92e45a01f7 100644 --- a/logstash-core/spec/logstash/pipeline_action/reload_spec.rb +++ b/logstash-core/spec/logstash/pipeline_action/reload_spec.rb @@ -26,7 +26,10 @@ let(:pipeline_id) { :main } let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { dummyblockinginput { id => 'new' } } output { null {} }", { "pipeline.reloadable" => true}) } let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" } - let(:pipeline) { mock_java_pipeline_from_string(pipeline_config, mock_settings("pipeline.reloadable" => true)) } + let(:pipeline) do + settings = mock_settings({"pipeline.reloadable" => true, "pipeline.batch.metrics.sampling_mode" => "disabled"}) + 
mock_java_pipeline_from_string(pipeline_config, settings) + end let(:pipelines) { r = LogStash::PipelinesRegistry.new; r.create_pipeline(pipeline_id, pipeline) { true }; r } let(:agent) { double("agent") } diff --git a/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb b/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb index a32ed5eb0fa..3fb900e05c0 100644 --- a/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb +++ b/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb @@ -26,7 +26,10 @@ describe LogStash::PipelineAction::StopAndDelete do let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" } let(:pipeline_id) { :main } - let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) } + let(:pipeline) do + # Disable batch metrics mode to avoid having to initialize the entire flow metrics part as well + mock_java_pipeline_from_string(pipeline_config, mock_settings("pipeline.batch.metrics.sampling_mode" => "disabled")) + end let(:pipelines) do LogStash::PipelinesRegistry.new.tap do |chm| chm.create_pipeline(pipeline_id, pipeline) { true } diff --git a/logstash-core/spec/logstash/pipeline_action/stop_spec.rb b/logstash-core/spec/logstash/pipeline_action/stop_spec.rb index 8355abb8cfd..d1020652bb0 100644 --- a/logstash-core/spec/logstash/pipeline_action/stop_spec.rb +++ b/logstash-core/spec/logstash/pipeline_action/stop_spec.rb @@ -23,7 +23,10 @@ describe LogStash::PipelineAction::Stop do let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" } let(:pipeline_id) { :main } - let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) } + let(:pipeline) do + # Disable batch metrics mode to avoid having to initialize the entire flow metrics part as well + mock_java_pipeline_from_string(pipeline_config, mock_settings("pipeline.batch.metrics.sampling_mode" => "disabled")) + end let(:pipelines) { chm = LogStash::PipelinesRegistry.new; 
chm.create_pipeline(pipeline_id, pipeline) { true }; chm } let(:agent) { double("agent") } diff --git a/logstash-core/spec/logstash/pipeline_reporter_spec.rb b/logstash-core/spec/logstash/pipeline_reporter_spec.rb index 2f541d1c347..402b3c0b8ba 100644 --- a/logstash-core/spec/logstash/pipeline_reporter_spec.rb +++ b/logstash-core/spec/logstash/pipeline_reporter_spec.rb @@ -26,7 +26,8 @@ let(:config) do "input { generator { count => #{generator_count} } } output { dummyoutput {} } " end - let(:pipeline) { Kernel.send(pipeline_setup, config)} + # Disable batch metrics mode to avoid having to initialize the entire flow metrics part as well + let(:pipeline) { Kernel.send(pipeline_setup, config, mock_settings("pipeline.batch.metrics.sampling_mode" => "disabled"))} let(:reporter) { pipeline.reporter } let(:do_setup_plugin_registry) do diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 35722710648..1ad9d751171 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -355,6 +355,43 @@ public final IRubyObject openQueue(final ThreadContext context) { return context.nil; } + /** + * Needs to be invoked after initialize_flow_metrics + * + * @return the occupation of internal structures in bytes or nil if batch metrics are disabled + * */ + @JRubyMethod(name = "estimate_batch_metrics_occupation") + public final IRubyObject estimateBatchMetricsInternalStructures(final ThreadContext context) { + if (!isBatchMetricsEnabled(context)) { + return context.nil; + } + + return context.runtime.newFixnum(getEstimateBatchMetricsInternalStructures()); + } + + private int getEstimateBatchMetricsInternalStructures() { + if (batchStructureMetric == null) { + throw new IllegalStateException("Batch metrics estimation invoked before the internal data structures 
were instantiated"); + } + int eventCountFootprint = batchEventCountStructureMetric.estimateBatchMetricsFootprintInBytes(); + int byteSizeFootprint = batchStructureMetric.estimateBatchMetricsFootprintInBytes(); + int histogramCollectorFootprint = filterQueueClient.estimateBatchMetricsFootprintInBytes(); + return eventCountFootprint + byteSizeFootprint + histogramCollectorFootprint; + } + + @JRubyMethod(name = "log_batch_metrics_memory_consumption") + public final IRubyObject logEstimatedBatchMetricMemoryConsumption(final ThreadContext context) { + if (metric.collector(context).isNil() || !getSetting(context, "metric.collect").isTrue()) { + LOGGER.debug("Metrics collection is disabled, skipping batch metrics logging"); + return context.nil; + } + + if (isBatchMetricsEnabled(context)) { + LOGGER.info("Pipeline `{}` batch metrics estimated memory consumption: {} bytes", pipelineId, getEstimateBatchMetricsInternalStructures()); + } + return context.nil; + } + @JRubyMethod(name = "process_events_namespace_metric") public final IRubyObject processEventsNamespaceMetric(final ThreadContext context) { return metric.namespace(context, EVENTS_METRIC_NAMESPACE); diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClient.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClient.java index 869a3c758de..8194c09e213 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClient.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClient.java @@ -40,4 +40,6 @@ public interface QueueReadClient { public void executeWithTimers(final TimerMetric.ExceptionalRunnable runnable) throws E; boolean isEmpty(); + + int estimateBatchMetricsFootprintInBytes(); } diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java index e07bbbae403..a420950b5ed 100644 --- 
a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java @@ -23,7 +23,6 @@ import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.RubyHash; -import org.jruby.RubyNumeric; import org.jruby.RubyObject; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; @@ -110,6 +109,14 @@ public IRubyObject setPipelineMetric(final IRubyObject metric) { return this; } + /** + * Must be invoked after setPipelineMetric which configured the batch metric. + * */ + @Override + public final int estimateBatchMetricsFootprintInBytes() { + return batchMetrics.estimateBatchMetricsFootprintInBytes(); + } + @JRubyMethod(name = "set_batch_dimensions") public IRubyObject rubySetBatchDimensions(final IRubyObject batchSize, final IRubyObject waitForMillis) { diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java index 35cebde0ee8..81eafc0ce1d 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java @@ -92,4 +92,12 @@ private void updateBatchSizeMetric(QueueBatch batch) { LOG.error("Failed to calculate batch byte size for metrics", e); } } + + public int estimateBatchMetricsFootprintInBytes() { + if (batchMetricMode != QueueFactoryExt.BatchMetricMode.DISABLED) { + return pipelineMetricBatchByteSizeFlowHistogram.estimateBatchMetricsFootprintInBytes() + + pipelineMetricBatchEventCountFlowHistogram.estimateBatchMetricsFootprintInBytes(); + } + return 0; + } } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/BatchStructureMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/BatchStructureMetric.java index 2b2f1e7adef..d3fff9012e3 100644 --- 
a/logstash-core/src/main/java/org/logstash/instrument/metrics/BatchStructureMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/BatchStructureMetric.java @@ -104,6 +104,15 @@ public Map getValue() { return Collections.unmodifiableMap(rates); } + public int estimateBatchMetricsFootprintInBytes() { + int singleHistogramFootprint = batchHistogram.estimateSingleHistogramFootprintInBytes(); + int totalDatapointsCount = 0; + for (FlowMetricRetentionPolicy supportedPolicy : SUPPORTED_POLICIES) { + totalDatapointsCount += supportedPolicy.datapointsCount(); + } + return singleHistogramFootprint * totalDatapointsCount; + } + /** * Presentation class to render histogram data, leverage its JSON serializability. * */ diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/BuiltInFlowMetricRetentionPolicies.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/BuiltInFlowMetricRetentionPolicies.java index f427545d928..0429d9014f4 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/BuiltInFlowMetricRetentionPolicies.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/BuiltInFlowMetricRetentionPolicies.java @@ -73,6 +73,13 @@ public long commitBarrierNanos(long referenceNanos) { public long maxAgeBarrierNanos(long referenceNanos) { return Long.MIN_VALUE; } + + @Override + public int datapointsCount() { + // The lifetime retention window keeps one committed capture and, once updated, + // one staged capture prior to compaction/reporting. 
+ return 2; + } }; private static final Set ALL; diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetricRetentionPolicy.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetricRetentionPolicy.java index 5f0ebf71c1d..4d030d8cd58 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetricRetentionPolicy.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetricRetentionPolicy.java @@ -35,6 +35,8 @@ interface FlowMetricRetentionPolicy { long maxAgeBarrierNanos(final long referenceNanos); + int datapointsCount(); + class RollingRetentionPolicy implements FlowMetricRetentionPolicy { final long resolutionNanos; @@ -45,6 +47,7 @@ class RollingRetentionPolicy implements FlowMetricRetentionPolicy { final boolean reportBeforeSatisfied; final transient String nameLower; + final int datapointsCount; RollingRetentionPolicy(final String name, final Duration maximumRetention, @@ -52,6 +55,9 @@ class RollingRetentionPolicy implements FlowMetricRetentionPolicy { final boolean reportBeforeSatisfied) { this.retentionNanos = maximumRetention.toNanos(); this.resolutionNanos = minimumResolution.toNanos(); + // Each window contains also the staging, so + // has to be summed up to the bare count of retention / resolution periods. 
+ this.datapointsCount = (int) (retentionNanos / resolutionNanos) + 1; this.reportBeforeSatisfied = reportBeforeSatisfied; // we generally rely on query-time compaction, and only perform insertion-time compaction @@ -101,5 +107,10 @@ public long maxAgeBarrierNanos(final long referenceNanos) { public boolean reportBeforeSatisfied() { return this.reportBeforeSatisfied; } + + @Override + public int datapointsCount() { + return this.datapointsCount; + } } } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/UserMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/UserMetric.java index 4c015efb571..fd33a03b9fb 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/UserMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/UserMetric.java @@ -32,7 +32,7 @@ public IRubyObject yield(ThreadContext threadContext, IRubyObject[] iRubyObjects final IRubyObject result = metric.register(context, key, metricSupplier); final Class type = metricFactory.getType(); if (!type.isAssignableFrom(result.getJavaClass())) { - LOGGER.warn("UserMetric type mismatch for %s (expected: %s, received: %s); " + + LOGGER.warn("UserMetric type mismatch for {} (expected: {}, received: {}); " + "a null implementation will be substituted", key.asJavaString(), type, result.getJavaClass()); return metricFactory.nullImplementation(); } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/histogram/HistogramMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/histogram/HistogramMetric.java index a3ef3c009b9..66c0fcb1bbd 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/histogram/HistogramMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/histogram/HistogramMetric.java @@ -47,6 +47,16 @@ public void recordValue(long value) { // no op } + @Override + public int estimateBatchMetricsFootprintInBytes() { + return 0; + } + + @Override + 
public int estimateSingleHistogramFootprintInBytes() { + return 0; + } + @Override public ValueHistogram getValue() { // small empty histogram to avoid null checks in consumers @@ -58,4 +68,8 @@ public String getName() { return "NULL"; } }); + + int estimateBatchMetricsFootprintInBytes(); + + int estimateSingleHistogramFootprintInBytes(); } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/histogram/LifetimeHistogramMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/histogram/LifetimeHistogramMetric.java index f478173b28a..d25e59363c8 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/histogram/LifetimeHistogramMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/histogram/LifetimeHistogramMetric.java @@ -48,6 +48,17 @@ public void recordValue(long value) { recorder.recordValue(value); } + @Override + public int estimateBatchMetricsFootprintInBytes() { + // Recorder has at least a couple histograms, so we have no less than 3 instances + return 3 * estimateSingleHistogramFootprintInBytes(); + } + + @Override + public int estimateSingleHistogramFootprintInBytes() { + return this.lifetimeSnapshot.getEstimatedFootprintInBytes(); + } + @Override public synchronized ValueHistogram getValue() { Histogram uncommitted = recorder.getIntervalHistogram(); diff --git a/logstash-core/src/test/java/org/logstash/instrument/metrics/BatchStructureMetricTest.java b/logstash-core/src/test/java/org/logstash/instrument/metrics/BatchStructureMetricTest.java index caa9667fb44..f697318182e 100644 --- a/logstash-core/src/test/java/org/logstash/instrument/metrics/BatchStructureMetricTest.java +++ b/logstash-core/src/test/java/org/logstash/instrument/metrics/BatchStructureMetricTest.java @@ -20,6 +20,7 @@ package org.logstash.instrument.metrics; import org.HdrHistogram.Histogram; +import org.HdrHistogram.Recorder; import org.junit.Before; import org.junit.Test; import 
org.logstash.instrument.metrics.histogram.LifetimeHistogramMetric; @@ -169,4 +170,27 @@ public void givenRunningMetricWhenNoDataComesInForLastMinuteThenHistogramReflect assertEquals(100, lifetimeData.get50Percentile(), 10); assertEquals(200, lifetimeData.get90Percentile(), 10); } + + @Test + public void givenInstantiatedStructureMetricInstanceThenVerifyOccupationEstimation() { + // HistogramMetric uses HdrHistogram with 3 digits precision, so create a sample to have + // the rough histogram occupation. + Histogram sample = new Recorder(3).getIntervalHistogram(); + + int sampleOccupation = sample.getEstimatedFootprintInBytes(); + + //BatchStructureMetric has 4 policies + int totalDatapoints = BuiltInFlowMetricRetentionPolicies.LAST_1_MINUTE.datapointsCount() + + BuiltInFlowMetricRetentionPolicies.LAST_5_MINUTES.datapointsCount() + + BuiltInFlowMetricRetentionPolicies.LAST_15_MINUTES.datapointsCount() + + BuiltInFlowMetricRetentionPolicies.LIFETIME.datapointsCount(); + + // 60 seconds retention divided by the resolution of 3 seconds, plus the staging accumulator. + // The same reasoning applies to 5 and 15 minutes. + assertEquals(60/3 + 1 + 5 * 60/15 + 1 + 15 * 60/30 + 1 + 2, totalDatapoints); + + int expectedOccupation = sampleOccupation * totalDatapoints; + + assertEquals(expectedOccupation, sut.estimateBatchMetricsFootprintInBytes()); + } } \ No newline at end of file