Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/config/mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def config_init(params)
opts = self.class.get_config[name]
if opts && opts[:deprecated]
extra = opts[:deprecated].is_a?(String) ? opts[:deprecated] : ""
extra.gsub!("%PLUGIN%", self.class.config_name)
extra.gsub!("%PLUGIN%", self.class.config_name.to_s)
Comment thread
andrewvc marked this conversation as resolved.
Outdated
self.deprecation_logger.deprecated(
"You are using a deprecated config setting #{name.inspect} set in " \
"#{self.class.config_name}. Deprecated settings will continue to work, " \
Expand All @@ -135,7 +135,7 @@ def config_init(params)

if opts && opts[:obsolete]
extra = opts[:obsolete].is_a?(String) ? opts[:obsolete] : ""
extra.gsub!("%PLUGIN%", self.class.config_name)
extra.gsub!("%PLUGIN%", self.class.config_name.to_s)
raise LogStash::ConfigurationError,
I18n.t("logstash.runner.configuration.obsolete", :name => name,
:plugin => self.class.config_name, :extra => extra)
Expand Down
26 changes: 11 additions & 15 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,22 @@ class LogStash::Outputs::Base < LogStash::Plugin

# The codec used for output data. Output codecs are a convenient method for encoding your data before it leaves the output, without needing a separate filter in your Logstash pipeline.
config :codec, :validate => :codec, :default => "plain"
# TODO remove this in Logstash 6.0
# when we no longer support the :legacy type
# This is hacky, but it can only be herne
config :workers, :type => :number, :default => 1
Comment thread
andrewvc marked this conversation as resolved.

config :workers, :type => :number, :deprecated => "This parameter will be ignored."

# Set or return concurrency type
def self.concurrency(type = nil)
if type
if type == :legacy
self.logger.warn("Output plugin #{self.name} declares `concurrency :legacy` which is removed. Defaulting to :single. Please update the plugin to use `concurrency :single` or `concurrency :shared`.")
type = :single
end
if ![:shared, :single].include?(type)
raise ArgumentError, "Invalid concurrency type '#{type}', must be one of :shared, :single"
end
Comment thread
andrewvc marked this conversation as resolved.
Outdated
@concurrency = type
else
@concurrency || :legacy # default is :legacyo
@concurrency || :single
end
end

Expand All @@ -57,12 +62,6 @@ def self.threadsafe?
concurrency == :shared
end

# Deprecated: Favor `concurrency :single`
# Remove in Logstash 6.0.0
def self.declare_workers_not_supported!(message = nil)
concurrency :single
end

public

def self.plugin_type
Expand All @@ -72,11 +71,8 @@ def self.plugin_type
public
def initialize(params = {})
super
config_init(@params)

if self.workers != 1
raise LogStash::ConfigurationError, "You are using a plugin that doesn't support workers but have set the workers value explicitly! This plugin uses the #{concurrency} and doesn't need this option"
end
config_init(@params)

# If we're running with a single thread we must enforce single-threaded concurrency by default
# Maybe in a future version we'll assume output plugins are threadsafe
Expand Down
22 changes: 8 additions & 14 deletions logstash-core/spec/logstash/outputs/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class LogStash::Outputs::NOOPShared < ::LogStash::Outputs::Base
def register; end
end

class LogStash::Outputs::NOOPLegacy < ::LogStash::Outputs::Base
class LogStash::Outputs::NOOPDefault < ::LogStash::Outputs::Base
def register; end
end

Expand All @@ -58,11 +58,9 @@ def multi_receive_encoded(events_and_encoded)

context "single" do
let(:klass) { LogStash::Outputs::NOOPSingle }
let(:params) { { "dummy_option" => "potatoes", "codec" => "json" } }

it "should instantiate cleanly" do
params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 }
worker_params = params.dup; worker_params["workers"] = 1

expect { subject }.not_to raise_error
end

Expand All @@ -79,19 +77,15 @@ def multi_receive_encoded(events_and_encoded)
end
end

context "legacy" do
let(:klass) { LogStash::Outputs::NOOPLegacy }
context "default (no concurrency declared)" do
let(:klass) { LogStash::Outputs::NOOPDefault }

it "should set concurrency correctly" do
expect(subject.concurrency).to eq(:legacy)
end

it "should default the # of workers to 1" do
expect(subject.workers).to eq(1)
it "should default concurrency to :single" do
expect(subject.concurrency).to eq(:single)
end

it "should default concurrency to :legacy" do
expect(subject.concurrency).to eq(:legacy)
it "should accept the deprecated workers setting without error" do
expect { klass.new("workers" => 1) }.not_to raise_error
end
end

