Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,20 @@ class LogStash::Outputs::Base < LogStash::Plugin

# The codec used for output data. Output codecs are a convenient method for encoding your data before it leaves the output, without needing a separate filter in your Logstash pipeline.
config :codec, :validate => :codec, :default => "plain"
# TODO remove this in Logstash 6.0
# when we no longer support the :legacy type
# This is hacky, but it can only be herne
config :workers, :type => :number, :default => 1
Comment thread
andrewvc marked this conversation as resolved.

# Set or return concurrency type
def self.concurrency(type = nil)
if type
if type == :legacy
self.logger.warn("Output plugin #{self.name} declares `concurrency :legacy` which is removed. Defaulting to :single. Please update the plugin to use `concurrency :single` or `concurrency :shared`.")
type = :single
end
if ![:shared, :single].include?(type)
raise ArgumentError, "Invalid concurrency type '#{type}', must be one of :shared, :single"
end
Comment thread
andrewvc marked this conversation as resolved.
Outdated
@concurrency = type
else
@concurrency || :legacy # default is :legacyo
@concurrency || :single
end
end

Expand All @@ -57,12 +60,6 @@ def self.threadsafe?
concurrency == :shared
end

# Deprecated: Favor `concurrency :single`
# Remove in Logstash 6.0.0
def self.declare_workers_not_supported!(message = nil)
concurrency :single
end

public

def self.plugin_type
Expand All @@ -72,12 +69,16 @@ def self.plugin_type
public
def initialize(params = {})
super
config_init(@params)

if self.workers != 1
raise LogStash::ConfigurationError, "You are using a plugin that doesn't support workers but have set the workers value explicitly! This plugin uses the #{concurrency} and doesn't need this option"
# Outputs that never declared a concurrency strategy used the now-removed :legacy
# strategy, which accepted a `workers` setting. Strip it so existing configs with
# `workers => 1` don't blow up.
if !self.class.instance_variable_defined?(:@concurrency) && @params.delete("workers")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was on the fence about adding a new @concurrency_explicit boolean to detect this, glad to switch to that approach if reviewers find this pragile.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would find it simpler to just use the :deprecated => "Extra text for deprecation log" facility already supported by the config mixin.

self.logger.warn("Output plugin #{self.class.name}: the `workers` setting is no longer used and will be removed in a future release.")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we do our own logging, we should use the deprecation_logger#deprecated() so that it goes to the deprecation log.

end

config_init(@params)

# If we're running with a single thread we must enforce single-threaded concurrency by default
# Maybe in a future version we'll assume output plugins are threadsafe
@single_worker_mutex = Mutex.new
Expand Down
24 changes: 13 additions & 11 deletions logstash-core/spec/logstash/outputs/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class LogStash::Outputs::NOOPShared < ::LogStash::Outputs::Base
def register; end
end

class LogStash::Outputs::NOOPLegacy < ::LogStash::Outputs::Base
class LogStash::Outputs::NOOPDefault < ::LogStash::Outputs::Base
def register; end
end

Expand All @@ -60,8 +60,7 @@ def multi_receive_encoded(events_and_encoded)
let(:klass) { LogStash::Outputs::NOOPSingle }

it "should instantiate cleanly" do
params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 }
worker_params = params.dup; worker_params["workers"] = 1
params = { "dummy_option" => "potatoes", "codec" => "json" }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here's a bug that pre-existed your change: the params local variable is assigned but not used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!


expect { subject }.not_to raise_error
end
Expand All @@ -79,19 +78,22 @@ def multi_receive_encoded(events_and_encoded)
end
end

context "legacy" do
let(:klass) { LogStash::Outputs::NOOPLegacy }
context "default (no concurrency declared)" do
let(:klass) { LogStash::Outputs::NOOPDefault }

it "should set concurrency correctly" do
expect(subject.concurrency).to eq(:legacy)
it "should default concurrency to :single" do
expect(subject.concurrency).to eq(:single)
end

it "should default the # of workers to 1" do
expect(subject.workers).to eq(1)
it "should accept the deprecated workers setting without error" do
expect { klass.new("workers" => 1) }.not_to raise_error
end

it "should default concurrency to :legacy" do
expect(subject.concurrency).to eq(:legacy)
it "should log a warning when workers is set" do
logger = klass.logger
expect(logger).to receive(:warn).with(/workers.*no longer used/)
output = klass.new("workers" => 1)
expect(output.params).not_to include("workers")
end
end

