Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/config/mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def config_init(params)
opts = self.class.get_config[name]
if opts && opts[:deprecated]
extra = opts[:deprecated].is_a?(String) ? opts[:deprecated] : ""
extra.gsub!("%PLUGIN%", self.class.config_name)
extra.gsub!("%PLUGIN%", self.class.config_name || self.class.inspect)
self.deprecation_logger.deprecated(
"You are using a deprecated config setting #{name.inspect} set in " \
"#{self.class.config_name}. Deprecated settings will continue to work, " \
Expand All @@ -135,7 +135,7 @@ def config_init(params)

if opts && opts[:obsolete]
extra = opts[:obsolete].is_a?(String) ? opts[:obsolete] : ""
extra.gsub!("%PLUGIN%", self.class.config_name)
extra.gsub!("%PLUGIN%", self.class.config_name.to_s)
raise LogStash::ConfigurationError,
I18n.t("logstash.runner.configuration.obsolete", :name => name,
:plugin => self.class.config_name, :extra => extra)
Expand Down
31 changes: 16 additions & 15 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,27 @@ class LogStash::Outputs::Base < LogStash::Plugin

# The codec used for output data. Output codecs are a convenient method for encoding your data before it leaves the output, without needing a separate filter in your Logstash pipeline.
config :codec, :validate => :codec, :default => "plain"
# TODO remove this in Logstash 6.0
# when we no longer support the :legacy type
# This is hacky, but it can only be herne
config :workers, :type => :number, :default => 1
Comment thread
andrewvc marked this conversation as resolved.

config :workers, :type => :number, :deprecated => "This parameter will be ignored."

# Set or return concurrency type
def self.concurrency(type = nil)
if type
if type == :legacy
self.logger.warn("Output plugin #{self.name} declares `concurrency :legacy` which is removed. Defaulting to :single. Please update the plugin to use `concurrency :single` or `concurrency :shared`.")
type = :single
end
if !LogStash::OutputDelegatorStrategyRegistry.instance.types.include?(type)
raise ArgumentError, <<~MESSAGE.gsub("\n", " ")
The concurrency type `#{type.inspect}` specified for output plugin `#{config_name}` is not supported
on this version of Logstash. If you installed this plugin specifically on this Logstash version,
it is not compatible. If you are a plugin author, please see update your plugin to use one of
the supported plugin types: #{LogStash::OutputDelegatorStrategyRegistry.instance.types}
MESSAGE
end
@concurrency = type
else
@concurrency || :legacy # default is :legacyo
@concurrency || :single
end
end

Expand All @@ -57,12 +67,6 @@ def self.threadsafe?
concurrency == :shared
end

# Deprecated: Favor `concurrency :single`
# Remove in Logstash 6.0.0
def self.declare_workers_not_supported!(message = nil)
concurrency :single
end

public

def self.plugin_type
Expand All @@ -72,11 +76,8 @@ def self.plugin_type
public
def initialize(params = {})
super
config_init(@params)

if self.workers != 1
raise LogStash::ConfigurationError, "You are using a plugin that doesn't support workers but have set the workers value explicitly! This plugin uses the #{concurrency} and doesn't need this option"
end
config_init(@params)

# If we're running with a single thread we must enforce single-threaded concurrency by default
# Maybe in a future version we'll assume output plugins are threadsafe
Expand Down
22 changes: 8 additions & 14 deletions logstash-core/spec/logstash/outputs/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class LogStash::Outputs::NOOPShared < ::LogStash::Outputs::Base
def register; end
end

class LogStash::Outputs::NOOPLegacy < ::LogStash::Outputs::Base
class LogStash::Outputs::NOOPDefault < ::LogStash::Outputs::Base
def register; end
end

Expand All @@ -58,11 +58,9 @@ def multi_receive_encoded(events_and_encoded)

context "single" do
let(:klass) { LogStash::Outputs::NOOPSingle }
let(:params) { { "dummy_option" => "potatoes", "codec" => "json" } }

it "should instantiate cleanly" do
params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 }
worker_params = params.dup; worker_params["workers"] = 1

expect { subject }.not_to raise_error
end

Expand All @@ -79,19 +77,15 @@ def multi_receive_encoded(events_and_encoded)
end
end

context "legacy" do
let(:klass) { LogStash::Outputs::NOOPLegacy }
context "default (no concurrency declared)" do
let(:klass) { LogStash::Outputs::NOOPDefault }

it "should set concurrency correctly" do
expect(subject.concurrency).to eq(:legacy)
end

it "should default the # of workers to 1" do
expect(subject.workers).to eq(1)
it "should default concurrency to :single" do
expect(subject.concurrency).to eq(:single)
end

it "should default concurrency to :legacy" do
expect(subject.concurrency).to eq(:legacy)
it "should accept the deprecated workers setting without error" do
expect { klass.new("workers" => 1) }.not_to raise_error
end
end

