diff --git a/logstash-core/lib/logstash/config/mixin.rb b/logstash-core/lib/logstash/config/mixin.rb index 5e04ec7307c..fd558392cd2 100644 --- a/logstash-core/lib/logstash/config/mixin.rb +++ b/logstash-core/lib/logstash/config/mixin.rb @@ -122,7 +122,7 @@ def config_init(params) opts = self.class.get_config[name] if opts && opts[:deprecated] extra = opts[:deprecated].is_a?(String) ? opts[:deprecated] : "" - extra.gsub!("%PLUGIN%", self.class.config_name) + extra.gsub!("%PLUGIN%", self.class.config_name || self.class.inspect) self.deprecation_logger.deprecated( "You are using a deprecated config setting #{name.inspect} set in " \ "#{self.class.config_name}. Deprecated settings will continue to work, " \ @@ -135,7 +135,7 @@ def config_init(params) if opts && opts[:obsolete] extra = opts[:obsolete].is_a?(String) ? opts[:obsolete] : "" - extra.gsub!("%PLUGIN%", self.class.config_name) + extra.gsub!("%PLUGIN%", self.class.config_name.to_s) raise LogStash::ConfigurationError, I18n.t("logstash.runner.configuration.obsolete", :name => name, :plugin => self.class.config_name, :extra => extra) diff --git a/logstash-core/lib/logstash/outputs/base.rb b/logstash-core/lib/logstash/outputs/base.rb index 13d10f92ca1..615bfa75951 100644 --- a/logstash-core/lib/logstash/outputs/base.rb +++ b/logstash-core/lib/logstash/outputs/base.rb @@ -33,17 +33,27 @@ class LogStash::Outputs::Base < LogStash::Plugin # The codec used for output data. Output codecs are a convenient method for encoding your data before it leaves the output, without needing a separate filter in your Logstash pipeline. config :codec, :validate => :codec, :default => "plain" - # TODO remove this in Logstash 6.0 - # when we no longer support the :legacy type - # This is hacky, but it can only be herne - config :workers, :type => :number, :default => 1 + + config :workers, :type => :number, :deprecated => "This parameter will be ignored." # Set or return concurrency type def self.concurrency(type = nil) if type + if type == :legacy + self.logger.warn("Output plugin #{self.name} declares `concurrency :legacy` which is removed. Defaulting to :single. Please update the plugin to use `concurrency :single` or `concurrency :shared`.") + type = :single + end + if !LogStash::OutputDelegatorStrategyRegistry.instance.types.include?(type) + raise ArgumentError, <<~MESSAGE.gsub("\n", " ") + The concurrency type `#{type.inspect}` specified for output plugin `#{config_name}` is not supported + on this version of Logstash. If you installed this plugin specifically on this Logstash version, + it is not compatible. If you are a plugin author, please see update your plugin to use one of + the supported plugin types: #{LogStash::OutputDelegatorStrategyRegistry.instance.types} + MESSAGE + end @concurrency = type else - @concurrency || :legacy # default is :legacyo + @concurrency || :single end end @@ -57,12 +67,6 @@ def self.threadsafe? concurrency == :shared end - # Deprecated: Favor `concurrency :single` - # Remove in Logstash 6.0.0 - def self.declare_workers_not_supported!(message = nil) - concurrency :single - end - public def self.plugin_type @@ -72,11 +76,8 @@ def self.plugin_type public def initialize(params = {}) super - config_init(@params) - if self.workers != 1 - raise LogStash::ConfigurationError, "You are using a plugin that doesn't support workers but have set the workers value explicitly! This plugin uses the #{concurrency} and doesn't need this option" - end + config_init(@params) # If we're running with a single thread we must enforce single-threaded concurrency by default # Maybe in a future version we'll assume output plugins are threadsafe diff --git a/logstash-core/spec/logstash/outputs/base_spec.rb b/logstash-core/spec/logstash/outputs/base_spec.rb index e1f52efd32d..61ad7bc8699 100644 --- a/logstash-core/spec/logstash/outputs/base_spec.rb +++ b/logstash-core/spec/logstash/outputs/base_spec.rb @@ -39,7 +39,7 @@ class LogStash::Outputs::NOOPShared < ::LogStash::Outputs::Base def register; end end -class LogStash::Outputs::NOOPLegacy < ::LogStash::Outputs::Base +class LogStash::Outputs::NOOPDefault < ::LogStash::Outputs::Base def register; end end @@ -58,11 +58,9 @@ def multi_receive_encoded(events_and_encoded) context "single" do let(:klass) { LogStash::Outputs::NOOPSingle } + let(:params) { { "dummy_option" => "potatoes", "codec" => "json" } } it "should instantiate cleanly" do - params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 } - worker_params = params.dup; worker_params["workers"] = 1 - expect { subject }.not_to raise_error end @@ -79,19 +77,15 @@ def multi_receive_encoded(events_and_encoded) end end - context "legacy" do - let(:klass) { LogStash::Outputs::NOOPLegacy } + context "default (no concurrency declared)" do + let(:klass) { LogStash::Outputs::NOOPDefault } - it "should set concurrency correctly" do - expect(subject.concurrency).to eq(:legacy) - end - - it "should default the # of workers to 1" do - expect(subject.workers).to eq(1) + it "should default concurrency to :single" do + expect(subject.concurrency).to eq(:single) end - it "should default concurrency to :legacy" do - expect(subject.concurrency).to eq(:legacy) + it "should accept the deprecated workers setting without error" do + expect { klass.new("workers" => 1) }.not_to raise_error end end diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index f4c296e3522..3bc56826398 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -149,8 +149,6 @@ public final class RubyUtil { public static final RubyClass OUTPUT_STRATEGY_SIMPLE_ABSTRACT; - public static final RubyClass OUTPUT_STRATEGY_LEGACY; - public static final RubyClass OUTPUT_STRATEGY_SINGLE; public static final RubyClass OUTPUT_STRATEGY_SHARED; @@ -373,10 +371,6 @@ public final class RubyUtil { context, "SimpleAbstractStrategy", OUTPUT_STRATEGY_ABSTRACT, ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR ); - OUTPUT_STRATEGY_LEGACY = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder( - context, "Legacy", OUTPUT_STRATEGY_ABSTRACT, - OutputStrategyExt.LegacyOutputStrategyExt::new - ); OUTPUT_STRATEGY_SINGLE = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder( context, "Single", OUTPUT_STRATEGY_SIMPLE_ABSTRACT, OutputStrategyExt.SingleOutputStrategyExt::new @@ -389,7 +383,6 @@ public final class RubyUtil { OUTPUT_STRATEGY_ABSTRACT.defineMethods(context, OutputStrategyExt.SimpleAbstractOutputStrategyExt.class); OUTPUT_STRATEGY_SHARED.defineMethods(context, OutputStrategyExt.SharedOutputStrategyExt.class); OUTPUT_STRATEGY_SINGLE.defineMethods(context, OutputStrategyExt.SingleOutputStrategyExt.class); - OUTPUT_STRATEGY_LEGACY.defineMethods(context, OutputStrategyExt.LegacyOutputStrategyExt.class); final OutputStrategyExt.OutputStrategyRegistryExt outputStrategyRegistry = OutputStrategyExt.OutputStrategyRegistryExt.instance( context, OUTPUT_DELEGATOR_STRATEGIES @@ -397,9 +390,6 @@ public final class RubyUtil { outputStrategyRegistry.register( context, RUBY.newSymbol("shared"), OUTPUT_STRATEGY_SHARED ); - outputStrategyRegistry.register( - context, RUBY.newSymbol("legacy"), OUTPUT_STRATEGY_LEGACY - ); outputStrategyRegistry.register( context, RUBY.newSymbol("single"), OUTPUT_STRATEGY_SINGLE ); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java index e2475529497..4cc9c62496b 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java @@ -21,13 +21,9 @@ package org.logstash.config.ir.compiler; import org.jruby.Ruby; -import org.jruby.RubyArray; import org.jruby.RubyClass; -import org.jruby.RubyFixnum; import org.jruby.RubyHash; import org.jruby.RubyObject; -import org.jruby.api.Convert; -import org.jruby.api.Create; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.internal.runtime.methods.DynamicMethod; @@ -37,8 +33,6 @@ import org.logstash.execution.ExecutionContextExt; import org.logstash.plugins.factory.ContextualizerExt; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.stream.Collectors; public final class OutputStrategyExt { @@ -166,85 +160,6 @@ protected abstract IRubyObject output(ThreadContext context, IRubyObject events) protected abstract IRubyObject reg(ThreadContext context); } - @JRubyClass(name = "Legacy", parent = "AbstractStrategy") - public static final class LegacyOutputStrategyExt extends OutputStrategyExt.AbstractOutputStrategyExt { - - private static final long serialVersionUID = 1L; - - private transient BlockingQueue workerQueue; - - private transient IRubyObject workerCount; - - private @SuppressWarnings({"rawtypes"}) RubyArray workers; - - public LegacyOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) { - super(runtime, metaClass); - } - - @JRubyMethod(required = 4) - public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) { - final RubyClass outputClass = (RubyClass) args[0]; - final IRubyObject metric = args[1]; - final ExecutionContextExt executionContext = (ExecutionContextExt) args[2]; - final RubyHash pluginArgs = (RubyHash) args[3]; - workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers")); - if (workerCount.isNil()) { - workerCount = RubyFixnum.one(context.runtime); - } - final int count = Convert.toInt(context, workerCount.convertToInteger()); - workerQueue = new ArrayBlockingQueue<>(count); - workers = (RubyArray) Create.allocArray(context, count); - for (int i = 0; i < count; ++i) { - final IRubyObject output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs); - initOutputCallsite(outputClass); - output.callMethod(context, "metric=", metric); - workers.append(context, output); - workerQueue.add(output); - } - return this; - } - - @JRubyMethod(name = "worker_count") - public IRubyObject workerCount() { - return workerCount; - } - - @JRubyMethod - public IRubyObject workers() { - return workers; - } - - @Override - public IRubyObject getRubyPlugin(final ThreadContext context) { - return workers.isEmpty() ? context.nil : (IRubyObject) workers.get(0); - } - - @Override - protected IRubyObject output(final ThreadContext context, final IRubyObject events) throws InterruptedException { - final IRubyObject worker = workerQueue.take(); - try { - invokeOutput(context, events, worker); - return context.nil; - } finally { - workerQueue.put(worker); - } - } - - @Override - @SuppressWarnings("unchecked") - protected IRubyObject close(final ThreadContext context) { - workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "do_close")); - return this; - } - - @Override - @SuppressWarnings("unchecked") - protected IRubyObject reg(final ThreadContext context) { - workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "register")); - return this; - } - } - @JRubyClass(name = "SimpleAbstractStrategy", parent = "AbstractStrategy") public abstract static class SimpleAbstractOutputStrategyExt extends OutputStrategyExt.AbstractOutputStrategyExt { diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java index 5c378d571ac..8d0200f3e57 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java @@ -154,8 +154,7 @@ public void singleConcurrencyStrategyIsDefault() { public void outputStrategyTests() { StrategyPair[] outputStrategies = new StrategyPair[]{ new StrategyPair("shared", OutputStrategyExt.SharedOutputStrategyExt.class), - new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class), - new StrategyPair("legacy", OutputStrategyExt.LegacyOutputStrategyExt.class) + new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class) }; for (StrategyPair pair : outputStrategies) { @@ -184,8 +183,7 @@ public void outputStrategyTests() { public void outputStrategyMethodDelegationTests() { RubySymbol[] outputStrategies = new RubySymbol[]{ RUBY.newSymbol("shared"), - RUBY.newSymbol("single"), - RUBY.newSymbol("legacy") + RUBY.newSymbol("single") }; final ThreadContext context = RUBY.getCurrentContext(); for (RubySymbol symbol : outputStrategies) {