diff --git a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java index 7dc64ef5f7..071eee7b2a 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java +++ b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2026 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,6 +61,14 @@ public class Metrics { */ public static final String HTTP_CLIENT_PREFIX = "reactor.netty.http.client"; + /** + * Name prefix that will be used for the WebSocket client's metrics + * registered in Micrometer's global registry. + * + * @since 1.3.5 + */ + public static final String WEBSOCKET_CLIENT_PREFIX = "reactor.netty.websocket.client"; + /** * Name prefix that will be used for the TCP server's metrics * registered in Micrometer's global registry. @@ -145,6 +153,20 @@ public class Metrics { */ public static final String RESPONSE_TIME = ".response.time"; + /** + * Time spent for WebSocket handshake. + * + * @since 1.3.5 + */ + public static final String HANDSHAKE_TIME = ".handshake.time"; + + /** + * Duration of the WebSocket connection. + * + * @since 1.3.5 + */ + public static final String CONNECTION_DURATION = ".connection.duration"; + /** * The number of all connections, whether they are active or idle. */ diff --git a/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java b/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java index b4e3cb2d78..794dd89dd0 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java +++ b/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java @@ -134,6 +134,7 @@ public interface NettyPipeline { String TlsMetricsHandler = LEFT + "tlsMetricsHandler"; String WsCompressionHandler = LEFT + "wsCompressionHandler"; String WsFrameAggregator = LEFT + "wsFrameAggregator"; + String WsMetricsHandler = LEFT + "wsMetricsHandler"; String ReactiveBridge = RIGHT + "reactiveBridge"; diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java new file mode 100644 index 0000000000..4102803d6a --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java @@ -0,0 +1,269 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import org.jspecify.annotations.Nullable; +import reactor.util.context.ContextView; +import reactor.util.Logger; +import reactor.util.Loggers; + +import java.net.SocketAddress; +import java.time.Duration; + +import static reactor.netty.ReactorNetty.format; + +/** + * {@link ChannelDuplexHandler} for handling WebSocket {@link HttpClient} metrics. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +abstract class AbstractWebSocketClientMetricsHandler extends ChannelDuplexHandler { + + private static final Logger log = Loggers.getLogger(AbstractWebSocketClientMetricsHandler.class); + + final String method; + final @Nullable SocketAddress proxyAddress; + final SocketAddress remoteAddress; + + final String path; + + final ContextView contextView; + + long dataReceived; + + long dataSent; + + long dataReceivedTime; + + long dataSentTime; + + long connectionStartTime; + + long handshakeStartTime; + + protected AbstractWebSocketClientMetricsHandler(SocketAddress remoteAddress, @Nullable SocketAddress proxyAddress, + String path, ContextView contextView, String method) { + this.method = method; + this.path = path; + this.contextView = contextView; + this.proxyAddress = proxyAddress; + this.remoteAddress = remoteAddress; + } + + @Override + public boolean isSharable() { + return false; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + super.handlerAdded(ctx); + connectionStartTime = System.nanoTime(); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + try { + if (connectionStartTime > 0) { + recordConnectionClosed(); + } + } + catch (RuntimeException e) { + if (log.isWarnEnabled()) { + log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e); + } + } + super.handlerRemoved(ctx); + } + + void startHandshake(Channel channel) { + handshakeStartTime = System.nanoTime(); + } + + void recordHandshakeComplete(Channel channel, String status) { + Duration time = Duration.ofNanos(System.nanoTime() - handshakeStartTime); + if (proxyAddress == null) { + recorder().recordWebSocketHandshakeTime(remoteAddress, path, status, time); + } + else { + recorder().recordWebSocketHandshakeTime(remoteAddress, proxyAddress, path, status, time); + } + } + + void recordHandshakeFailure(Channel channel) { + Duration time = Duration.ofNanos(System.nanoTime() - handshakeStartTime); + if (proxyAddress == null) { + recorder().recordWebSocketHandshakeTime(remoteAddress, path, "ERROR", time); + } + else { + recorder().recordWebSocketHandshakeTime(remoteAddress, proxyAddress, path, "ERROR", time); + } + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + try { + if (msg instanceof WebSocketFrame) { + WebSocketFrame frame = (WebSocketFrame) msg; + if (isDataFrame(frame)) { + if (dataSentTime == 0) { + dataSentTime = System.nanoTime(); + } + dataSent += extractProcessedDataFromBuffer(frame); + + if (frame.isFinalFragment()) { + // VoidChannelPromise does not support addListener, unvoid to ensure the listener fires + promise = promise.unvoid(); + promise.addListener(f -> { + try { + recordWrite(remoteAddress); + dataSentTime = 0; + } + catch (RuntimeException e) { + if (log.isWarnEnabled()) { + log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e); + } + } + }); + } + } + } + } + catch (RuntimeException e) { + if (log.isWarnEnabled()) { + log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e); + } + } + + //"FutureReturnValueIgnored" this is deliberate + ctx.write(msg, promise); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + try { + if (msg instanceof WebSocketFrame) { + WebSocketFrame frame = (WebSocketFrame) msg; + if (isDataFrame(frame)) { + if (dataReceivedTime == 0) { + dataReceivedTime = System.nanoTime(); + } + dataReceived += extractProcessedDataFromBuffer(frame); + + if (frame.isFinalFragment()) { + recordRead(remoteAddress); + dataReceivedTime = 0; + } + } + } + } + catch (RuntimeException e) { + if (log.isWarnEnabled()) { + log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e); + } + } + + ctx.fireChannelRead(msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + try { + recordException(); + } + catch (RuntimeException e) { + if (log.isWarnEnabled()) { + log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e); + } + } + + ctx.fireExceptionCaught(cause); + } + + static boolean isDataFrame(WebSocketFrame msg) { + return !(msg instanceof CloseWebSocketFrame) && + !(msg instanceof PingWebSocketFrame) && + !(msg instanceof PongWebSocketFrame); + } + + private static long extractProcessedDataFromBuffer(WebSocketFrame msg) { + return msg.content().readableBytes(); + } + + protected abstract WebSocketClientMetricsRecorder recorder(); + + protected void recordConnectionClosed() { + Duration duration = Duration.ofNanos(System.nanoTime() - connectionStartTime); + if (proxyAddress == null) { + recorder().recordWebSocketConnectionDuration(remoteAddress, path, duration); + } + else { + recorder().recordWebSocketConnectionDuration(remoteAddress, proxyAddress, path, duration); + } + } + + protected void recordException() { + if (proxyAddress == null) { + recorder().incrementErrorsCount(remoteAddress, path); + } + else { + recorder().incrementErrorsCount(remoteAddress, proxyAddress, path); + } + } + + protected void recordRead(SocketAddress address) { + if (proxyAddress == null) { + recorder().recordDataReceivedTime(address, path, method, "n/a", + Duration.ofNanos(System.nanoTime() - dataReceivedTime)); + + recorder().recordDataReceived(address, path, dataReceived); + } + else { + recorder().recordDataReceivedTime(address, proxyAddress, path, method, "n/a", + Duration.ofNanos(System.nanoTime() - dataReceivedTime)); + + recorder().recordDataReceived(address, proxyAddress, path, dataReceived); + } + dataReceived = 0; + } + + protected void recordWrite(SocketAddress address) { + if (proxyAddress == null) { + recorder().recordDataSentTime(address, path, method, + Duration.ofNanos(System.nanoTime() - dataSentTime)); + + recorder().recordDataSent(address, path, dataSent); + } + else { + recorder().recordDataSentTime(address, proxyAddress, path, method, + Duration.ofNanos(System.nanoTime() - dataSentTime)); + + recorder().recordDataSent(address, proxyAddress, path, dataSent); + } + dataSent = 0; + } + +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsHandler.java new file mode 100644 index 0000000000..d5b892d764 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsHandler.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import io.netty.channel.Channel; +import org.jspecify.annotations.Nullable; +import reactor.util.context.ContextView; + +import java.net.SocketAddress; +import java.time.Duration; + +/** + * {@link AbstractWebSocketClientMetricsHandler} that propagates + * {@link reactor.util.context.ContextView}. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +final class ContextAwareWebSocketClientMetricsHandler extends AbstractWebSocketClientMetricsHandler { + + final ContextAwareWebSocketClientMetricsRecorder recorder; + + ContextAwareWebSocketClientMetricsHandler(ContextAwareWebSocketClientMetricsRecorder recorder, + SocketAddress remoteAddress, + @Nullable SocketAddress proxyAddress, + String path, + ContextView contextView, + String method) { + super(remoteAddress, proxyAddress, path, contextView, method); + this.recorder = recorder; + } + + @Override + protected ContextAwareWebSocketClientMetricsRecorder recorder() { + return recorder; + } + + @Override + void recordHandshakeComplete(Channel channel, String status) { + Duration time = Duration.ofNanos(System.nanoTime() - handshakeStartTime); + if (proxyAddress == null) { + recorder.recordWebSocketHandshakeTime(contextView, remoteAddress, path, status, time); + } + else { + recorder.recordWebSocketHandshakeTime(contextView, remoteAddress, proxyAddress, path, status, time); + } + } + + @Override + void recordHandshakeFailure(Channel channel) { + Duration time = Duration.ofNanos(System.nanoTime() - handshakeStartTime); + if (proxyAddress == null) { + recorder.recordWebSocketHandshakeTime(contextView, remoteAddress, path, "ERROR", time); + } + else { + recorder.recordWebSocketHandshakeTime(contextView, remoteAddress, proxyAddress, path, "ERROR", time); + } + } + + @Override + protected void recordConnectionClosed() { + Duration duration = Duration.ofNanos(System.nanoTime() - connectionStartTime); + if (proxyAddress == null) { + recorder.recordWebSocketConnectionDuration(contextView, remoteAddress, path, duration); + } + else { + recorder.recordWebSocketConnectionDuration(contextView, remoteAddress, proxyAddress, path, duration); + } + } + + @Override + protected void recordException() { + if (proxyAddress == null) { + recorder().incrementErrorsCount(contextView, remoteAddress, path); + } + else { + recorder().incrementErrorsCount(contextView, remoteAddress, proxyAddress, path); + } + } + + @Override + protected void recordWrite(SocketAddress address) { + if (proxyAddress == null) { + recorder.recordDataSentTime(contextView, address, path, method, + Duration.ofNanos(System.nanoTime() - dataSentTime)); + + recorder.recordDataSent(contextView, address, path, dataSent); + } + else { + recorder.recordDataSentTime(contextView, address, proxyAddress, path, method, + Duration.ofNanos(System.nanoTime() - dataSentTime)); + + recorder.recordDataSent(contextView, address, proxyAddress, path, dataSent); + } + dataSent = 0; + } + + @Override + protected void recordRead(SocketAddress address) { + if (proxyAddress == null) { + recorder.recordDataReceivedTime(contextView, address, path, method, "n/a", + Duration.ofNanos(System.nanoTime() - dataReceivedTime)); + + recorder.recordDataReceived(contextView, address, path, dataReceived); + } + else { + recorder.recordDataReceivedTime(contextView, address, proxyAddress, path, method, "n/a", + Duration.ofNanos(System.nanoTime() - dataReceivedTime)); + + recorder.recordDataReceived(contextView, address, proxyAddress, path, dataReceived); + } + dataReceived = 0; + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsRecorder.java new file mode 100644 index 0000000000..ab2eaf2a06 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsRecorder.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import reactor.util.context.Context; +import reactor.util.context.ContextView; + +import java.net.SocketAddress; +import java.time.Duration; + +/** + * {@link ContextView} aware class for collecting metrics on WebSocket client level. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +public abstract class ContextAwareWebSocketClientMetricsRecorder extends ContextAwareHttpClientMetricsRecorder + implements WebSocketClientMetricsRecorder { + + /** + * Records the time that is spent for the WebSocket handshake. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux + * @param remoteAddress The remote peer + * @param uri the requested URI + * @param status the WebSocket handshake status + * @param time the time in nanoseconds that is spent for the handshake + */ + public abstract void recordWebSocketHandshakeTime(ContextView contextView, SocketAddress remoteAddress, String uri, + String status, Duration time); + + /** + * Records the time that is spent for the WebSocket handshake. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri the requested URI + * @param status the WebSocket handshake status + * @param time the time in nanoseconds that is spent for the handshake + */ + public void recordWebSocketHandshakeTime(ContextView contextView, SocketAddress remoteAddress, + SocketAddress proxyAddress, String uri, String status, Duration time) { + recordWebSocketHandshakeTime(contextView, remoteAddress, uri, status, time); + } + + @Override + public void recordWebSocketHandshakeTime(SocketAddress remoteAddress, String uri, String status, Duration time) { + recordWebSocketHandshakeTime(Context.empty(), remoteAddress, uri, status, time); + } + + @Override + public void recordWebSocketHandshakeTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, + String status, Duration time) { + recordWebSocketHandshakeTime(Context.empty(), remoteAddress, proxyAddress, uri, status, time); + } + + /** + * Records the duration of the WebSocket connection. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux + * @param remoteAddress The remote peer + * @param uri the requested URI + * @param time the duration of the connection + */ + public abstract void recordWebSocketConnectionDuration(ContextView contextView, SocketAddress remoteAddress, + String uri, Duration time); + + /** + * Records the duration of the WebSocket connection. + * + * @param contextView The current {@link ContextView} associated with the Mono/Flux + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri the requested URI + * @param time the duration of the connection + */ + public void recordWebSocketConnectionDuration(ContextView contextView, SocketAddress remoteAddress, + SocketAddress proxyAddress, String uri, Duration time) { + recordWebSocketConnectionDuration(contextView, remoteAddress, uri, time); + } + + @Override + public void recordWebSocketConnectionDuration(SocketAddress remoteAddress, String uri, Duration time) { + recordWebSocketConnectionDuration(Context.empty(), remoteAddress, uri, time); + } + + @Override + public void recordWebSocketConnectionDuration(SocketAddress remoteAddress, SocketAddress proxyAddress, + String uri, Duration time) { + recordWebSocketConnectionDuration(Context.empty(), remoteAddress, proxyAddress, uri, time); + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/DefaultContextAwareWebSocketClientMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/DefaultContextAwareWebSocketClientMetricsRecorder.java new file mode 100644 index 0000000000..48aa16f15d --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/DefaultContextAwareWebSocketClientMetricsRecorder.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import reactor.util.context.ContextView; + +import java.net.SocketAddress; +import java.time.Duration; + +/** + * {@link ContextAwareWebSocketClientMetricsRecorder} that delegates to a {@link ContextAwareHttpClientMetricsRecorder}. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +final class DefaultContextAwareWebSocketClientMetricsRecorder extends ContextAwareWebSocketClientMetricsRecorder { + + final ContextAwareHttpClientMetricsRecorder recorder; + + DefaultContextAwareWebSocketClientMetricsRecorder(ContextAwareHttpClientMetricsRecorder recorder) { + this.recorder = recorder; + } + + @Override + public void recordWebSocketHandshakeTime(ContextView contextView, SocketAddress remoteAddress, String uri, + String status, Duration time) { + } + + @Override + public void recordWebSocketConnectionDuration(ContextView contextView, SocketAddress remoteAddress, + String uri, Duration time) { + } + + @Override + public void recordDataReceivedTime(ContextView contextView, SocketAddress remoteAddress, String uri, + String method, String status, Duration time) { + recorder.recordDataReceivedTime(contextView, remoteAddress, uri, method, status, time); + } + + @Override + public void recordDataReceivedTime(ContextView contextView, SocketAddress remoteAddress, + SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + recorder.recordDataReceivedTime(contextView, remoteAddress, proxyAddress, uri, method, status, time); + } + + @Override + public void recordDataSentTime(ContextView contextView, SocketAddress remoteAddress, String uri, + String method, Duration time) { + recorder.recordDataSentTime(contextView, remoteAddress, uri, method, time); + } + + @Override + public void recordDataSentTime(ContextView contextView, SocketAddress remoteAddress, + SocketAddress proxyAddress, String uri, String method, Duration time) { + recorder.recordDataSentTime(contextView, remoteAddress, proxyAddress, uri, method, time); + } + + @Override + public void recordResponseTime(ContextView contextView, SocketAddress remoteAddress, String uri, + String method, String status, Duration time) { + recorder.recordResponseTime(contextView, remoteAddress, uri, method, status, time); + } + + @Override + public void recordResponseTime(ContextView contextView, SocketAddress remoteAddress, + SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + recorder.recordResponseTime(contextView, remoteAddress, proxyAddress, uri, method, status, time); + } + + @Override + public void incrementErrorsCount(ContextView contextView, SocketAddress remoteAddress, String uri) { + recorder.incrementErrorsCount(contextView, remoteAddress, uri); + } + + @Override + public void incrementErrorsCount(ContextView contextView, SocketAddress remoteAddress, + SocketAddress proxyAddress, String uri) { + recorder.incrementErrorsCount(contextView, remoteAddress, proxyAddress, uri); + } + + @Override + public void recordDataReceived(ContextView contextView, SocketAddress remoteAddress, String uri, long bytes) { + recorder.recordDataReceived(contextView, remoteAddress, uri, bytes); + } + + @Override + public void recordDataReceived(ContextView contextView, SocketAddress remoteAddress, + SocketAddress proxyAddress, String uri, long bytes) { + recorder.recordDataReceived(contextView, remoteAddress, proxyAddress, uri, bytes); + } + + @Override + public void recordDataSent(ContextView contextView, SocketAddress remoteAddress, String uri, long bytes) { + recorder.recordDataSent(contextView, remoteAddress, uri, bytes); + } + + @Override + public void recordDataSent(ContextView contextView, SocketAddress remoteAddress, + SocketAddress proxyAddress, String uri, long bytes) { + recorder.recordDataSent(contextView, remoteAddress, proxyAddress, uri, bytes); + } + + @Override + public void incrementErrorsCount(ContextView contextView, SocketAddress remoteAddress) { + recorder.incrementErrorsCount(contextView, remoteAddress); + } + + @Override + public void recordConnectTime(ContextView contextView, SocketAddress remoteAddress, Duration time, String status) { + recorder.recordConnectTime(contextView, remoteAddress, time, status); + } + + @Override + public void recordDataReceived(ContextView contextView, SocketAddress remoteAddress, long bytes) { + recorder.recordDataReceived(contextView, remoteAddress, bytes); + } + + @Override + public void recordDataSent(ContextView contextView, SocketAddress remoteAddress, long bytes) { + recorder.recordDataSent(contextView, remoteAddress, bytes); + } + + @Override + public void recordTlsHandshakeTime(ContextView contextView, SocketAddress remoteAddress, Duration time, String status) { + recorder.recordTlsHandshakeTime(contextView, remoteAddress, time, status); + } + + @Override + public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time, String status) { + recorder.recordResolveAddressTime(remoteAddress, time, status); + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/DefaultWebSocketClientMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/DefaultWebSocketClientMetricsRecorder.java new file mode 100644 index 0000000000..a45a9fba84 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/DefaultWebSocketClientMetricsRecorder.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import java.net.SocketAddress; +import java.time.Duration; + +/** + * {@link WebSocketClientMetricsRecorder} that delegates to a {@link HttpClientMetricsRecorder}. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +final class DefaultWebSocketClientMetricsRecorder implements WebSocketClientMetricsRecorder { + + final HttpClientMetricsRecorder recorder; + + DefaultWebSocketClientMetricsRecorder(HttpClientMetricsRecorder recorder) { + this.recorder = recorder; + } + + @Override + public void recordWebSocketHandshakeTime(SocketAddress remoteAddress, String uri, String status, Duration time) { + } + + @Override + public void recordWebSocketConnectionDuration(SocketAddress remoteAddress, String uri, Duration time) { + } + + @Override + public void recordDataReceivedTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time) { + recorder.recordDataReceivedTime(remoteAddress, uri, method, status, time); + } + + @Override + public void recordDataReceivedTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + recorder.recordDataReceivedTime(remoteAddress, proxyAddress, uri, method, status, time); + } + + @Override + public void recordDataSentTime(SocketAddress remoteAddress, String uri, String method, Duration time) { + recorder.recordDataSentTime(remoteAddress, uri, method, time); + } + + @Override + public void recordDataSentTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, Duration time) { + recorder.recordDataSentTime(remoteAddress, proxyAddress, uri, method, time); + } + + @Override + public void recordResponseTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time) { + recorder.recordResponseTime(remoteAddress, uri, method, status, time); + } + + @Override + public void recordResponseTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + recorder.recordResponseTime(remoteAddress, proxyAddress, uri, method, status, time); + } + + @Override + public void incrementErrorsCount(SocketAddress remoteAddress, String uri) { + recorder.incrementErrorsCount(remoteAddress, uri); + } + + @Override + public void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri) { + recorder.incrementErrorsCount(remoteAddress, proxyAddress, uri); + } + + @Override + public void recordDataReceived(SocketAddress remoteAddress, String uri, long bytes) { + recorder.recordDataReceived(remoteAddress, uri, bytes); + } + + @Override + public void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + recorder.recordDataReceived(remoteAddress, proxyAddress, uri, bytes); + } + + @Override + public void recordDataSent(SocketAddress remoteAddress, String uri, long bytes) { + recorder.recordDataSent(remoteAddress, uri, bytes); + } + + @Override + public void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + recorder.recordDataSent(remoteAddress, proxyAddress, uri, bytes); + } + + @Override + public void incrementErrorsCount(SocketAddress remoteAddress) { + recorder.incrementErrorsCount(remoteAddress); + } + + @Override + public void recordConnectTime(SocketAddress remoteAddress, Duration time, String status) { + recorder.recordConnectTime(remoteAddress, time, status); + } + + @Override + public void recordDataReceived(SocketAddress remoteAddress, long bytes) { + recorder.recordDataReceived(remoteAddress, bytes); + } + + @Override + public void recordDataSent(SocketAddress remoteAddress, long bytes) { + recorder.recordDataSent(remoteAddress, bytes); + } + + @Override + public void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status) { + recorder.recordTlsHandshakeTime(remoteAddress, time, status); + } + + @Override + public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time, String status) { + recorder.recordResolveAddressTime(remoteAddress, time, status); + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2WebsocketClientOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2WebsocketClientOperations.java index f76e5a2e3a..f4f70d8d21 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2WebsocketClientOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2WebsocketClientOperations.java @@ -120,11 +120,18 @@ else if (msg instanceof HttpResponse) { } handshakerHttp2.finishHandshake(channel(), response); + if (micrometerWsHandler != null) { + micrometerWsHandler.recordHandshakeComplete(channel(), + String.valueOf(response.status().code())); + } // This change is needed after the Netty change https://github.com/netty/netty/pull/11966 ctx.read(); listener().onStateChange(this, HttpClientState.RESPONSE_RECEIVED); } catch (Exception e) { + if (micrometerWsHandler != null) { + micrometerWsHandler.recordHandshakeFailure(channel()); + } onInboundError(e); //"FutureReturnValueIgnored" this is deliberate ctx.close(); @@ -156,7 +163,7 @@ void initHandshaker(URI currentURI, WebsocketClientSpec websocketClientSpec) { "Websocket version [" + websocketClientSpec.version().toHttpHeaderValue() + "] is not supported."); } - removeHandler(NettyPipeline.HttpMetricsHandler); + swapMetricsHandler(); if (websocketClientSpec.compress()) { requestHeaders().remove(HttpHeaderNames.ACCEPT_ENCODING); @@ -189,6 +196,11 @@ void initHandshaker(URI currentURI, WebsocketClientSpec websocketClientSpec) { }); } + @Override + String wsHttpMethod() { + return "CONNECT"; + } + @Override boolean isHandshakeComplete() { return handshakerHttp2.handshakeComplete; diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerWebSocketClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerWebSocketClientMetricsHandler.java new file mode 100644 index 0000000000..968a9d09c5 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerWebSocketClientMetricsHandler.java @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import io.micrometer.common.KeyValues; +import io.micrometer.core.instrument.Timer; +import io.micrometer.observation.Observation; +import io.micrometer.observation.transport.RequestReplySenderContext; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import org.jspecify.annotations.Nullable; +import reactor.netty.observability.ReactorNettyHandlerContext; +import reactor.util.context.ContextView; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.function.Supplier; + +import static reactor.netty.Metrics.HANDSHAKE_TIME; +import static reactor.netty.Metrics.NA; +import static reactor.netty.Metrics.OBSERVATION_REGISTRY; +import static reactor.netty.Metrics.UNKNOWN; +import static reactor.netty.Metrics.formatSocketAddress; +import static reactor.netty.Metrics.updateChannelContext; +import static reactor.netty.ReactorNetty.setChannelContext; +import static reactor.netty.http.client.WebSocketClientObservations.HandshakeTimeHighCardinalityTags.HTTP_STATUS_CODE; +import static reactor.netty.http.client.WebSocketClientObservations.HandshakeTimeHighCardinalityTags.HTTP_URL; +import static reactor.netty.http.client.WebSocketClientObservations.HandshakeTimeHighCardinalityTags.NET_PEER_NAME; +import static reactor.netty.http.client.WebSocketClientObservations.HandshakeTimeHighCardinalityTags.NET_PEER_PORT; +import static reactor.netty.http.client.WebSocketClientObservations.HandshakeTimeHighCardinalityTags.REACTOR_NETTY_TYPE; +import static reactor.netty.http.client.WebSocketClientObservations.HandshakeTimeLowCardinalityTags.PROXY_ADDRESS; +import static reactor.netty.http.client.WebSocketClientObservations.HandshakeTimeLowCardinalityTags.REMOTE_ADDRESS; +import static reactor.netty.http.client.WebSocketClientObservations.HandshakeTimeLowCardinalityTags.STATUS; +import static reactor.netty.http.client.WebSocketClientObservations.HandshakeTimeLowCardinalityTags.URI; + +/** + * {@link AbstractWebSocketClientMetricsHandler} for Reactor Netty built-in integration with Micrometer. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +final class MicrometerWebSocketClientMetricsHandler extends AbstractWebSocketClientMetricsHandler { + final MicrometerWebSocketClientMetricsRecorder recorder; + + @SuppressWarnings("NullAway") + // Deliberately suppress "NullAway" + // This is a lazy initialization + HandshakeTimeHandlerContext handshakeTimeHandlerContext; + @SuppressWarnings("NullAway") + // Deliberately suppress "NullAway" + // This is a lazy initialization + Observation handshakeTimeObservation; + @Nullable ContextView parentContextView; + + MicrometerWebSocketClientMetricsHandler(MicrometerWebSocketClientMetricsRecorder recorder, + SocketAddress remoteAddress, + @Nullable SocketAddress proxyAddress, + String path, + ContextView contextView, + String method) { + super(remoteAddress, proxyAddress, path, contextView, method); + this.recorder = recorder; + } + + @Override + protected WebSocketClientMetricsRecorder recorder() { + return recorder; + } + + @Override + void startHandshake(Channel channel) { + super.startHandshake(channel); + handshakeTimeHandlerContext = new HandshakeTimeHandlerContext(recorder, path, remoteAddress, proxyAddress); + handshakeTimeObservation = Observation.createNotStarted( + recorder.name() + HANDSHAKE_TIME, handshakeTimeHandlerContext, OBSERVATION_REGISTRY); + parentContextView = updateChannelContext(channel, handshakeTimeObservation); + handshakeTimeObservation.start(); + } + + @Override + void recordHandshakeComplete(Channel channel, String status) { + if (handshakeTimeHandlerContext != null) { + handshakeTimeHandlerContext.status = status; + } + // Cannot invoke the recorder anymore: + // 1. The recorder is one instance only, it is invoked for all requests that can happen + // 2. The recorder does not have knowledge about request lifecycle + // + // Move the implementation from the recorder here + if (handshakeTimeObservation != null) { + handshakeTimeObservation.stop(); + } + setChannelContext(channel, parentContextView); + } + + @Override + void recordHandshakeFailure(Channel channel) { + if (handshakeTimeHandlerContext != null) { + handshakeTimeHandlerContext.status = "ERROR"; + } + if (handshakeTimeObservation != null) { + handshakeTimeObservation.stop(); + } + setChannelContext(channel, parentContextView); + } + + /* + * Requirements for WebSocket clients + * Following OpenTelemetry semantic conventions for HTTP clients, adapted for WebSocket. + */ + static final class HandshakeTimeHandlerContext extends RequestReplySenderContext + implements ReactorNettyHandlerContext, Supplier { + static final String TYPE = "client"; + + final String netPeerName; + final String netPeerPort; + final String path; + final @Nullable String proxyAddress; + final MicrometerWebSocketClientMetricsRecorder recorder; + + String status = UNKNOWN; + + HandshakeTimeHandlerContext(MicrometerWebSocketClientMetricsRecorder recorder, String path, + SocketAddress remoteAddress, @Nullable SocketAddress proxyAddress) { + super((carrier, key, value) -> {}); + this.recorder = recorder; + if (remoteAddress instanceof InetSocketAddress) { + InetSocketAddress address = (InetSocketAddress) remoteAddress; + this.netPeerName = address.getHostString(); + this.netPeerPort = address.getPort() + ""; + } + else { + this.netPeerName = remoteAddress.toString(); + this.netPeerPort = ""; + } + this.path = path; + this.proxyAddress = proxyAddress != null ? formatSocketAddress(proxyAddress) : null; + setContextualName("websocket " + path); + } + + @Override + public Observation.Context get() { + return this; + } + + @Override + public @Nullable Timer getTimer() { + return recorder.getHandshakeTimeTimer(recorder.name() + HANDSHAKE_TIME, + netPeerName + ":" + netPeerPort, proxyAddress == null ? NA : proxyAddress, path, status); + } + + @Override + public KeyValues getHighCardinalityKeyValues() { + return KeyValues.of(REACTOR_NETTY_TYPE.asString(), TYPE, + HTTP_URL.asString(), path, HTTP_STATUS_CODE.asString(), status, + NET_PEER_NAME.asString(), netPeerName, NET_PEER_PORT.asString(), netPeerPort); + } + + @Override + public KeyValues getLowCardinalityKeyValues() { + return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ":" + netPeerPort, + PROXY_ADDRESS.asString(), proxyAddress == null ? NA : proxyAddress, + STATUS.asString(), status, URI.asString(), path); + } + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerWebSocketClientMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerWebSocketClientMetricsRecorder.java new file mode 100644 index 0000000000..7d05a3937a --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerWebSocketClientMetricsRecorder.java @@ -0,0 +1,252 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; +import org.jspecify.annotations.Nullable; +import reactor.netty.channel.ChannelMeters; +import reactor.netty.channel.MeterKey; +import reactor.netty.http.MicrometerHttpMetricsRecorder; +import reactor.netty.internal.util.MapUtils; + +import java.net.SocketAddress; +import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static reactor.netty.Metrics.CONNECTION_DURATION; +import static reactor.netty.Metrics.DATA_RECEIVED; +import static reactor.netty.Metrics.DATA_RECEIVED_TIME; +import static reactor.netty.Metrics.DATA_SENT; +import static reactor.netty.Metrics.DATA_SENT_TIME; +import static reactor.netty.Metrics.ERRORS; +import static reactor.netty.Metrics.HANDSHAKE_TIME; +import static reactor.netty.Metrics.NA; +import static reactor.netty.Metrics.REGISTRY; +import static reactor.netty.Metrics.WEBSOCKET_CLIENT_PREFIX; +import static reactor.netty.Metrics.formatSocketAddress; + +/** + * {@link WebSocketClientMetricsRecorder} for Reactor Netty built-in integration with Micrometer. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +final class MicrometerWebSocketClientMetricsRecorder extends MicrometerHttpMetricsRecorder implements WebSocketClientMetricsRecorder { + + static final MicrometerWebSocketClientMetricsRecorder INSTANCE = new MicrometerWebSocketClientMetricsRecorder(); + + private final ConcurrentMap handshakeTimeCache = new ConcurrentHashMap<>(); + + private final ConcurrentMap dataReceivedCache = new ConcurrentHashMap<>(); + + private final ConcurrentMap dataSentCache = new ConcurrentHashMap<>(); + + private final ConcurrentMap connectionDurationCache = new ConcurrentHashMap<>(); + + private final ConcurrentMap errorsCache = new ConcurrentHashMap<>(); + + private MicrometerWebSocketClientMetricsRecorder() { + super(WEBSOCKET_CLIENT_PREFIX, "websocket", false); + } + + @Override + public void recordWebSocketHandshakeTime(SocketAddress remoteAddress, String uri, String status, Duration time) { + recordWebSocketHandshakeTime(remoteAddress, NA, uri, status, time); + } + + @Override + public void recordWebSocketHandshakeTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, + String status, Duration time) { + recordWebSocketHandshakeTime(remoteAddress, formatSocketAddress(proxyAddress), uri, status, time); + } + + void recordWebSocketHandshakeTime(SocketAddress remoteAddress, String proxyAddress, String uri, String status, Duration time) { + String address = formatSocketAddress(remoteAddress); + Timer handshakeTime = getHandshakeTimeTimer(name() + HANDSHAKE_TIME, address, proxyAddress, uri, status); + if (handshakeTime != null) { + handshakeTime.record(time); + } + } + + @Nullable Timer getHandshakeTimeTimer(String name, String remoteAddress, String proxyAddress, String uri, String status) { + MeterKey meterKey = new MeterKey(uri, remoteAddress, proxyAddress, null, status); + return MapUtils.computeIfAbsent(handshakeTimeCache, meterKey, + key -> filter(Timer.builder(name) + .tags(WebSocketClientMeters.HandshakeTimeTags.REMOTE_ADDRESS.asString(), remoteAddress, + WebSocketClientMeters.HandshakeTimeTags.PROXY_ADDRESS.asString(), proxyAddress, + WebSocketClientMeters.HandshakeTimeTags.URI.asString(), uri, + WebSocketClientMeters.HandshakeTimeTags.STATUS.asString(), status) + .register(REGISTRY))); + } + + @Override + public void recordWebSocketConnectionDuration(SocketAddress remoteAddress, String uri, Duration time) { + recordWebSocketConnectionDuration(remoteAddress, NA, uri, time); + } + + @Override + public void recordWebSocketConnectionDuration(SocketAddress remoteAddress, SocketAddress proxyAddress, + String uri, Duration time) { + recordWebSocketConnectionDuration(remoteAddress, formatSocketAddress(proxyAddress), uri, time); + } + + void recordWebSocketConnectionDuration(SocketAddress remoteAddress, String proxyAddress, String uri, Duration time) { + String address = formatSocketAddress(remoteAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddress, null, null); + Timer connectionDuration = MapUtils.computeIfAbsent(connectionDurationCache, meterKey, + key -> filter(Timer.builder(name() + CONNECTION_DURATION) + .tags(WebSocketClientMeters.ConnectionDurationTags.REMOTE_ADDRESS.asString(), address, + WebSocketClientMeters.ConnectionDurationTags.PROXY_ADDRESS.asString(), proxyAddress, + WebSocketClientMeters.ConnectionDurationTags.URI.asString(), uri) + .register(REGISTRY))); + if (connectionDuration != null) { + connectionDuration.record(time); + } + } + + @Override + public void recordDataReceivedTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time) { + recordDataReceivedTime(remoteAddress, NA, uri, time); + } + + @Override + public void recordDataReceivedTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + recordDataReceivedTime(remoteAddress, formatSocketAddress(proxyAddress), uri, time); + } + + void recordDataReceivedTime(SocketAddress remoteAddress, String proxyAddress, String uri, Duration time) { + String address = formatSocketAddress(remoteAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddress, null, null); + Timer dataReceivedTime = MapUtils.computeIfAbsent(dataReceivedTimeCache, meterKey, + key -> filter(Timer.builder(name() + DATA_RECEIVED_TIME) + .tags(WebSocketClientMeters.DataReceivedTimeTags.REMOTE_ADDRESS.asString(), address, + WebSocketClientMeters.DataReceivedTimeTags.PROXY_ADDRESS.asString(), proxyAddress, + WebSocketClientMeters.DataReceivedTimeTags.URI.asString(), uri) + .register(REGISTRY))); + if (dataReceivedTime != null) { + dataReceivedTime.record(time); + } + } + + @Override + public void recordDataSentTime(SocketAddress remoteAddress, String uri, String method, Duration time) { + doRecordDataSentTime(remoteAddress, NA, uri, time); + } + + @Override + public void recordDataSentTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, Duration time) { + doRecordDataSentTime(remoteAddress, formatSocketAddress(proxyAddress), uri, time); + } + + void doRecordDataSentTime(SocketAddress remoteAddress, String proxyAddress, String uri, Duration time) { + String address = formatSocketAddress(remoteAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddress, null, null); + Timer dataSentTime = MapUtils.computeIfAbsent(dataSentTimeCache, meterKey, + key -> filter(Timer.builder(name() + DATA_SENT_TIME) + .tags(WebSocketClientMeters.DataSentTimeTags.REMOTE_ADDRESS.asString(), address, + WebSocketClientMeters.DataSentTimeTags.PROXY_ADDRESS.asString(), proxyAddress, + WebSocketClientMeters.DataSentTimeTags.URI.asString(), uri) + .register(REGISTRY))); + if (dataSentTime != null) { + dataSentTime.record(time); + } + } + + @Override + public void recordResponseTime(SocketAddress remoteAddress, String uri, String method, String status, Duration time) { + } + + @Override + public void recordResponseTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, String method, String status, Duration time) { + } + + @Override + public void recordDataReceived(SocketAddress remoteAddress, String uri, long bytes) { + recordDataReceived(remoteAddress, NA, uri, bytes); + } + + @Override + public void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + recordDataReceived(remoteAddress, formatSocketAddress(proxyAddress), uri, bytes); + } + + void recordDataReceived(SocketAddress remoteAddress, String proxyAddress, String uri, long bytes) { + String address = formatSocketAddress(remoteAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddress, null, null); + DistributionSummary dataReceived = MapUtils.computeIfAbsent(dataReceivedCache, meterKey, + key -> filter(DistributionSummary.builder(name() + DATA_RECEIVED) + .baseUnit(ChannelMeters.DATA_RECEIVED.getBaseUnit()) + .tags(ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address, + ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddress, + ChannelMeters.ChannelMetersTags.URI.asString(), uri) + .register(REGISTRY))); + if (dataReceived != null) { + dataReceived.record(bytes); + } + } + + @Override + public void recordDataSent(SocketAddress remoteAddress, String uri, long bytes) { + recordDataSent(remoteAddress, NA, uri, bytes); + } + + @Override + public void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, long bytes) { + recordDataSent(remoteAddress, formatSocketAddress(proxyAddress), uri, bytes); + } + + void recordDataSent(SocketAddress remoteAddress, String proxyAddress, String uri, long bytes) { + String address = formatSocketAddress(remoteAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddress, null, null); + DistributionSummary dataSent = MapUtils.computeIfAbsent(dataSentCache, meterKey, + key -> filter(DistributionSummary.builder(name() + DATA_SENT) + .baseUnit(ChannelMeters.DATA_SENT.getBaseUnit()) + .tags(ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address, + ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddress, + ChannelMeters.ChannelMetersTags.URI.asString(), uri) + .register(REGISTRY))); + if (dataSent != null) { + dataSent.record(bytes); + } + } + + @Override + public void incrementErrorsCount(SocketAddress remoteAddress, String uri) { + incrementErrorsCount(remoteAddress, NA, uri); + } + + @Override + public void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri) { + incrementErrorsCount(remoteAddress, formatSocketAddress(proxyAddress), uri); + } + + void incrementErrorsCount(SocketAddress remoteAddress, String proxyAddress, String uri) { + String address = formatSocketAddress(remoteAddress); + MeterKey meterKey = new MeterKey(uri, address, proxyAddress, null, null); + Counter errors = MapUtils.computeIfAbsent(errorsCache, meterKey, + key -> filter(Counter.builder(name() + ERRORS) + .tags(ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address, + ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddress, + ChannelMeters.ChannelMetersTags.URI.asString(), uri) + .register(REGISTRY))); + if (errors != null) { + errors.increment(); + } + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientMeters.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientMeters.java new file mode 100644 index 0000000000..3c6085a7d2 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientMeters.java @@ -0,0 +1,251 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.docs.MeterDocumentation; + +/** + * WebSocket {@link HttpClient} meters. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +enum WebSocketClientMeters implements MeterDocumentation { + + /** + * Time spent for the WebSocket handshake on the client. + */ + WEBSOCKET_CLIENT_HANDSHAKE_TIME { + @Override + public String getName() { + return "reactor.netty.websocket.client.handshake.time"; + } + + @Override + public KeyName[] getKeyNames() { + return HandshakeTimeTags.values(); + } + + @Override + public Meter.Type getType() { + return Meter.Type.TIMER; + } + }, + + /** + * Time spent in consuming incoming data on the WebSocket client. + */ + WEBSOCKET_CLIENT_DATA_RECEIVED_TIME { + @Override + public String getName() { + return "reactor.netty.websocket.client.data.received.time"; + } + + @Override + public KeyName[] getKeyNames() { + return DataReceivedTimeTags.values(); + } + + @Override + public Meter.Type getType() { + return Meter.Type.TIMER; + } + }, + + /** + * Time spent in sending outgoing data from the WebSocket client. + */ + WEBSOCKET_CLIENT_DATA_SENT_TIME { + @Override + public String getName() { + return "reactor.netty.websocket.client.data.sent.time"; + } + + @Override + public KeyName[] getKeyNames() { + return DataSentTimeTags.values(); + } + + @Override + public Meter.Type getType() { + return Meter.Type.TIMER; + } + }, + + /** + * Duration of the WebSocket connection on the client. + */ + WEBSOCKET_CLIENT_CONNECTION_DURATION { + @Override + public String getName() { + return "reactor.netty.websocket.client.connection.duration"; + } + + @Override + public KeyName[] getKeyNames() { + return ConnectionDurationTags.values(); + } + + @Override + public Meter.Type getType() { + return Meter.Type.TIMER; + } + }; + + enum HandshakeTimeTags implements KeyName { + + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + + /** + * Remote address. + */ + REMOTE_ADDRESS { + @Override + public String asString() { + return "remote.address"; + } + }, + + /** + * STATUS. + */ + STATUS { + @Override + public String asString() { + return "status"; + } + }, + + /** + * URI. + */ + URI { + @Override + public String asString() { + return "uri"; + } + } + } + + enum DataReceivedTimeTags implements KeyName { + + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + + /** + * Remote address. + */ + REMOTE_ADDRESS { + @Override + public String asString() { + return "remote.address"; + } + }, + + /** + * URI. + */ + URI { + @Override + public String asString() { + return "uri"; + } + } + } + + enum DataSentTimeTags implements KeyName { + + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + + /** + * Remote address. + */ + REMOTE_ADDRESS { + @Override + public String asString() { + return "remote.address"; + } + }, + + /** + * URI. + */ + URI { + @Override + public String asString() { + return "uri"; + } + } + } + + enum ConnectionDurationTags implements KeyName { + + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + + /** + * Remote address. + */ + REMOTE_ADDRESS { + @Override + public String asString() { + return "remote.address"; + } + }, + + /** + * URI. + */ + URI { + @Override + public String asString() { + return "uri"; + } + } + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientMetricsHandler.java new file mode 100644 index 0000000000..3a88955a7f --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientMetricsHandler.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import org.jspecify.annotations.Nullable; +import reactor.util.context.ContextView; + +import java.net.SocketAddress; + +/** + * {@link AbstractWebSocketClientMetricsHandler} for collecting metrics on WebSocket {@link HttpClient} level. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +final class WebSocketClientMetricsHandler extends AbstractWebSocketClientMetricsHandler { + + final WebSocketClientMetricsRecorder recorder; + + WebSocketClientMetricsHandler(WebSocketClientMetricsRecorder recorder, + SocketAddress remoteAddress, + @Nullable SocketAddress proxyAddress, + String path, + ContextView contextView, + String method) { + super(remoteAddress, proxyAddress, path, contextView, method); + this.recorder = recorder; + } + + @Override + protected WebSocketClientMetricsRecorder recorder() { + return recorder; + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientMetricsRecorder.java new file mode 100644 index 0000000000..f9564b14b0 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientMetricsRecorder.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import java.net.SocketAddress; +import java.time.Duration; + +/** + * Interface for collecting metrics on WebSocket client level. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +public interface WebSocketClientMetricsRecorder extends HttpClientMetricsRecorder { + + /** + * Records the time that is spent for the WebSocket handshake. + * + * @param remoteAddress The remote peer + * @param uri the requested URI + * @param status the WebSocket handshake status + * @param time the time that is spent for the handshake + */ + void recordWebSocketHandshakeTime(SocketAddress remoteAddress, String uri, String status, Duration time); + + /** + * Records the time that is spent for the WebSocket handshake. + * + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri the requested URI + * @param status the WebSocket handshake status + * @param time the time that is spent for the handshake + */ + default void recordWebSocketHandshakeTime(SocketAddress remoteAddress, SocketAddress proxyAddress, String uri, + String status, Duration time) { + recordWebSocketHandshakeTime(remoteAddress, uri, status, time); + } + + /** + * Records the duration of the WebSocket connection. + * + * @param remoteAddress The remote peer + * @param uri the requested URI + * @param time the duration of the connection + */ + void recordWebSocketConnectionDuration(SocketAddress remoteAddress, String uri, Duration time); + + /** + * Records the duration of the WebSocket connection. + * + * @param remoteAddress The remote peer + * @param proxyAddress the proxy address + * @param uri the requested URI + * @param time the duration of the connection + */ + default void recordWebSocketConnectionDuration(SocketAddress remoteAddress, SocketAddress proxyAddress, + String uri, Duration time) { + recordWebSocketConnectionDuration(remoteAddress, uri, time); + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientObservations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientObservations.java new file mode 100644 index 0000000000..6904f2ebfd --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebSocketClientObservations.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.docs.ObservationDocumentation; + +/** + * WebSocket {@link HttpClient} observations. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +enum WebSocketClientObservations implements ObservationDocumentation { + + /** + * WebSocket handshake metric. + */ + WEBSOCKET_CLIENT_HANDSHAKE_TIME { + @Override + public KeyName[] getHighCardinalityKeyNames() { + return HandshakeTimeHighCardinalityTags.values(); + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return HandshakeTimeLowCardinalityTags.values(); + } + + @Override + public String getName() { + return "reactor.netty.websocket.client.handshake.time"; + } + }; + + /** + * Handshake Time High Cardinality Tags. + */ + enum HandshakeTimeHighCardinalityTags implements KeyName { + + /** + * Status code. + */ + HTTP_STATUS_CODE { + @Override + public String asString() { + return "http.status_code"; + } + }, + + /** + * URL. + */ + HTTP_URL { + @Override + public String asString() { + return "http.url"; + } + }, + + /** + * Net peer name. + */ + NET_PEER_NAME { + @Override + public String asString() { + return "net.peer.name"; + } + }, + + /** + * Net peer port. + */ + NET_PEER_PORT { + @Override + public String asString() { + return "net.peer.port"; + } + }, + + /** + * Reactor Netty type (always client). + */ + REACTOR_NETTY_TYPE { + @Override + public String asString() { + return "reactor.netty.type"; + } + } + } + + /** + * Handshake Time Low Cardinality Tags. + */ + enum HandshakeTimeLowCardinalityTags implements KeyName { + + /** + * Proxy address, when there is a proxy configured. + */ + PROXY_ADDRESS { + @Override + public String asString() { + return "proxy.address"; + } + }, + + /** + * Remote address. + */ + REMOTE_ADDRESS { + @Override + public String asString() { + return "remote.address"; + } + }, + + /** + * STATUS. + */ + STATUS { + @Override + public String asString() { + return "status"; + } + }, + + /** + * URI. + */ + URI { + @Override + public String asString() { + return "uri"; + } + } + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java index 756e079ee2..74a08b6591 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2026 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.compression.ZlibCodecFactory; import io.netty.handler.codec.http.FullHttpResponse; @@ -49,6 +50,7 @@ import reactor.netty.http.HttpOperations; import reactor.netty.http.websocket.WebsocketInbound; import reactor.netty.http.websocket.WebsocketOutbound; +import reactor.util.context.ContextView; import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT; import static io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker.MAX_WINDOW_SIZE; @@ -60,6 +62,7 @@ * @author Stephane Maldini * @author Simon Baslé * @author raccoonback + * @author LivingLikeKrillin */ class WebsocketClientOperations extends HttpClientOperations implements WebsocketInbound, WebsocketOutbound { @@ -68,6 +71,8 @@ class WebsocketClientOperations extends HttpClientOperations final Sinks.One onCloseState; final boolean proxyPing; + @Nullable AbstractWebSocketClientMetricsHandler micrometerWsHandler; + volatile int closeSent; static final String INBOUND_CANCEL_LOG = "WebSocket client inbound receiver cancelled, closing Websocket."; @@ -85,7 +90,7 @@ void initHandshaker(URI currentURI, WebsocketClientSpec websocketClientSpec) { // Returned value is deliberately ignored addHandlerFirst(NettyPipeline.HttpAggregator, new HttpObjectAggregator(8192)); - removeHandler(NettyPipeline.HttpMetricsHandler); + swapMetricsHandler(); if (websocketClientSpec.compress()) { requestHeaders().remove(HttpHeaderNames.ACCEPT_ENCODING); @@ -148,11 +153,18 @@ public void onInboundNext(ChannelHandlerContext ctx, Object msg) { if (notRedirected(response) && authenticationNotRequired()) { try { handshakerHttp11.finishHandshake(channel(), response); + if (micrometerWsHandler != null) { + micrometerWsHandler.recordHandshakeComplete(channel(), + String.valueOf(response.status().code())); + } // This change is needed after the Netty change https://github.com/netty/netty/pull/11966 ctx.read(); listener().onStateChange(this, HttpClientState.RESPONSE_RECEIVED); } catch (Exception e) { + if (micrometerWsHandler != null) { + micrometerWsHandler.recordHandshakeFailure(channel()); + } onInboundError(e); //"FutureReturnValueIgnored" this is deliberate ctx.close(); @@ -224,6 +236,75 @@ boolean isHandshakeComplete() { return handshakerHttp11.isHandshakeComplete(); } + void swapMetricsHandler() { + Channel channel = channel(); + ChannelHandler existingHandler = channel.pipeline().get(NettyPipeline.HttpMetricsHandler); + if (existingHandler instanceof AbstractHttpClientMetricsHandler) { + AbstractHttpClientMetricsHandler httpHandler = (AbstractHttpClientMetricsHandler) existingHandler; + channel.pipeline().remove(NettyPipeline.HttpMetricsHandler); + + String rawPath = resolvePath(this); + String resolvedPath = httpHandler.uriTagValue != null ? httpHandler.uriTagValue.apply(rawPath) : rawPath; + ContextView ctxView = currentContext(); + String httpMethod = wsHttpMethod(); + + AbstractWebSocketClientMetricsHandler wsHandler; + if (httpHandler instanceof MicrometerHttpClientMetricsHandler) { + wsHandler = new MicrometerWebSocketClientMetricsHandler( + MicrometerWebSocketClientMetricsRecorder.INSTANCE, + httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); + } + else if (httpHandler instanceof ContextAwareHttpClientMetricsHandler) { + ContextAwareHttpClientMetricsHandler ctxHandler = (ContextAwareHttpClientMetricsHandler) httpHandler; + ContextAwareWebSocketClientMetricsRecorder wsRecorder; + if (ctxHandler.recorder instanceof ContextAwareWebSocketClientMetricsRecorder) { + wsRecorder = (ContextAwareWebSocketClientMetricsRecorder) ctxHandler.recorder; + } + else { + wsRecorder = new DefaultContextAwareWebSocketClientMetricsRecorder(ctxHandler.recorder); + } + wsHandler = new ContextAwareWebSocketClientMetricsHandler( + wsRecorder, + httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); + } + else if (httpHandler instanceof HttpClientMetricsHandler) { + HttpClientMetricsHandler plainHandler = (HttpClientMetricsHandler) httpHandler; + WebSocketClientMetricsRecorder wsRecorder; + if (plainHandler.recorder instanceof WebSocketClientMetricsRecorder) { + wsRecorder = (WebSocketClientMetricsRecorder) plainHandler.recorder; + } + else { + wsRecorder = new DefaultWebSocketClientMetricsRecorder(plainHandler.recorder); + } + wsHandler = new WebSocketClientMetricsHandler( + wsRecorder, + httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); + } + else { + return; + } + wsHandler.startHandshake(channel); + this.micrometerWsHandler = wsHandler; + channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.WsMetricsHandler, wsHandler); + } + else { + removeHandler(NettyPipeline.HttpMetricsHandler); + } + } + + String wsHttpMethod() { + return "GET"; + } + + static String resolvePath(HttpClientOperations ops) { + try { + return ops.fullPath(); + } + catch (Exception e) { + return "/bad-request"; + } + } + @Override protected void onOutboundComplete() { } diff --git a/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json b/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json index adb31f04e1..6e0fd198d1 100644 --- a/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json +++ b/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json @@ -6,6 +6,13 @@ "name": "reactor.netty.http.client.AbstractHttpClientMetricsHandler", "queryAllPublicMethods": true }, + { + "condition": { + "typeReachable": "reactor.netty.http.client.AbstractWebSocketClientMetricsHandler" + }, + "name": "reactor.netty.http.client.AbstractWebSocketClientMetricsHandler", + "queryAllPublicMethods": true + }, { "condition": { "typeReachable": "reactor.netty.http.client.ContextAwareHttpClientMetricsHandler" @@ -13,6 +20,13 @@ "name": "reactor.netty.http.client.ContextAwareHttpClientMetricsHandler", "queryAllPublicMethods": true }, + { + "condition": { + "typeReachable": "reactor.netty.http.client.ContextAwareWebSocketClientMetricsHandler" + }, + "name": "reactor.netty.http.client.ContextAwareWebSocketClientMetricsHandler", + "queryAllPublicMethods": true + }, { "condition": { "typeReachable": "reactor.netty.http.client.Http2StreamBridgeClientHandler" @@ -132,6 +146,20 @@ "name": "reactor.netty.http.client.MicrometerHttpClientMetricsHandler", "queryAllPublicMethods": true }, + { + "condition": { + "typeReachable": "reactor.netty.http.client.MicrometerWebSocketClientMetricsHandler" + }, + "name": "reactor.netty.http.client.MicrometerWebSocketClientMetricsHandler", + "queryAllPublicMethods": true + }, + { + "condition": { + "typeReachable": "reactor.netty.http.client.WebSocketClientMetricsHandler" + }, + "name": "reactor.netty.http.client.WebSocketClientMetricsHandler", + "queryAllPublicMethods": true + }, { "condition": { "typeReachable": "reactor.netty.http.server.AbstractHttpServerMetricsHandler" diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/WebSocketClientMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/WebSocketClientMetricsHandlerTests.java new file mode 100644 index 0000000000..82f1f7136a --- /dev/null +++ b/reactor-netty-http/src/test/java/reactor/netty/http/WebSocketClientMetricsHandlerTests.java @@ -0,0 +1,495 @@ +/* + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.pkitesting.CertificateBuilder; +import io.netty.pkitesting.X509Bundle; +import io.netty.util.CharsetUtil; +import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Named; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.BaseHttpTest; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.server.HttpServer; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.tcp.SslProvider.ProtocolSslContextSpec; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + +import static reactor.netty.Metrics.CONNECTION_DURATION; +import static reactor.netty.Metrics.DATA_RECEIVED; +import static reactor.netty.Metrics.DATA_RECEIVED_TIME; +import static reactor.netty.Metrics.DATA_SENT; +import static reactor.netty.Metrics.DATA_SENT_TIME; +import static reactor.netty.Metrics.HANDSHAKE_TIME; +import static reactor.netty.Metrics.NA; +import static reactor.netty.Metrics.PROXY_ADDRESS; +import static reactor.netty.Metrics.REMOTE_ADDRESS; +import static reactor.netty.Metrics.STATUS; +import static reactor.netty.Metrics.URI; +import static reactor.netty.Metrics.WEBSOCKET_CLIENT_PREFIX; +import static reactor.netty.Metrics.formatSocketAddress; +import static reactor.netty.micrometer.DistributionSummaryAssert.assertDistributionSummary; +import static reactor.netty.micrometer.TimerAssert.assertTimer; + +/** + * WebSocket client metrics tests. + * + * @author LivingLikeKrillin + * @since 1.3.5 + */ +class WebSocketClientMetricsHandlerTests extends BaseHttpTest { + + static final String WS_HANDSHAKE_TIME = WEBSOCKET_CLIENT_PREFIX + HANDSHAKE_TIME; + static final String WS_DATA_RECEIVED_TIME = WEBSOCKET_CLIENT_PREFIX + DATA_RECEIVED_TIME; + static final String WS_DATA_SENT_TIME = WEBSOCKET_CLIENT_PREFIX + DATA_SENT_TIME; + static final String WS_DATA_RECEIVED = WEBSOCKET_CLIENT_PREFIX + DATA_RECEIVED; + static final String WS_DATA_SENT = WEBSOCKET_CLIENT_PREFIX + DATA_SENT; + static final String WS_CONNECTION_DURATION = WEBSOCKET_CLIENT_PREFIX + CONNECTION_DURATION; + + HttpServer httpServer; + HttpClient httpClient; + private ConnectionProvider provider; + private MeterRegistry registry; + + static X509Bundle ssc; + static Http2SslContextSpec serverCtx2; + static Http2SslContextSpec clientCtx2; + + @BeforeAll + static void createSelfSignedCertificate() throws Exception { + ssc = new CertificateBuilder().subject("CN=localhost").setIsCertificateAuthority(true).buildSelfSigned(); + serverCtx2 = Http2SslContextSpec.forServer(ssc.toTempCertChainPem(), ssc.toTempPrivateKeyPem()) + .configure(builder -> builder.sslProvider(SslProvider.JDK)); + clientCtx2 = Http2SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE) + .sslProvider(SslProvider.JDK)); + } + + @BeforeEach + void setUp() { + httpServer = createServer() + .host("127.0.0.1") + .route(r -> r + .get("/ws", (req, res) -> res.sendWebsocket((in, out) -> + out.sendString(Mono.just("Hello World!")))) + .get("/ws-echo", (req, res) -> res.sendWebsocket((in, out) -> + out.send(in.receive().retain()))) + .get("/ws-fragment", (req, res) -> res.sendWebsocket((in, out) -> + out.sendObject(Flux.just( + new TextWebSocketFrame(false, 0, + Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8)), + new ContinuationWebSocketFrame(true, 0, + Unpooled.copiedBuffer(" world", CharsetUtil.UTF_8)))))) + .get("/ws-control-frames", (req, res) -> res.sendWebsocket((in, out) -> + out.sendObject(Flux.just( + new PingWebSocketFrame(Unpooled.copiedBuffer("ping", CharsetUtil.UTF_8)), + new TextWebSocketFrame("data"))))) + ); + + provider = ConnectionProvider.create("WebSocketClientMetricsHandlerTests", 1); + httpClient = createClient(provider, () -> disposableServer.address()) + .metrics(true, Function.identity()); + + registry = new SimpleMeterRegistry(); + Metrics.addRegistry(registry); + } + + @AfterEach + void tearDown() throws InterruptedException, ExecutionException, TimeoutException { + if (!provider.isDisposed()) { + provider.disposeLater() + .block(Duration.ofSeconds(30)); + } + + Metrics.removeRegistry(registry); + registry.clear(); + registry.close(); + + if (disposableServer != null) { + disposableServer.disposeNow(); + disposableServer = null; + } + } + + @Test + void testWebSocketHandshakeMetrics() { + disposableServer = httpServer.bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + httpClient.websocket() + .uri("/ws") + .handle((in, out) -> in.receive().aggregate().asString()) + .as(StepVerifier::create) + .expectNext("Hello World!") + .expectComplete() + .verify(Duration.ofSeconds(30)); + + // Verify handshake time timer + assertTimer(registry, WS_HANDSHAKE_TIME, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws", + STATUS, "101") + .hasCountEqualTo(1) + .hasTotalTimeGreaterThan(0); + } + + @Test + void testWebSocketDataReceivedMetrics() { + disposableServer = httpServer.bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + httpClient.websocket() + .uri("/ws") + .handle((in, out) -> in.receive().aggregate().asString()) + .as(StepVerifier::create) + .expectNext("Hello World!") + .expectComplete() + .verify(Duration.ofSeconds(30)); + + // Verify data received time timer + assertTimer(registry, WS_DATA_RECEIVED_TIME, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws") + .hasCountEqualTo(1) + .hasTotalTimeGreaterThan(0); + + // Verify data received distribution summary + assertDistributionSummary(registry, WS_DATA_RECEIVED, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws") + .hasCountGreaterThanOrEqualTo(1) + .hasTotalAmountGreaterThanOrEqualTo(1); + } + + @Test + void testWebSocketDataSentMetrics() { + disposableServer = httpServer.bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + httpClient.websocket() + .uri("/ws-echo") + .handle((in, out) -> + out.sendString(Mono.just("test message")) + .then() + .thenMany(in.receive().asString().take(1))) + .as(StepVerifier::create) + .expectNext("test message") + .expectComplete() + .verify(Duration.ofSeconds(30)); + + // Verify data sent time timer + assertTimer(registry, WS_DATA_SENT_TIME, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws-echo") + .hasCountEqualTo(1) + .hasTotalTimeGreaterThan(0); + + // Verify data sent distribution summary + assertDistributionSummary(registry, WS_DATA_SENT, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws-echo") + .hasCountGreaterThanOrEqualTo(1) + .hasTotalAmountGreaterThanOrEqualTo(1); + } + + @Test + void testWebSocketMultipleConnections() { + disposableServer = httpServer.bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + // Connection pool size 1 means sequential connections + for (int i = 0; i < 3; i++) { + httpClient.websocket() + .uri("/ws") + .handle((in, out) -> in.receive().aggregate().asString()) + .as(StepVerifier::create) + .expectNext("Hello World!") + .expectComplete() + .verify(Duration.ofSeconds(30)); + } + + // Verify handshake time timer count + assertTimer(registry, WS_HANDSHAKE_TIME, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws", + STATUS, "101") + .hasCountEqualTo(3) + .hasTotalTimeGreaterThan(0); + } + + @Test + void testWebSocketUriTagValueFunction() { + disposableServer = httpServer.bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + HttpClient client = createClient(provider, () -> disposableServer.address()) + .metrics(true, s -> "/normalized"); + + client.websocket() + .uri("/ws") + .handle((in, out) -> in.receive().aggregate().asString()) + .as(StepVerifier::create) + .expectNext("Hello World!") + .expectComplete() + .verify(Duration.ofSeconds(30)); + + // Verify URI tag uses the normalized value + assertTimer(registry, WS_HANDSHAKE_TIME, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/normalized", + STATUS, "101") + .hasCountEqualTo(1) + .hasTotalTimeGreaterThan(0); + } + + @Test + void testWebSocketConnectionDuration() { + disposableServer = httpServer.bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + httpClient.websocket() + .uri("/ws") + .handle((in, out) -> in.receive().aggregate().asString()) + .as(StepVerifier::create) + .expectNext("Hello World!") + .expectComplete() + .verify(Duration.ofSeconds(30)); + + provider.disposeLater() + .block(Duration.ofSeconds(30)); + + await().atMost(5, TimeUnit.SECONDS) + .pollInterval(50, TimeUnit.MILLISECONDS) + .untilAsserted(() -> + assertTimer(registry, WS_CONNECTION_DURATION, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws") + .hasCountEqualTo(1) + .hasTotalTimeGreaterThan(0)); + } + + @Test + void testWebSocketHandshakeFailure() { + disposableServer = createServer() + .host("127.0.0.1") + .route(r -> r.get("/not-ws", (req, res) -> res.sendString(Mono.just("not a websocket")))) + .bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + httpClient.websocket() + .uri("/not-ws") + .handle((in, out) -> in.receive().aggregate().asString()) + .as(StepVerifier::create) + .expectError() + .verify(Duration.ofSeconds(30)); + + assertTimer(registry, WS_HANDSHAKE_TIME, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/not-ws") + .hasCountEqualTo(1) + .hasTotalTimeGreaterThan(0); + } + + @Test + void testWebSocketFragmentedDataSentMetrics() { + disposableServer = httpServer.bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + byte[] part1 = "Hello".getBytes(CharsetUtil.UTF_8); + byte[] part2 = " ".getBytes(CharsetUtil.UTF_8); + byte[] part3 = "World!".getBytes(CharsetUtil.UTF_8); + long expectedTotal = part1.length + part2.length + part3.length; + + httpClient.websocket() + .uri("/ws-echo") + .handle((in, out) -> + out.sendObject(Flux.just( + new TextWebSocketFrame(false, 0, Unpooled.wrappedBuffer(part1)), + new ContinuationWebSocketFrame(false, 0, Unpooled.wrappedBuffer(part2)), + new ContinuationWebSocketFrame(true, 0, Unpooled.wrappedBuffer(part3))))) + .as(StepVerifier::create) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + // Fragmented frames should be recorded as a single message + await().atMost(5, TimeUnit.SECONDS) + .pollInterval(50, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertDistributionSummary(registry, WS_DATA_SENT, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws-echo") + .hasCountEqualTo(1) + .hasTotalAmountGreaterThanOrEqualTo(expectedTotal); + + assertTimer(registry, WS_DATA_SENT_TIME, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws-echo") + .hasCountEqualTo(1) + .hasTotalTimeGreaterThan(0); + }); + } + + @Test + void testWebSocketFragmentedDataReceivedMetrics() { + disposableServer = httpServer.bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + // Server sends "hello" + " world" as two fragments, then closes + long expectedTotal = "hello".getBytes(CharsetUtil.UTF_8).length + + " world".getBytes(CharsetUtil.UTF_8).length; + + httpClient.websocket() + .uri("/ws-fragment") + .handle((in, out) -> in.aggregateFrames().receiveFrames().take(1).then()) + .as(StepVerifier::create) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + // Fragmented frames should be recorded as a single message + assertDistributionSummary(registry, WS_DATA_RECEIVED, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws-fragment") + .hasCountEqualTo(1) + .hasTotalAmountGreaterThanOrEqualTo(expectedTotal); + + assertTimer(registry, WS_DATA_RECEIVED_TIME, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws-fragment") + .hasCountEqualTo(1) + .hasTotalTimeGreaterThan(0); + } + + @Test + void testWebSocketControlFramesExcludedFromDataMetrics() { + disposableServer = httpServer.bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + httpClient.websocket() + .uri("/ws-control-frames") + .handle((in, out) -> in.receive().aggregate().asString()) + .as(StepVerifier::create) + .expectNext("data") + .expectComplete() + .verify(Duration.ofSeconds(30)); + + // Only the TextWebSocketFrame ("data") should be recorded, not the PingWebSocketFrame + assertDistributionSummary(registry, WS_DATA_RECEIVED, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/ws-control-frames") + .hasCountEqualTo(1) + .hasTotalAmountGreaterThanOrEqualTo("data".getBytes(CharsetUtil.UTF_8).length); + } + + @ParameterizedTest + @MethodSource("websocketProtocols") + @SuppressWarnings("deprecation") + void testWebSocketMetricsAcrossProtocols(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) { + HttpServer server = createServer() + .host("127.0.0.1") + .handle((req, res) -> res.sendWebsocket((in, out) -> + out.sendString(Mono.just("Hello World!")))); + if (serverCtx != null) { + server = server.secure(spec -> spec.sslContext(serverCtx)); + } + server = server.protocol(serverProtocols) + .http2Settings(spec -> spec.connectProtocolEnabled(true)); + disposableServer = server.bindNow(); + String serverAddress = formatSocketAddress(disposableServer.address()); + + HttpClient client = httpClient; + if (clientCtx != null) { + client = client.secure(spec -> spec.sslContext(clientCtx)); + } + client = client.protocol(clientProtocols); + + client.websocket() + .uri("/") + .handle((in, out) -> in.receive().aggregate().asString()) + .as(StepVerifier::create) + .expectNext("Hello World!") + .expectComplete() + .verify(Duration.ofSeconds(30)); + + // Verify handshake time timer exists and is correct + assertTimer(registry, WS_HANDSHAKE_TIME, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/") + .hasCountEqualTo(1) + .hasTotalTimeGreaterThan(0); + + // Verify data received metrics exist + assertDistributionSummary(registry, WS_DATA_RECEIVED, + REMOTE_ADDRESS, serverAddress, + PROXY_ADDRESS, NA, + URI, "/") + .hasCountGreaterThanOrEqualTo(1); + } + + static Stream websocketProtocols() { + return Stream.of( + Arguments.of( + Named.of("HTTP/1.1", new HttpProtocol[]{HttpProtocol.HTTP11}), + Named.of("HTTP/1.1", new HttpProtocol[]{HttpProtocol.HTTP11}), + Named.of("null", null), Named.of("null", null)), + Arguments.of( + Named.of("H2", new HttpProtocol[]{HttpProtocol.H2}), + Named.of("H2", new HttpProtocol[]{HttpProtocol.H2}), + Named.of("serverCtx2", serverCtx2), Named.of("clientCtx2", clientCtx2)) + ); + } +}