Expand Down
10 changes: 0 additions & 10 deletions logstash-core/src/main/java/org/logstash/RubyUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ public final class RubyUtil {

public static final RubyClass OUTPUT_STRATEGY_SIMPLE_ABSTRACT;

public static final RubyClass OUTPUT_STRATEGY_LEGACY;

public static final RubyClass OUTPUT_STRATEGY_SINGLE;

public static final RubyClass OUTPUT_STRATEGY_SHARED;
Expand Down Expand Up @@ -373,10 +371,6 @@ public final class RubyUtil {
context, "SimpleAbstractStrategy", OUTPUT_STRATEGY_ABSTRACT,
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
OUTPUT_STRATEGY_LEGACY = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
context, "Legacy", OUTPUT_STRATEGY_ABSTRACT,
OutputStrategyExt.LegacyOutputStrategyExt::new
);
OUTPUT_STRATEGY_SINGLE = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
context, "Single", OUTPUT_STRATEGY_SIMPLE_ABSTRACT,
OutputStrategyExt.SingleOutputStrategyExt::new
Expand All @@ -389,17 +383,13 @@ public final class RubyUtil {
OUTPUT_STRATEGY_ABSTRACT.defineMethods(context, OutputStrategyExt.SimpleAbstractOutputStrategyExt.class);
OUTPUT_STRATEGY_SHARED.defineMethods(context, OutputStrategyExt.SharedOutputStrategyExt.class);
OUTPUT_STRATEGY_SINGLE.defineMethods(context, OutputStrategyExt.SingleOutputStrategyExt.class);
OUTPUT_STRATEGY_LEGACY.defineMethods(context, OutputStrategyExt.LegacyOutputStrategyExt.class);
final OutputStrategyExt.OutputStrategyRegistryExt outputStrategyRegistry =
OutputStrategyExt.OutputStrategyRegistryExt.instance(
context, OUTPUT_DELEGATOR_STRATEGIES
);
outputStrategyRegistry.register(
context, RUBY.newSymbol("shared"), OUTPUT_STRATEGY_SHARED
);
outputStrategyRegistry.register(
context, RUBY.newSymbol("legacy"), OUTPUT_STRATEGY_LEGACY
);
outputStrategyRegistry.register(
context, RUBY.newSymbol("single"), OUTPUT_STRATEGY_SINGLE
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@
package org.logstash.config.ir.compiler;

import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.api.Convert;
import org.jruby.api.Create;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.internal.runtime.methods.DynamicMethod;
Expand All @@ -37,8 +33,6 @@
import org.logstash.execution.ExecutionContextExt;
import org.logstash.plugins.factory.ContextualizerExt;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

public final class OutputStrategyExt {
Expand Down Expand Up @@ -166,85 +160,6 @@ protected abstract IRubyObject output(ThreadContext context, IRubyObject events)
protected abstract IRubyObject reg(ThreadContext context);
}

@JRubyClass(name = "Legacy", parent = "AbstractStrategy")
public static final class LegacyOutputStrategyExt extends OutputStrategyExt.AbstractOutputStrategyExt {

private static final long serialVersionUID = 1L;

private transient BlockingQueue<IRubyObject> workerQueue;

private transient IRubyObject workerCount;

private @SuppressWarnings({"rawtypes"}) RubyArray workers;

public LegacyOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}

@JRubyMethod(required = 4)
public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) {
final RubyClass outputClass = (RubyClass) args[0];
final IRubyObject metric = args[1];
final ExecutionContextExt executionContext = (ExecutionContextExt) args[2];
final RubyHash pluginArgs = (RubyHash) args[3];
workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers"));
if (workerCount.isNil()) {
workerCount = RubyFixnum.one(context.runtime);
}
final int count = Convert.toInt(context, workerCount.convertToInteger());
workerQueue = new ArrayBlockingQueue<>(count);
workers = (RubyArray) Create.allocArray(context, count);
for (int i = 0; i < count; ++i) {
final IRubyObject output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs);
initOutputCallsite(outputClass);
output.callMethod(context, "metric=", metric);
workers.append(context, output);
workerQueue.add(output);
}
return this;
}

@JRubyMethod(name = "worker_count")
public IRubyObject workerCount() {
return workerCount;
}

@JRubyMethod
public IRubyObject workers() {
return workers;
}

@Override
public IRubyObject getRubyPlugin(final ThreadContext context) {
return workers.isEmpty() ? context.nil : (IRubyObject) workers.get(0);
}

@Override
protected IRubyObject output(final ThreadContext context, final IRubyObject events) throws InterruptedException {
final IRubyObject worker = workerQueue.take();
try {
invokeOutput(context, events, worker);
return context.nil;
} finally {
workerQueue.put(worker);
}
}

@Override
@SuppressWarnings("unchecked")
protected IRubyObject close(final ThreadContext context) {
workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "do_close"));
return this;
}

@Override
@SuppressWarnings("unchecked")
protected IRubyObject reg(final ThreadContext context) {
workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "register"));
return this;
}
}

@JRubyClass(name = "SimpleAbstractStrategy", parent = "AbstractStrategy")
public abstract static class SimpleAbstractOutputStrategyExt
extends OutputStrategyExt.AbstractOutputStrategyExt {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ public void singleConcurrencyStrategyIsDefault() {
public void outputStrategyTests() {
StrategyPair[] outputStrategies = new StrategyPair[]{
new StrategyPair("shared", OutputStrategyExt.SharedOutputStrategyExt.class),
new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class),
new StrategyPair("legacy", OutputStrategyExt.LegacyOutputStrategyExt.class)
new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class)
};

for (StrategyPair pair : outputStrategies) {
Expand Down Expand Up @@ -184,8 +183,7 @@ public void outputStrategyTests() {
public void outputStrategyMethodDelegationTests() {
RubySymbol[] outputStrategies = new RubySymbol[]{
RUBY.newSymbol("shared"),
RUBY.newSymbol("single"),
RUBY.newSymbol("legacy")
RUBY.newSymbol("single")
};
final ThreadContext context = RUBY.getCurrentContext();
for (RubySymbol symbol : outputStrategies) {
Expand Down
Loading