diff --git a/src/main/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriter.java b/src/main/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriter.java index 2de51b78..62551306 100644 --- a/src/main/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriter.java +++ b/src/main/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriter.java @@ -23,24 +23,33 @@ */ package org.jmxtrans.agent; -import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.*; -import static org.jmxtrans.agent.util.ConfigurationUtils.*; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.SETTING_HOST; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.SETTING_PORT; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.SETTING_PORT_DEFAULT_VALUE; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.filterNonFloatValues; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.getConfiguredMetricPrefixOrNull; +import static org.jmxtrans.agent.util.ConfigurationUtils.getInt; +import static org.jmxtrans.agent.util.ConfigurationUtils.getString; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; -import java.net.*; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.charset.Charset; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Level; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.jmxtrans.agent.graphite.GraphiteMetricMessageBuilder; import org.jmxtrans.agent.util.io.IoUtils; import org.jmxtrans.agent.util.net.HostAndPort; +import org.jmxtrans.agent.util.time.Clock; +import org.jmxtrans.agent.util.time.SystemCurrentTimeMillisClock; /** * @author Cyrille Le Clerc @@ -56,6 +65,8 @@ public class GraphitePlainTextTcpOutputWriter extends AbstractOutputWriter imple private Writer writer; private int socketConnectTimeoutInMillis = SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE; private GraphiteMetricMessageBuilder messageBuilder; + private boolean filterNonFloatValues; + private Clock clock; @Override public void postConstruct(Map settings) { @@ -68,9 +79,11 @@ public void postConstruct(Map settings) { socketConnectTimeoutInMillis = getInt(settings, SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS, SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE); + clock = new SystemCurrentTimeMillisClock(); logger.log(getInfoLevel(), "GraphitePlainTextTcpOutputWriter is configured with " + graphiteServerHostAndPort + ", metricPathPrefix=" + messageBuilder.getPrefix() + ", socketConnectTimeoutInMillis=" + socketConnectTimeoutInMillis); + filterNonFloatValues = filterNonFloatValues(settings); } @Override @@ -80,7 +93,13 @@ public void writeInvocationResult(@Nonnull String invocationName, @Nullable Obje @Override public void writeQueryResult(@Nonnull String metricName, @Nullable String type, @Nullable Object value) throws IOException { - String msg = messageBuilder.buildMessage(metricName, value, TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)); + if (filterNonFloatValues && !messageBuilder.isFloat(value)) { + if (logger.isLoggable(getTraceLevel())) { + logger.log(getTraceLevel(), "Filter non float value '" + value + "'"); + } + return; + } + String msg = messageBuilder.buildMessage(metricName, value, TimeUnit.SECONDS.convert(clock.getCurrentTimeMillis(), TimeUnit.MILLISECONDS)); try { ensureGraphiteConnection(); if (logger.isLoggable(getTraceLevel())) { @@ -94,6 +113,10 @@ public void writeQueryResult(@Nonnull String metricName, @Nullable String type, } } + protected void setClock(Clock clock) { + this.clock = clock; + } + private void releaseGraphiteConnection() { IoUtils.closeQuietly(writer); IoUtils.closeQuietly(socket); diff --git a/src/main/java/org/jmxtrans/agent/GraphiteUdpOutputWriter.java b/src/main/java/org/jmxtrans/agent/GraphiteUdpOutputWriter.java index 761906f5..d455a4fc 100644 --- a/src/main/java/org/jmxtrans/agent/GraphiteUdpOutputWriter.java +++ b/src/main/java/org/jmxtrans/agent/GraphiteUdpOutputWriter.java @@ -23,7 +23,8 @@ */ package org.jmxtrans.agent; -import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.*; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.getConfiguredMetricPrefixOrNull; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.getHostAndPort; import java.io.IOException; import java.net.DatagramPacket; @@ -36,6 +37,7 @@ import java.util.logging.Level; import org.jmxtrans.agent.graphite.GraphiteMetricMessageBuilder; +import org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings; import org.jmxtrans.agent.util.net.HostAndPort; import org.jmxtrans.agent.util.time.Clock; import org.jmxtrans.agent.util.time.SystemCurrentTimeMillisClock; @@ -52,6 +54,7 @@ public class GraphiteUdpOutputWriter extends AbstractOutputWriter { private UdpMessageSender messageSender; private Clock clock; private GraphiteMetricMessageBuilder messageBuilder; + private boolean filterNonFloatValues = false; @Override public void postConstruct(Map settings) { @@ -62,6 +65,7 @@ public void postConstruct(Map settings) { clock = new SystemCurrentTimeMillisClock(); logger.log(getInfoLevel(), "GraphiteUdpOutputWriter is configured with " + graphiteServerHostAndPort + ", metricPathPrefix=" + messageBuilder.getPrefix()); + filterNonFloatValues = GraphiteOutputWriterCommonSettings.filterNonFloatValues(settings); } @Override @@ -69,8 +73,14 @@ public void writeInvocationResult(String invocationName, Object value) throws IO writeQueryResult(invocationName, null, value); } - @Override + @Override public void writeQueryResult(String metricName, String metricType, Object value) throws IOException { + if (filterNonFloatValues && !messageBuilder.isFloat(value)) { + if (logger.isLoggable(getTraceLevel())) { + logger.log(getTraceLevel(), "Filter non float value '" + value + "'"); + } + return; + } String msg = messageBuilder.buildMessage(metricName, value, TimeUnit.SECONDS.convert(clock.getCurrentTimeMillis(), TimeUnit.MILLISECONDS)); logMessageIfTraceLoggable(msg); tryWriteMsg(msg + "\n"); diff --git a/src/main/java/org/jmxtrans/agent/graphite/GraphiteMetricMessageBuilder.java b/src/main/java/org/jmxtrans/agent/graphite/GraphiteMetricMessageBuilder.java index fbe9ec07..0ecf9f29 100644 --- a/src/main/java/org/jmxtrans/agent/graphite/GraphiteMetricMessageBuilder.java +++ b/src/main/java/org/jmxtrans/agent/graphite/GraphiteMetricMessageBuilder.java @@ -50,13 +50,17 @@ public GraphiteMetricMessageBuilder(@Nullable String configuredMetricPathPrefix) * @return The metric string without trailing newline */ public String buildMessage(String metricName, Object value, long timestamp) { - if (value instanceof Boolean) { - return metricPathPrefix + metricName + " " + ((Boolean)value ? 1 : 0) + " " + timestamp; - } - return metricPathPrefix + metricName + " " + value + " " + timestamp; + return metricPathPrefix + metricName + " " + convertToString(value) + " " + timestamp; } - /** + private String convertToString(Object value) { + if (value instanceof Boolean) { + return (Boolean)value ? "1" : "0"; + } + return String.valueOf(value); + } + + /** * {@link java.net.InetAddress#getLocalHost()} may not be known at JVM startup when the process is launched as a Linux service. */ private static String buildMetricPathPrefix(String configuredMetricPathPrefix) { @@ -75,5 +79,15 @@ private static String buildMetricPathPrefix(String configuredMetricPathPrefix) { public String getPrefix() { return metricPathPrefix; } - + + /** + * Checks if the given value can be sent as a float value to graphite. + * Note that Booleans are converted to 1/0 and will pass this check + * + * @param value value to check + * @return true if the string representation of value is parseable as a float + */ + public boolean isFloat(Object value) { + return convertToString(value).matches("[-+]?[0-9]*\\.?[0-9]+"); + } } diff --git a/src/main/java/org/jmxtrans/agent/graphite/GraphiteOutputWriterCommonSettings.java b/src/main/java/org/jmxtrans/agent/graphite/GraphiteOutputWriterCommonSettings.java index a78faad0..5e8766c0 100644 --- a/src/main/java/org/jmxtrans/agent/graphite/GraphiteOutputWriterCommonSettings.java +++ b/src/main/java/org/jmxtrans/agent/graphite/GraphiteOutputWriterCommonSettings.java @@ -23,7 +23,9 @@ */ package org.jmxtrans.agent.graphite; -import static org.jmxtrans.agent.util.ConfigurationUtils.*; +import static org.jmxtrans.agent.util.ConfigurationUtils.getBoolean; +import static org.jmxtrans.agent.util.ConfigurationUtils.getInt; +import static org.jmxtrans.agent.util.ConfigurationUtils.getString; import java.util.Map; @@ -38,6 +40,7 @@ public class GraphiteOutputWriterCommonSettings { public static final String SETTING_PORT = "port"; public static final int SETTING_PORT_DEFAULT_VALUE = 2003; public static final String SETTING_NAME_PREFIX = "namePrefix"; + public static final String SETTING_FILTER_NON_FLOAT = "filterNonFloatValues"; private GraphiteOutputWriterCommonSettings(){} @@ -49,4 +52,8 @@ public static HostAndPort getHostAndPort(Map settings) { public static String getConfiguredMetricPrefixOrNull(Map settings) { return getString(settings, SETTING_NAME_PREFIX, null); } + + public static boolean filterNonFloatValues(Map settings) { + return getBoolean(settings, "filterNonFloatValues", false); + } } diff --git a/src/test/java/org/jmxtrans/agent/GraphiteMetricMessageBuilderTest.java b/src/test/java/org/jmxtrans/agent/GraphiteMetricMessageBuilderTest.java index a728f694..f4d6d1bc 100644 --- a/src/test/java/org/jmxtrans/agent/GraphiteMetricMessageBuilderTest.java +++ b/src/test/java/org/jmxtrans/agent/GraphiteMetricMessageBuilderTest.java @@ -23,8 +23,12 @@ */ package org.jmxtrans.agent; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import org.jmxtrans.agent.graphite.GraphiteMetricMessageBuilder; import org.junit.Test; @@ -62,4 +66,18 @@ public void falseIsConvertedToZero() { String msg = builder.buildMessage("bar", false, 11); assertThat(msg, equalTo("foo.bar 0 11")); } + + @Test + public void checksFloatValues() throws Exception { + GraphiteMetricMessageBuilder builder = new GraphiteMetricMessageBuilder("foo."); + assertTrue(builder.isFloat("1.23")); + assertTrue(builder.isFloat("1")); + assertTrue(builder.isFloat("-1.23")); + assertTrue(builder.isFloat("0")); + assertTrue(builder.isFloat(false)); + assertTrue(builder.isFloat(true)); + assertFalse(builder.isFloat("")); + assertFalse(builder.isFloat(null)); + assertFalse(builder.isFloat("qwerty")); + } } diff --git a/src/test/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriterTest.java b/src/test/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriterTest.java index 31eda7d2..114bd728 100644 --- a/src/test/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriterTest.java +++ b/src/test/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriterTest.java @@ -24,15 +24,17 @@ package org.jmxtrans.agent; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.hamcrest.Matcher; import org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings; +import org.jmxtrans.agent.testutils.FixedTimeClock; +import org.jmxtrans.agent.util.time.Clock; import org.junit.Rule; import org.junit.Test; @@ -44,29 +46,56 @@ public class GraphitePlainTextTcpOutputWriterTest { @Rule public TcpLineServer tcpLineServer = new TcpLineServer(); + private final Clock clock = new FixedTimeClock(33000); + @Test public void reconnectsAfterServerClosesConnection() throws Exception { GraphitePlainTextTcpOutputWriter graphiteWriter = new GraphitePlainTextTcpOutputWriter(); Map config = new HashMap<>(); config.put(GraphiteOutputWriterCommonSettings.SETTING_HOST, "127.0.0.1"); config.put(GraphiteOutputWriterCommonSettings.SETTING_PORT, "" + tcpLineServer.getPort()); + config.put(GraphiteOutputWriterCommonSettings.SETTING_NAME_PREFIX, "bar."); graphiteWriter.postConstruct(config); + graphiteWriter.setClock(clock); // Write one metric to see it is received writeTestMetric(graphiteWriter); - assertEventuallyReceived(tcpLineServer, hasSize(1)); + assertEventuallyReceived(tcpLineServer, containsInAnyOrder("bar.foo 1 33")); // Disconnect the Graphite writer tcpLineServer.disconnectAllClients(); waitForErrorToBeDetectedByGraphiteWriter(graphiteWriter); - writeTestMetric(graphiteWriter); // Write one metric and verify that it is received writeTestMetric(graphiteWriter); - assertEventuallyReceived(tcpLineServer, hasSize(greaterThan(1))); + writeTestMetric(graphiteWriter); + assertEventuallyReceived(tcpLineServer, containsInAnyOrder("bar.foo 1 33", "bar.foo 2 33", "bar.foo 3 33")); + } + + @Test + public void filterNonNumericValues() throws Exception { + GraphitePlainTextTcpOutputWriter writer = new GraphitePlainTextTcpOutputWriter(); + Map config = new HashMap<>(); + config.put(GraphiteOutputWriterCommonSettings.SETTING_HOST, "127.0.0.1"); + config.put(GraphiteOutputWriterCommonSettings.SETTING_PORT, "" + tcpLineServer.getPort()); + config.put(GraphiteOutputWriterCommonSettings.SETTING_NAME_PREFIX, "bar."); + config.put(GraphiteOutputWriterCommonSettings.SETTING_FILTER_NON_FLOAT, "true"); + writer.postConstruct(config); + writer.setClock(clock); + + writer.writeQueryResult("metric", "type", 1); + writer.writeQueryResult("metric", "type", null); + writer.writeQueryResult("metric.2", "type", "non string"); + writer.writeQueryResult("metric.2", "type", "2"); + writer.writeQueryResult("metric.3", "type", ""); + writer.writeQueryResult("metric.3", "type", true); + writer.postCollect(); + assertEventuallyReceived(tcpLineServer, + containsInAnyOrder("bar.metric 1 33", "bar.metric.2 2 33", "bar.metric.3 1 33")); + tcpLineServer.disconnectAllClients(); } private void waitForErrorToBeDetectedByGraphiteWriter(GraphitePlainTextTcpOutputWriter writer) { for (int i = 0; i < 10; i++) { try { - writer.writeQueryResult("foo", null, 1); + writer.writeQueryResult("foo", null, 4711 + i); writer.postCollect(); Thread.sleep(20); } catch (Exception e) { @@ -76,16 +105,18 @@ private void waitForErrorToBeDetectedByGraphiteWriter(GraphitePlainTextTcpOutput fail("No error ocurred after closing server!"); } + private int counter; + private void writeTestMetric(GraphitePlainTextTcpOutputWriter writer) { try { - writer.writeQueryResult("foo", null, 1); + writer.writeQueryResult("foo", null, ++counter); writer.postCollect(); } catch (Exception e) { e.printStackTrace(); } } - public void assertEventuallyReceived(TcpLineServer server, Matcher> matcher) + public void assertEventuallyReceived(TcpLineServer server, Matcher> matcher) throws Exception { for (int i = 0; i < 100; i++) { if (matcher.matches(server.getReceivedLines())) { diff --git a/src/test/java/org/jmxtrans/agent/GraphiteUdpOutputWriterTest.java b/src/test/java/org/jmxtrans/agent/GraphiteUdpOutputWriterTest.java index 40ade38d..2d71d351 100644 --- a/src/test/java/org/jmxtrans/agent/GraphiteUdpOutputWriterTest.java +++ b/src/test/java/org/jmxtrans/agent/GraphiteUdpOutputWriterTest.java @@ -23,8 +23,9 @@ */ package org.jmxtrans.agent; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; import java.io.IOException; import java.net.InetAddress; @@ -37,6 +38,7 @@ import java.util.Map; import org.hamcrest.Matcher; +import org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings; import org.jmxtrans.agent.testutils.FixedTimeClock; import org.jmxtrans.agent.util.time.Clock; import org.junit.After; @@ -57,7 +59,7 @@ public class GraphiteUdpOutputWriterTest { @Rule public UdpServer udpServer = new UdpServer(); - private Clock clock = new FixedTimeClock(33000); + private final Clock clock = new FixedTimeClock(33000); private GraphiteUdpOutputWriter writer; @@ -83,12 +85,41 @@ public void manyQueryResults() throws Exception { containsInAnyOrder("foo.metric 1 33\n", "foo.metric.2 2 33\n", "foo.metric.3 3 33\n")); } + @Test + public void handlesBoolean() throws Exception { + writer.writeQueryResult("metric", "type", true); + assertEventuallyReceived(udpServer, contains("foo.metric 1 33\n")); + } + + @Test + public void handlesFalseBoolean() throws Exception { + writer.writeQueryResult("metric", "type", false); + assertEventuallyReceived(udpServer, contains("foo.metric 0 33\n")); + } + @Test public void oneInvocationResult() throws Exception { writer.writeInvocationResult("invoke", 123); assertEventuallyReceived(udpServer, contains("foo.invoke 123 33\n")); } + @Test + public void filterNonNumericValues() throws Exception { + Map testSettings = testSettings(); + testSettings.put(GraphiteOutputWriterCommonSettings.SETTING_FILTER_NON_FLOAT, "true"); + writer.postConstruct(testSettings); + writer.setClock(clock); + + writer.writeQueryResult("metric", "type", 1); + writer.writeQueryResult("metric", "type", null); + writer.writeQueryResult("metric.2", "type", "non string"); + writer.writeQueryResult("metric.2", "type", "2"); + writer.writeQueryResult("metric.3", "type", true); + writer.writeQueryResult("metric.3", "type", ""); + assertEventuallyReceived(udpServer, + containsInAnyOrder("foo.metric 1 33\n", "foo.metric.2 2 33\n", "foo.metric.3 1 33\n")); + } + @After public void destroyWriter() { writer.preDestroy(); @@ -119,7 +150,7 @@ public void assertEventuallyReceived(UdpServer server, Matcher receivedMessages = new ArrayList<>(); + private final List receivedMessages = new ArrayList<>(); private DatagramChannel channel; public void openChannel() throws Exception {