diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 7b9291ce16527..f2120baa60898 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -143,6 +143,11 @@ class ControllerServer( if (linuxIoMetricsCollector.usable()) { metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes()) metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes()) + metricsGroup.newGauge("linux-disk-rchar", () => linuxIoMetricsCollector.rchar()) + metricsGroup.newGauge("linux-disk-wchar", () => linuxIoMetricsCollector.wchar()) + metricsGroup.newGauge("linux-disk-syscr", () => linuxIoMetricsCollector.syscr()) + metricsGroup.newGauge("linux-disk-syscw", () => linuxIoMetricsCollector.syscw()) + metricsGroup.newGauge("linux-disk-cancelled-write-bytes", () => linuxIoMetricsCollector.cancelledWriteBytes()) } authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.ControllerRole.toString) diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala index 46576d97d338a..0b64e32e4cecf 100644 --- a/core/src/main/scala/kafka/server/KafkaBroker.scala +++ b/core/src/main/scala/kafka/server/KafkaBroker.scala @@ -119,5 +119,10 @@ trait KafkaBroker extends Logging { if (linuxIoMetricsCollector.usable()) { metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes()) metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes()) + metricsGroup.newGauge("linux-disk-rchar", () => linuxIoMetricsCollector.rchar()) + metricsGroup.newGauge("linux-disk-wchar", () => linuxIoMetricsCollector.wchar()) + metricsGroup.newGauge("linux-disk-syscr", () => linuxIoMetricsCollector.syscr()) + metricsGroup.newGauge("linux-disk-syscw", () => linuxIoMetricsCollector.syscw()) + 
metricsGroup.newGauge("linux-disk-cancelled-write-bytes", () => linuxIoMetricsCollector.cancelledWriteBytes()) } } diff --git a/server/src/main/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollector.java b/server/src/main/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollector.java index e34a4d5c4265a..bdee421e33eff 100644 --- a/server/src/main/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollector.java +++ b/server/src/main/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollector.java @@ -35,6 +35,11 @@ public class LinuxIoMetricsCollector { private static final Logger LOG = LoggerFactory.getLogger(LinuxIoMetricsCollector.class); private static final String READ_BYTES_PREFIX = "read_bytes: "; private static final String WRITE_BYTES_PREFIX = "write_bytes: "; + private static final String RCHAR_PREFIX = "rchar: "; + private static final String WCHAR_PREFIX = "wchar: "; + private static final String SYSCR_PREFIX = "syscr: "; + private static final String SYSCW_PREFIX = "syscw: "; + private static final String CANCELLED_WRITE_BYTES_PREFIX = "cancelled_write_bytes: "; private final Time time; private final Path path; @@ -42,6 +47,11 @@ public class LinuxIoMetricsCollector { private long lastUpdateMs = -1L; private long cachedReadBytes = 0L; private long cachedWriteBytes = 0L; + private long cachedRchar = 0L; + private long cachedWchar = 0L; + private long cachedSyscr = 0L; + private long cachedSyscw = 0L; + private long cachedCancelledWriteBytes = 0L; public LinuxIoMetricsCollector(String procRoot, Time time) { this.time = time; @@ -68,6 +78,76 @@ public long writeBytes() { } } + /** + * Returns the total number of characters read (includes cached reads). + * This value represents all read operations, including those satisfied by the page cache. 
+ */ + public long rchar() { + synchronized (this) { + long curMs = time.milliseconds(); + if (curMs != lastUpdateMs) { + updateValues(curMs); + } + return cachedRchar; + } + } + + /** + * Returns the total number of characters written (includes cached writes). + * This value represents all write operations, including those that may not have reached disk. + */ + public long wchar() { + synchronized (this) { + long curMs = time.milliseconds(); + if (curMs != lastUpdateMs) { + updateValues(curMs); + } + return cachedWchar; + } + } + + /** + * Returns the number of read system calls. + * This metric helps identify I/O patterns and syscall overhead. + */ + public long syscr() { + synchronized (this) { + long curMs = time.milliseconds(); + if (curMs != lastUpdateMs) { + updateValues(curMs); + } + return cachedSyscr; + } + } + + /** + * Returns the number of write system calls. + * This metric helps identify I/O patterns and syscall overhead. + */ + public long syscw() { + synchronized (this) { + long curMs = time.milliseconds(); + if (curMs != lastUpdateMs) { + updateValues(curMs); + } + return cachedSyscw; + } + } + + /** + * Returns the number of bytes whose write-out this process caused not to happen by truncating the page cache. + * This occurs when dirty pages are discarded before being flushed, for example when a file is truncated or deleted after being written. + */ + public long cancelledWriteBytes() { + synchronized (this) { + long curMs = time.milliseconds(); + if (curMs != lastUpdateMs) { + updateValues(curMs); + } + return cachedCancelledWriteBytes; + } + } + /** * Read /proc/self/io. * Generally, each line in this file contains a prefix followed by a colon and a number. 
@@ -85,12 +165,27 @@ private boolean updateValues(long now) { try { cachedReadBytes = -1L; cachedWriteBytes = -1L; + cachedRchar = -1L; + cachedWchar = -1L; + cachedSyscr = -1L; + cachedSyscw = -1L; + cachedCancelledWriteBytes = -1L; List lines = Files.readAllLines(path, StandardCharsets.UTF_8); for (String line : lines) { if (line.startsWith(READ_BYTES_PREFIX)) { cachedReadBytes = Long.parseLong(line.substring(READ_BYTES_PREFIX.length())); } else if (line.startsWith(WRITE_BYTES_PREFIX)) { cachedWriteBytes = Long.parseLong(line.substring(WRITE_BYTES_PREFIX.length())); + } else if (line.startsWith(RCHAR_PREFIX)) { + cachedRchar = Long.parseLong(line.substring(RCHAR_PREFIX.length())); + } else if (line.startsWith(WCHAR_PREFIX)) { + cachedWchar = Long.parseLong(line.substring(WCHAR_PREFIX.length())); + } else if (line.startsWith(SYSCR_PREFIX)) { + cachedSyscr = Long.parseLong(line.substring(SYSCR_PREFIX.length())); + } else if (line.startsWith(SYSCW_PREFIX)) { + cachedSyscw = Long.parseLong(line.substring(SYSCW_PREFIX.length())); + } else if (line.startsWith(CANCELLED_WRITE_BYTES_PREFIX)) { + cachedCancelledWriteBytes = Long.parseLong(line.substring(CANCELLED_WRITE_BYTES_PREFIX.length())); } } lastUpdateMs = now; diff --git a/server/src/test/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollectorTest.java b/server/src/test/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollectorTest.java index 8c96c5c5c10ee..3a13a14327807 100644 --- a/server/src/test/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollectorTest.java +++ b/server/src/test/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollectorTest.java @@ -39,23 +39,65 @@ public class LinuxIoMetricsCollectorTest { public void testReadProcFile() throws IOException { TestDirectory testDirectory = new TestDirectory(); Time time = new MockTime(0L, 100L, 1000L); - testDirectory.writeProcFile(123L, 456L); + testDirectory.writeProcFile(123L, 456L, 1000L, 2000L, 10L, 20L, 5L); LinuxIoMetricsCollector collector 
= new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath(), time); // Test that we can read the values we wrote. assertTrue(collector.usable()); assertEquals(123L, collector.readBytes()); assertEquals(456L, collector.writeBytes()); - testDirectory.writeProcFile(124L, 457L); + assertEquals(1000L, collector.rchar()); + assertEquals(2000L, collector.wchar()); + assertEquals(10L, collector.syscr()); + assertEquals(20L, collector.syscw()); + assertEquals(5L, collector.cancelledWriteBytes()); + testDirectory.writeProcFile(124L, 457L, 1001L, 2001L, 11L, 21L, 6L); // The previous values should still be cached. assertEquals(123L, collector.readBytes()); assertEquals(456L, collector.writeBytes()); + assertEquals(1000L, collector.rchar()); + assertEquals(2000L, collector.wchar()); + assertEquals(10L, collector.syscr()); + assertEquals(20L, collector.syscw()); + assertEquals(5L, collector.cancelledWriteBytes()); // Update the time, and the values should be re-read. time.sleep(1); assertEquals(124L, collector.readBytes()); assertEquals(457L, collector.writeBytes()); + assertEquals(1001L, collector.rchar()); + assertEquals(2001L, collector.wchar()); + assertEquals(11L, collector.syscr()); + assertEquals(21L, collector.syscw()); + assertEquals(6L, collector.cancelledWriteBytes()); + } + + @Test + public void testAllMetricsWithRealWorldValues() throws IOException { + TestDirectory testDirectory = new TestDirectory(); + Time time = new MockTime(0L, 100L, 1000L); + // Simulate real-world values where rchar/wchar are much larger than read_bytes/write_bytes + // (due to page cache hits) + testDirectory.writeProcFile( + 1048576L, // read_bytes: 1 MB actually read from disk + 2097152L, // write_bytes: 2 MB actually written to disk + 10485760L, // rchar: 10 MB total reads (9 MB from cache) + 20971520L, // wchar: 20 MB total writes (18 MB cached) + 150L, // syscr: 150 read syscalls + 300L, // syscw: 300 write syscalls + 524288L // cancelled_write_bytes: 512 KB cancelled + ); + 
LinuxIoMetricsCollector collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath(), time); + + assertTrue(collector.usable()); + assertEquals(1048576L, collector.readBytes()); + assertEquals(2097152L, collector.writeBytes()); + assertEquals(10485760L, collector.rchar()); + assertEquals(20971520L, collector.wchar()); + assertEquals(150L, collector.syscr()); + assertEquals(300L, collector.syscw()); + assertEquals(524288L, collector.cancelledWriteBytes()); } @Test @@ -78,14 +120,15 @@ static class TestDirectory { selfDir = Files.createDirectories(baseDir.toPath().resolve("self")); } - void writeProcFile(long readBytes, long writeBytes) throws IOException { - String bld = "rchar: 0\n" + - "wchar: 0\n" + - "syschr: 0\n" + - "syscw: 0\n" + + void writeProcFile(long readBytes, long writeBytes, long rchar, long wchar, + long syscr, long syscw, long cancelledWriteBytes) throws IOException { + String bld = "rchar: " + rchar + "\n" + + "wchar: " + wchar + "\n" + + "syscr: " + syscr + "\n" + + "syscw: " + syscw + "\n" + "read_bytes: " + readBytes + "\n" + "write_bytes: " + writeBytes + "\n" + - "cancelled_write_bytes: 0\n"; + "cancelled_write_bytes: " + cancelledWriteBytes + "\n"; Files.writeString(selfDir.resolve("io"), bld); } }