Expand Down
10 changes: 0 additions & 10 deletions logstash-core/src/main/java/org/logstash/RubyUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ public final class RubyUtil {

public static final RubyClass OUTPUT_STRATEGY_SIMPLE_ABSTRACT;

public static final RubyClass OUTPUT_STRATEGY_LEGACY;

public static final RubyClass OUTPUT_STRATEGY_SINGLE;

public static final RubyClass OUTPUT_STRATEGY_SHARED;
Expand Down Expand Up @@ -373,10 +371,6 @@ public final class RubyUtil {
context, "SimpleAbstractStrategy", OUTPUT_STRATEGY_ABSTRACT,
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
OUTPUT_STRATEGY_LEGACY = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
context, "Legacy", OUTPUT_STRATEGY_ABSTRACT,
OutputStrategyExt.LegacyOutputStrategyExt::new
);
OUTPUT_STRATEGY_SINGLE = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
context, "Single", OUTPUT_STRATEGY_SIMPLE_ABSTRACT,
OutputStrategyExt.SingleOutputStrategyExt::new
Expand All @@ -389,17 +383,13 @@ public final class RubyUtil {
OUTPUT_STRATEGY_ABSTRACT.defineMethods(context, OutputStrategyExt.SimpleAbstractOutputStrategyExt.class);
OUTPUT_STRATEGY_SHARED.defineMethods(context, OutputStrategyExt.SharedOutputStrategyExt.class);
OUTPUT_STRATEGY_SINGLE.defineMethods(context, OutputStrategyExt.SingleOutputStrategyExt.class);
OUTPUT_STRATEGY_LEGACY.defineMethods(context, OutputStrategyExt.LegacyOutputStrategyExt.class);
final OutputStrategyExt.OutputStrategyRegistryExt outputStrategyRegistry =
OutputStrategyExt.OutputStrategyRegistryExt.instance(
context, OUTPUT_DELEGATOR_STRATEGIES
);
outputStrategyRegistry.register(
context, RUBY.newSymbol("shared"), OUTPUT_STRATEGY_SHARED
);
outputStrategyRegistry.register(
context, RUBY.newSymbol("legacy"), OUTPUT_STRATEGY_LEGACY
);
outputStrategyRegistry.register(
context, RUBY.newSymbol("single"), OUTPUT_STRATEGY_SINGLE
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@
package org.logstash.config.ir.compiler;

import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.api.Convert;
import org.jruby.api.Create;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.internal.runtime.methods.DynamicMethod;
Expand All @@ -37,8 +33,6 @@
import org.logstash.execution.ExecutionContextExt;
import org.logstash.plugins.factory.ContextualizerExt;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

public final class OutputStrategyExt {
Expand Down Expand Up @@ -166,85 +160,6 @@ protected abstract IRubyObject output(ThreadContext context, IRubyObject events)
protected abstract IRubyObject reg(ThreadContext context);
}

@JRubyClass(name = "Legacy", parent = "AbstractStrategy")
public static final class LegacyOutputStrategyExt extends OutputStrategyExt.AbstractOutputStrategyExt {

private static final long serialVersionUID = 1L;

private transient BlockingQueue<IRubyObject> workerQueue;

private transient IRubyObject workerCount;

private @SuppressWarnings({"rawtypes"}) RubyArray workers;

public LegacyOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}

@JRubyMethod(required = 4)
public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) {
final RubyClass outputClass = (RubyClass) args[0];
final IRubyObject metric = args[1];
final ExecutionContextExt executionContext = (ExecutionContextExt) args[2];
final RubyHash pluginArgs = (RubyHash) args[3];
workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers"));
if (workerCount.isNil()) {
workerCount = RubyFixnum.one(context.runtime);
}
final int count = Convert.toInt(context, workerCount.convertToInteger());
workerQueue = new ArrayBlockingQueue<>(count);
workers = (RubyArray) Create.allocArray(context, count);
for (int i = 0; i < count; ++i) {
final IRubyObject output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs);
initOutputCallsite(outputClass);
output.callMethod(context, "metric=", metric);
workers.append(context, output);
workerQueue.add(output);
}
return this;
}

@JRubyMethod(name = "worker_count")
public IRubyObject workerCount() {
return workerCount;
}

@JRubyMethod
public IRubyObject workers() {
return workers;
}

@Override
public IRubyObject getRubyPlugin(final ThreadContext context) {
return workers.isEmpty() ? context.nil : (IRubyObject) workers.get(0);
}

@Override
protected IRubyObject output(final ThreadContext context, final IRubyObject events) throws InterruptedException {
final IRubyObject worker = workerQueue.take();
try {
invokeOutput(context, events, worker);
return context.nil;
} finally {
workerQueue.put(worker);
}
}

@Override
@SuppressWarnings("unchecked")
protected IRubyObject close(final ThreadContext context) {
workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "do_close"));
return this;
}

@Override
@SuppressWarnings("unchecked")
protected IRubyObject reg(final ThreadContext context) {
workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "register"));
return this;
}
}

@JRubyClass(name = "SimpleAbstractStrategy", parent = "AbstractStrategy")
public abstract static class SimpleAbstractOutputStrategyExt
extends OutputStrategyExt.AbstractOutputStrategyExt {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ public void singleConcurrencyStrategyIsDefault() {
public void outputStrategyTests() {
StrategyPair[] outputStrategies = new StrategyPair[]{
new StrategyPair("shared", OutputStrategyExt.SharedOutputStrategyExt.class),
new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class),
new StrategyPair("legacy", OutputStrategyExt.LegacyOutputStrategyExt.class)
new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class)
};

for (StrategyPair pair : outputStrategies) {
Expand Down Expand Up @@ -184,8 +183,7 @@ public void outputStrategyTests() {
public void outputStrategyMethodDelegationTests() {
RubySymbol[] outputStrategies = new RubySymbol[]{
RUBY.newSymbol("shared"),
RUBY.newSymbol("single"),
RUBY.newSymbol("legacy")
RUBY.newSymbol("single")
};
final ThreadContext context = RUBY.getCurrentContext();
for (RubySymbol symbol : outputStrategies) {
Expand Down
Loading