Expand Down
10 changes: 0 additions & 10 deletions logstash-core/src/main/java/org/logstash/RubyUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ public final class RubyUtil {

public static final RubyClass OUTPUT_STRATEGY_SIMPLE_ABSTRACT;

public static final RubyClass OUTPUT_STRATEGY_LEGACY;

public static final RubyClass OUTPUT_STRATEGY_SINGLE;

public static final RubyClass OUTPUT_STRATEGY_SHARED;
Expand Down Expand Up @@ -373,10 +371,6 @@ public final class RubyUtil {
context, "SimpleAbstractStrategy", OUTPUT_STRATEGY_ABSTRACT,
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
OUTPUT_STRATEGY_LEGACY = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
context, "Legacy", OUTPUT_STRATEGY_ABSTRACT,
OutputStrategyExt.LegacyOutputStrategyExt::new
);
OUTPUT_STRATEGY_SINGLE = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
context, "Single", OUTPUT_STRATEGY_SIMPLE_ABSTRACT,
OutputStrategyExt.SingleOutputStrategyExt::new
Expand All @@ -389,17 +383,13 @@ public final class RubyUtil {
OUTPUT_STRATEGY_ABSTRACT.defineMethods(context, OutputStrategyExt.SimpleAbstractOutputStrategyExt.class);
OUTPUT_STRATEGY_SHARED.defineMethods(context, OutputStrategyExt.SharedOutputStrategyExt.class);
OUTPUT_STRATEGY_SINGLE.defineMethods(context, OutputStrategyExt.SingleOutputStrategyExt.class);
OUTPUT_STRATEGY_LEGACY.defineMethods(context, OutputStrategyExt.LegacyOutputStrategyExt.class);
final OutputStrategyExt.OutputStrategyRegistryExt outputStrategyRegistry =
OutputStrategyExt.OutputStrategyRegistryExt.instance(
context, OUTPUT_DELEGATOR_STRATEGIES
);
outputStrategyRegistry.register(
context, RUBY.newSymbol("shared"), OUTPUT_STRATEGY_SHARED
);
outputStrategyRegistry.register(
context, RUBY.newSymbol("legacy"), OUTPUT_STRATEGY_LEGACY
);
outputStrategyRegistry.register(
context, RUBY.newSymbol("single"), OUTPUT_STRATEGY_SINGLE
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@
package org.logstash.config.ir.compiler;

import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.api.Convert;
import org.jruby.api.Create;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.internal.runtime.methods.DynamicMethod;
Expand All @@ -37,8 +33,6 @@
import org.logstash.execution.ExecutionContextExt;
import org.logstash.plugins.factory.ContextualizerExt;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

public final class OutputStrategyExt {
Expand Down Expand Up @@ -166,85 +160,6 @@ protected abstract IRubyObject output(ThreadContext context, IRubyObject events)
protected abstract IRubyObject reg(ThreadContext context);
}

@JRubyClass(name = "Legacy", parent = "AbstractStrategy")
public static final class LegacyOutputStrategyExt extends OutputStrategyExt.AbstractOutputStrategyExt {

private static final long serialVersionUID = 1L;

private transient BlockingQueue<IRubyObject> workerQueue;

private transient IRubyObject workerCount;

private @SuppressWarnings({"rawtypes"}) RubyArray workers;

public LegacyOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}

@JRubyMethod(required = 4)
public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) {
final RubyClass outputClass = (RubyClass) args[0];
final IRubyObject metric = args[1];
final ExecutionContextExt executionContext = (ExecutionContextExt) args[2];
final RubyHash pluginArgs = (RubyHash) args[3];
workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers"));
if (workerCount.isNil()) {
workerCount = RubyFixnum.one(context.runtime);
}
final int count = Convert.toInt(context, workerCount.convertToInteger());
workerQueue = new ArrayBlockingQueue<>(count);
workers = (RubyArray) Create.allocArray(context, count);
for (int i = 0; i < count; ++i) {
final IRubyObject output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs);
initOutputCallsite(outputClass);
output.callMethod(context, "metric=", metric);
workers.append(context, output);
workerQueue.add(output);
}
return this;
}

@JRubyMethod(name = "worker_count")
public IRubyObject workerCount() {
return workerCount;
}

@JRubyMethod
public IRubyObject workers() {
return workers;
}

@Override
public IRubyObject getRubyPlugin(final ThreadContext context) {
return workers.isEmpty() ? context.nil : (IRubyObject) workers.get(0);
}

@Override
protected IRubyObject output(final ThreadContext context, final IRubyObject events) throws InterruptedException {
final IRubyObject worker = workerQueue.take();
try {
invokeOutput(context, events, worker);
return context.nil;
} finally {
workerQueue.put(worker);
}
}

@Override
@SuppressWarnings("unchecked")
protected IRubyObject close(final ThreadContext context) {
workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "do_close"));
return this;
}

@Override
@SuppressWarnings("unchecked")
protected IRubyObject reg(final ThreadContext context) {
workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "register"));
return this;
}
}

@JRubyClass(name = "SimpleAbstractStrategy", parent = "AbstractStrategy")
public abstract static class SimpleAbstractOutputStrategyExt
extends OutputStrategyExt.AbstractOutputStrategyExt {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ public void singleConcurrencyStrategyIsDefault() {
public void outputStrategyTests() {
StrategyPair[] outputStrategies = new StrategyPair[]{
new StrategyPair("shared", OutputStrategyExt.SharedOutputStrategyExt.class),
new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class),
new StrategyPair("legacy", OutputStrategyExt.LegacyOutputStrategyExt.class)
new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class)
};

for (StrategyPair pair : outputStrategies) {
Expand Down Expand Up @@ -184,8 +183,7 @@ public void outputStrategyTests() {
public void outputStrategyMethodDelegationTests() {
RubySymbol[] outputStrategies = new RubySymbol[]{
RUBY.newSymbol("shared"),
RUBY.newSymbol("single"),
RUBY.newSymbol("legacy")
RUBY.newSymbol("single")
};
final ThreadContext context = RUBY.getCurrentContext();
for (RubySymbol symbol : outputStrategies) {
Expand Down
Loading