Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ def stop_after_successful_connection_thread

# Convert the event into a 3-tuple of action, params and event hash
def event_action_tuple(event)
ensure_dynamic_ilm_resources!(event) if ilm_in_use? && ilm_has_sprintf?

params = common_event_params(event)
params[:_type] = get_event_type(event) if use_event_type?(nil)

Expand Down Expand Up @@ -566,6 +568,12 @@ def resolve_document_id(event, event_id)
private :resolve_document_id

def resolve_index!(event, event_index)
if ilm_in_use? && ilm_has_sprintf?
dynamic_alias = resolve_dynamic_ilm_rollover_alias!(event)
raise IndexInterpolationError, dynamic_alias if dynamic_alias.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation
Comment thread
jithsungh marked this conversation as resolved.
Outdated
return dynamic_alias
end

sprintf_index = @event_target.call(event)
raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation
# if it's not a data stream, sprintf_index is the @index with resolved placeholders.
Expand Down
57 changes: 57 additions & 0 deletions lib/logstash/outputs/elasticsearch/ilm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,55 @@ module Ilm
ILM_POLICY_PATH = "default-ilm-policy.json"

def setup_ilm
return if ilm_has_sprintf?

logger.warn("Overwriting supplied index #{@index} with rollover alias #{@ilm_rollover_alias}") unless default_index?(@index)
@index = @ilm_rollover_alias
maybe_create_rollover_alias
maybe_create_ilm_policy
Comment thread
jithsungh marked this conversation as resolved.
end

def ilm_has_sprintf?
(@ilm_rollover_alias && @ilm_rollover_alias.match(/%{.*?}/)) ||
(@ilm_policy && @ilm_policy.match(/%{.*?}/))
end

def resolve_dynamic_ilm_rollover_alias!(event)
resolve_dynamic_ilm_value!(event, @ilm_rollover_alias, "ilm_rollover_alias")
end

def resolve_dynamic_ilm_policy!(event)
resolve_dynamic_ilm_value!(event, @ilm_policy, "ilm_policy")
end

def ensure_dynamic_ilm_resources!(event)
return unless ilm_in_use? && ilm_has_sprintf?

resolved_alias = resolve_dynamic_ilm_rollover_alias!(event)
resolved_policy = resolve_dynamic_ilm_policy!(event)

@dynamic_ilm_validation_lock ||= Mutex.new
@dynamic_ilm_validated_keys ||= {}
validation_key = "#{resolved_alias}|#{resolved_policy}"
return if @dynamic_ilm_validated_keys[validation_key]

@dynamic_ilm_validation_lock.synchronize do
return if @dynamic_ilm_validated_keys[validation_key]

Comment thread
jithsungh marked this conversation as resolved.
unless client.ilm_policy_exists?(resolved_policy)
raise EventMappingError, "Dynamic ILM policy '#{resolved_policy}' does not exist. " \
"Pre-create policy, template, and rollover alias resources before using dynamic ILM."
end

unless client.rollover_alias_exists?(resolved_alias)
raise EventMappingError, "Dynamic ILM rollover alias '#{resolved_alias}' does not exist. " \
"Pre-create policy, template, and rollover alias resources before using dynamic ILM."
end

@dynamic_ilm_validated_keys[validation_key] = true
end
end

def ilm_in_use?
return @ilm_actually_enabled if defined?(@ilm_actually_enabled)
@ilm_actually_enabled =
Expand All @@ -32,6 +75,20 @@ def ilm_in_use?

private

def resolve_dynamic_ilm_value!(event, pattern, setting_name)
resolved = event.sprintf(pattern)

if resolved.nil? || resolved.empty?
raise EventMappingError, "#{setting_name} resolved to empty value from pattern: #{pattern}"
end

if resolved.match(/%{.*?}/)
raise EventMappingError, "#{setting_name} contains unresolved placeholders: #{resolved}"
end

resolved
end

def ilm_alias_set?
default_index?(@index) || !default_rollover_alias?(@ilm_rollover_alias)
end
Expand Down
52 changes: 52 additions & 0 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,58 @@
end
end

describe "dynamic ILM resolution" do
let(:options) do
super().merge(
"ilm_enabled" => true,
"ilm_rollover_alias" => "%{[service]}",
"ilm_policy" => "%{[service]}-policy"
)
end

let(:event) { LogStash::Event.new("service" => "checkout") }

before do
allow(subject).to receive(:ilm_in_use?).and_return(true)
end

it "uses resolved rollover alias as the target index" do
allow(subject.client).to receive(:ilm_policy_exists?).with("checkout-policy").and_return(true)
allow(subject.client).to receive(:rollover_alias_exists?).with("checkout").and_return(true)

expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "checkout")
end

it "validates dynamic policy and alias once per resolved combination" do
expect(subject.client).to receive(:ilm_policy_exists?).with("checkout-policy").once.and_return(true)
expect(subject.client).to receive(:rollover_alias_exists?).with("checkout").once.and_return(true)

subject.send(:event_action_tuple, event)
subject.send(:event_action_tuple, event)
end

it "raises an event mapping error when policy is missing" do
allow(subject.client).to receive(:ilm_policy_exists?).with("checkout-policy").and_return(false)

expect { subject.send(:event_action_tuple, event) }
.to raise_error(LogStash::Outputs::ElasticSearch::EventMappingError, /Dynamic ILM policy 'checkout-policy' does not exist/)
end

it "raises an event mapping error when placeholders are unresolved" do
unresolved_event = LogStash::Event.new({})

expect { subject.send(:event_action_tuple, unresolved_event) }
.to raise_error(LogStash::Outputs::ElasticSearch::EventMappingError, /contains unresolved placeholders/)
end

it "skips startup ILM bootstrap in dynamic mode" do
expect(subject).not_to receive(:maybe_create_rollover_alias)
expect(subject).not_to receive(:maybe_create_ilm_policy)

subject.send(:setup_ilm)
end
end

describe "with event integration metadata" do
let(:event_fields) {{}}
let(:event) { LogStash::Event.new(event_fields)}
Expand Down
Loading