Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ def stop_after_successful_connection_thread

# Convert the event into a 3-tuple of action, params and event hash
def event_action_tuple(event)
ensure_dynamic_ilm_resources!(event) if ilm_in_use? && ilm_has_sprintf?

params = common_event_params(event)
params[:_type] = get_event_type(event) if use_event_type?(nil)

Expand Down Expand Up @@ -566,6 +568,11 @@ def resolve_document_id(event, event_id)
private :resolve_document_id

def resolve_index!(event, event_index)
if ilm_in_use? && ilm_has_sprintf?
dynamic_alias = resolve_dynamic_ilm_rollover_alias!(event)
return dynamic_alias
end

sprintf_index = @event_target.call(event)
raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation
# if it's not a data stream, sprintf_index is the @index with resolved placeholders.
Expand Down
57 changes: 57 additions & 0 deletions lib/logstash/outputs/elasticsearch/ilm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,55 @@ module Ilm
ILM_POLICY_PATH = "default-ilm-policy.json"

def setup_ilm
return if ilm_has_sprintf?

logger.warn("Overwriting supplied index #{@index} with rollover alias #{@ilm_rollover_alias}") unless default_index?(@index)
@index = @ilm_rollover_alias
maybe_create_rollover_alias
maybe_create_ilm_policy
Comment thread
jithsungh marked this conversation as resolved.
end

def ilm_has_sprintf?
(@ilm_rollover_alias && @ilm_rollover_alias.match(/%{.*?}/)) ||
(@ilm_policy && @ilm_policy.match(/%{.*?}/))
end

def resolve_dynamic_ilm_rollover_alias!(event)
resolve_dynamic_ilm_value!(event, @ilm_rollover_alias, "ilm_rollover_alias")
end

def resolve_dynamic_ilm_policy!(event)
resolve_dynamic_ilm_value!(event, @ilm_policy, "ilm_policy")
end

def ensure_dynamic_ilm_resources!(event)
return unless ilm_in_use? && ilm_has_sprintf?

resolved_alias = resolve_dynamic_ilm_rollover_alias!(event)
resolved_policy = resolve_dynamic_ilm_policy!(event)

@dynamic_ilm_validation_lock ||= Mutex.new
@dynamic_ilm_validated_keys ||= {}
validation_key = "#{resolved_alias}|#{resolved_policy}"
return if @dynamic_ilm_validated_keys[validation_key]

@dynamic_ilm_validation_lock.synchronize do
return if @dynamic_ilm_validated_keys[validation_key]

Comment thread
jithsungh marked this conversation as resolved.
unless client.ilm_policy_exists?(resolved_policy)
raise EventMappingError, "Dynamic ILM policy '#{resolved_policy}' does not exist. " \
"Pre-create policy, template, and rollover alias resources before using dynamic ILM."
end

unless client.rollover_alias_exists?(resolved_alias)
raise EventMappingError, "Dynamic ILM rollover alias '#{resolved_alias}' does not exist. " \
"Pre-create policy, template, and rollover alias resources before using dynamic ILM."
end

@dynamic_ilm_validated_keys[validation_key] = true
end
end

def ilm_in_use?
return @ilm_actually_enabled if defined?(@ilm_actually_enabled)
@ilm_actually_enabled =
Expand All @@ -32,6 +75,20 @@ def ilm_in_use?

private

def resolve_dynamic_ilm_value!(event, pattern, setting_name)
resolved = event.sprintf(pattern)

if resolved.nil? || resolved.empty?
raise EventMappingError, "#{setting_name} resolved to empty value from pattern: #{pattern}"
end

if resolved.match(/%{.*?}/)
raise EventMappingError, "#{setting_name} contains unresolved placeholders: #{resolved}"
end

resolved
end

def ilm_alias_set?
default_index?(@index) || !default_rollover_alias?(@ilm_rollover_alias)
end
Expand Down
52 changes: 52 additions & 0 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,58 @@
end
end

describe "dynamic ILM resolution" do
let(:options) do
super().merge(
"ilm_enabled" => true,
"ilm_rollover_alias" => "%{[service]}",
"ilm_policy" => "%{[service]}-policy"
)
end

let(:event) { LogStash::Event.new("service" => "checkout") }

before do
allow(subject).to receive(:ilm_in_use?).and_return(true)
end

it "uses resolved rollover alias as the target index" do
allow(subject.client).to receive(:ilm_policy_exists?).with("checkout-policy").and_return(true)
allow(subject.client).to receive(:rollover_alias_exists?).with("checkout").and_return(true)

expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "checkout")
end

it "validates dynamic policy and alias once per resolved combination" do
expect(subject.client).to receive(:ilm_policy_exists?).with("checkout-policy").once.and_return(true)
expect(subject.client).to receive(:rollover_alias_exists?).with("checkout").once.and_return(true)

subject.send(:event_action_tuple, event)
subject.send(:event_action_tuple, event)
end

it "raises an event mapping error when policy is missing" do
allow(subject.client).to receive(:ilm_policy_exists?).with("checkout-policy").and_return(false)

expect { subject.send(:event_action_tuple, event) }
.to raise_error(LogStash::Outputs::ElasticSearch::EventMappingError, /Dynamic ILM policy 'checkout-policy' does not exist/)
end

it "raises an event mapping error when placeholders are unresolved" do
unresolved_event = LogStash::Event.new({})

expect { subject.send(:event_action_tuple, unresolved_event) }
.to raise_error(LogStash::Outputs::ElasticSearch::EventMappingError, /contains unresolved placeholders/)
end

it "skips startup ILM bootstrap in dynamic mode" do
expect(subject).not_to receive(:maybe_create_rollover_alias)
expect(subject).not_to receive(:maybe_create_ilm_policy)

subject.send(:setup_ilm)
end
end

describe "with event integration metadata" do
let(:event_fields) {{}}
let(:event) { LogStash::Event.new(event_fields)}
Expand Down