diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 934feda5b..26a9e2529 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -450,6 +450,8 @@ def stop_after_successful_connection_thread # Convert the event into a 3-tuple of action, params and event hash def event_action_tuple(event) + ensure_dynamic_ilm_resources!(event) if ilm_in_use? && ilm_has_sprintf? + params = common_event_params(event) params[:_type] = get_event_type(event) if use_event_type?(nil) @@ -566,6 +568,11 @@ def resolve_document_id(event, event_id) private :resolve_document_id def resolve_index!(event, event_index) + if ilm_in_use? && ilm_has_sprintf? + dynamic_alias = resolve_dynamic_ilm_rollover_alias!(event) + return dynamic_alias + end + sprintf_index = @event_target.call(event) raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation # if it's not a data stream, sprintf_index is the @index with resolved placeholders. diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb index 76dd0bd59..df78663fe 100644 --- a/lib/logstash/outputs/elasticsearch/ilm.rb +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -4,12 +4,55 @@ module Ilm ILM_POLICY_PATH = "default-ilm-policy.json" def setup_ilm + return if ilm_has_sprintf? + logger.warn("Overwriting supplied index #{@index} with rollover alias #{@ilm_rollover_alias}") unless default_index?(@index) @index = @ilm_rollover_alias maybe_create_rollover_alias maybe_create_ilm_policy end + def ilm_has_sprintf? + (@ilm_rollover_alias && @ilm_rollover_alias.match(/%{.*?}/)) || + (@ilm_policy && @ilm_policy.match(/%{.*?}/)) + end + + def resolve_dynamic_ilm_rollover_alias!(event) + resolve_dynamic_ilm_value!(event, @ilm_rollover_alias, "ilm_rollover_alias") + end + + def resolve_dynamic_ilm_policy!(event) + resolve_dynamic_ilm_value!(event, @ilm_policy, "ilm_policy") + end + + def ensure_dynamic_ilm_resources!(event) + return unless ilm_in_use? && ilm_has_sprintf? + + resolved_alias = resolve_dynamic_ilm_rollover_alias!(event) + resolved_policy = resolve_dynamic_ilm_policy!(event) + + @dynamic_ilm_validation_lock ||= Mutex.new + @dynamic_ilm_validated_keys ||= {} + validation_key = "#{resolved_alias}|#{resolved_policy}" + return if @dynamic_ilm_validated_keys[validation_key] + + @dynamic_ilm_validation_lock.synchronize do + return if @dynamic_ilm_validated_keys[validation_key] + + unless client.ilm_policy_exists?(resolved_policy) + raise EventMappingError, "Dynamic ILM policy '#{resolved_policy}' does not exist. " \ + "Pre-create policy, template, and rollover alias resources before using dynamic ILM." + end + + unless client.rollover_alias_exists?(resolved_alias) + raise EventMappingError, "Dynamic ILM rollover alias '#{resolved_alias}' does not exist. " \ + "Pre-create policy, template, and rollover alias resources before using dynamic ILM." + end + + @dynamic_ilm_validated_keys[validation_key] = true + end + end + def ilm_in_use? return @ilm_actually_enabled if defined?(@ilm_actually_enabled) @ilm_actually_enabled = @@ -32,6 +75,20 @@ def ilm_in_use? private + def resolve_dynamic_ilm_value!(event, pattern, setting_name) + resolved = event.sprintf(pattern) + + if resolved.nil? || resolved.empty? + raise EventMappingError, "#{setting_name} resolved to empty value from pattern: #{pattern}" + end + + if resolved.match(/%{.*?}/) + raise EventMappingError, "#{setting_name} contains unresolved placeholders: #{resolved}" + end + + resolved + end + def ilm_alias_set? default_index?(@index) || !default_rollover_alias?(@ilm_rollover_alias) end diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 572c20716..8d30475fa 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -264,6 +264,58 @@ end end + describe "dynamic ILM resolution" do + let(:options) do + super().merge( + "ilm_enabled" => true, + "ilm_rollover_alias" => "%{[service]}", + "ilm_policy" => "%{[service]}-policy" + ) + end + + let(:event) { LogStash::Event.new("service" => "checkout") } + + before do + allow(subject).to receive(:ilm_in_use?).and_return(true) + end + + it "uses resolved rollover alias as the target index" do + allow(subject.client).to receive(:ilm_policy_exists?).with("checkout-policy").and_return(true) + allow(subject.client).to receive(:rollover_alias_exists?).with("checkout").and_return(true) + + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "checkout") + end + + it "validates dynamic policy and alias once per resolved combination" do + expect(subject.client).to receive(:ilm_policy_exists?).with("checkout-policy").once.and_return(true) + expect(subject.client).to receive(:rollover_alias_exists?).with("checkout").once.and_return(true) + + subject.send(:event_action_tuple, event) + subject.send(:event_action_tuple, event) + end + + it "raises an event mapping error when policy is missing" do + allow(subject.client).to receive(:ilm_policy_exists?).with("checkout-policy").and_return(false) + + expect { subject.send(:event_action_tuple, event) } + .to raise_error(LogStash::Outputs::ElasticSearch::EventMappingError, /Dynamic ILM policy 'checkout-policy' does not exist/) + end + + it "raises an event mapping error when placeholders are unresolved" do + unresolved_event = LogStash::Event.new({}) + + expect { subject.send(:event_action_tuple, unresolved_event) } + .to raise_error(LogStash::Outputs::ElasticSearch::EventMappingError, /contains unresolved placeholders/) + end + + it "skips startup ILM bootstrap in dynamic mode" do + expect(subject).not_to receive(:maybe_create_rollover_alias) + expect(subject).not_to receive(:maybe_create_ilm_policy) + + subject.send(:setup_ilm) + end + end + describe "with event integration metadata" do let(:event_fields) {{}} let(:event) { LogStash::Event.new(event_fields)}