Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion reactor-netty-core/src/main/java/reactor/netty/Metrics.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2025 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2026 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,6 +61,14 @@ public class Metrics {
*/
public static final String HTTP_CLIENT_PREFIX = "reactor.netty.http.client";

/**
* Name prefix that will be used for the WebSocket client's metrics
* registered in Micrometer's global registry.
*
* @since 1.3.5
*/
public static final String WEBSOCKET_CLIENT_PREFIX = "reactor.netty.websocket.client";

/**
* Name prefix that will be used for the TCP server's metrics
* registered in Micrometer's global registry.
Expand Down Expand Up @@ -145,6 +153,20 @@ public class Metrics {
*/
public static final String RESPONSE_TIME = ".response.time";

/**
* Time spent for WebSocket handshake.
*
* @since 1.3.5
*/
public static final String HANDSHAKE_TIME = ".handshake.time";

/**
* Duration of the WebSocket connection.
*
* @since 1.3.5
*/
public static final String CONNECTION_DURATION = ".connection.duration";

/**
* The number of all connections, whether they are active or idle.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public interface NettyPipeline {
String TlsMetricsHandler = LEFT + "tlsMetricsHandler";
String WsCompressionHandler = LEFT + "wsCompressionHandler";
String WsFrameAggregator = LEFT + "wsFrameAggregator";
String WsMetricsHandler = LEFT + "wsMetricsHandler";

String ReactiveBridge = RIGHT + "reactiveBridge";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
/*
* Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.http.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.jspecify.annotations.Nullable;
import reactor.util.context.ContextView;
import reactor.util.Logger;
import reactor.util.Loggers;

import java.net.SocketAddress;
import java.time.Duration;

import static reactor.netty.ReactorNetty.format;

/**
* {@link ChannelDuplexHandler} for handling WebSocket {@link HttpClient} metrics.
*
* @author LivingLikeKrillin
* @since 1.3.5
*/
abstract class AbstractWebSocketClientMetricsHandler extends ChannelDuplexHandler {

private static final Logger log = Loggers.getLogger(AbstractWebSocketClientMetricsHandler.class);

final String method;
final @Nullable SocketAddress proxyAddress;
final SocketAddress remoteAddress;

final String path;

final ContextView contextView;

long dataReceived;

long dataSent;

long dataReceivedTime;

long dataSentTime;

long connectionStartTime;

Comment thread
violetagg marked this conversation as resolved.
long handshakeStartTime;

protected AbstractWebSocketClientMetricsHandler(SocketAddress remoteAddress, @Nullable SocketAddress proxyAddress,
String path, ContextView contextView, String method) {
this.method = method;
this.path = path;
this.contextView = contextView;
this.proxyAddress = proxyAddress;
this.remoteAddress = remoteAddress;
}

@Override
public boolean isSharable() {
return false;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
connectionStartTime = System.nanoTime();
}

Comment thread
violetagg marked this conversation as resolved.
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
try {
if (connectionStartTime > 0) {
recordConnectionClosed();
}
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
}
super.handlerRemoved(ctx);
}

void startHandshake(Channel channel) {
handshakeStartTime = System.nanoTime();
}

void recordHandshakeComplete(Channel channel, String status) {
Duration time = Duration.ofNanos(System.nanoTime() - handshakeStartTime);
if (proxyAddress == null) {
recorder().recordWebSocketHandshakeTime(remoteAddress, path, status, time);
}
else {
recorder().recordWebSocketHandshakeTime(remoteAddress, proxyAddress, path, status, time);
}
}

void recordHandshakeFailure(Channel channel) {
Duration time = Duration.ofNanos(System.nanoTime() - handshakeStartTime);
if (proxyAddress == null) {
recorder().recordWebSocketHandshakeTime(remoteAddress, path, "ERROR", time);
}
else {
recorder().recordWebSocketHandshakeTime(remoteAddress, proxyAddress, path, "ERROR", time);
}
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
try {
if (msg instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame) msg;
if (isDataFrame(frame)) {
if (dataSentTime == 0) {
dataSentTime = System.nanoTime();
}
dataSent += extractProcessedDataFromBuffer(frame);

if (frame.isFinalFragment()) {
// VoidChannelPromise does not support addListener, unvoid to ensure the listener fires
promise = promise.unvoid();
promise.addListener(f -> {
try {
recordWrite(remoteAddress);
dataSentTime = 0;
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
}
});
}
}
}
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
}
Comment thread
violetagg marked this conversation as resolved.

//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
if (msg instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame) msg;
if (isDataFrame(frame)) {
if (dataReceivedTime == 0) {
dataReceivedTime = System.nanoTime();
}
dataReceived += extractProcessedDataFromBuffer(frame);

if (frame.isFinalFragment()) {
recordRead(remoteAddress);
dataReceivedTime = 0;
}
}
}
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
}

ctx.fireChannelRead(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
recordException();
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
}

ctx.fireExceptionCaught(cause);
}

static boolean isDataFrame(WebSocketFrame msg) {
return !(msg instanceof CloseWebSocketFrame) &&
!(msg instanceof PingWebSocketFrame) &&
!(msg instanceof PongWebSocketFrame);
}

private static long extractProcessedDataFromBuffer(WebSocketFrame msg) {
return msg.content().readableBytes();
}

protected abstract WebSocketClientMetricsRecorder recorder();

protected void recordConnectionClosed() {
Duration duration = Duration.ofNanos(System.nanoTime() - connectionStartTime);
if (proxyAddress == null) {
recorder().recordWebSocketConnectionDuration(remoteAddress, path, duration);
}
else {
recorder().recordWebSocketConnectionDuration(remoteAddress, proxyAddress, path, duration);
}
}

protected void recordException() {
if (proxyAddress == null) {
recorder().incrementErrorsCount(remoteAddress, path);
}
else {
recorder().incrementErrorsCount(remoteAddress, proxyAddress, path);
}
}

protected void recordRead(SocketAddress address) {
if (proxyAddress == null) {
recorder().recordDataReceivedTime(address, path, method, "n/a",
Duration.ofNanos(System.nanoTime() - dataReceivedTime));

recorder().recordDataReceived(address, path, dataReceived);
}
else {
recorder().recordDataReceivedTime(address, proxyAddress, path, method, "n/a",
Duration.ofNanos(System.nanoTime() - dataReceivedTime));

recorder().recordDataReceived(address, proxyAddress, path, dataReceived);
}
dataReceived = 0;
}

protected void recordWrite(SocketAddress address) {
if (proxyAddress == null) {
recorder().recordDataSentTime(address, path, method,
Duration.ofNanos(System.nanoTime() - dataSentTime));

recorder().recordDataSent(address, path, dataSent);
}
else {
recorder().recordDataSentTime(address, proxyAddress, path, method,
Duration.ofNanos(System.nanoTime() - dataSentTime));

recorder().recordDataSent(address, proxyAddress, path, dataSent);
}
dataSent = 0;
}

}
Loading
Loading