Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ class ControllerServer(
if (linuxIoMetricsCollector.usable()) {
metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
metricsGroup.newGauge("linux-disk-rchar", () => linuxIoMetricsCollector.rchar())
metricsGroup.newGauge("linux-disk-wchar", () => linuxIoMetricsCollector.wchar())
metricsGroup.newGauge("linux-disk-syscr", () => linuxIoMetricsCollector.syscr())
metricsGroup.newGauge("linux-disk-syscw", () => linuxIoMetricsCollector.syscw())
metricsGroup.newGauge("linux-disk-cancelled-write-bytes", () => linuxIoMetricsCollector.cancelledWriteBytes())
}

authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.ControllerRole.toString)
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,10 @@ trait KafkaBroker extends Logging {
if (linuxIoMetricsCollector.usable()) {
metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
metricsGroup.newGauge("linux-disk-rchar", () => linuxIoMetricsCollector.rchar())
metricsGroup.newGauge("linux-disk-wchar", () => linuxIoMetricsCollector.wchar())
metricsGroup.newGauge("linux-disk-syscr", () => linuxIoMetricsCollector.syscr())
metricsGroup.newGauge("linux-disk-syscw", () => linuxIoMetricsCollector.syscw())
metricsGroup.newGauge("linux-disk-cancelled-write-bytes", () => linuxIoMetricsCollector.cancelledWriteBytes())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,23 @@ public class LinuxIoMetricsCollector {
private static final Logger LOG = LoggerFactory.getLogger(LinuxIoMetricsCollector.class);
private static final String READ_BYTES_PREFIX = "read_bytes: ";
private static final String WRITE_BYTES_PREFIX = "write_bytes: ";
private static final String RCHAR_PREFIX = "rchar: ";
private static final String WCHAR_PREFIX = "wchar: ";
private static final String SYSCR_PREFIX = "syscr: ";
private static final String SYSCW_PREFIX = "syscw: ";
private static final String CANCELLED_WRITE_BYTES_PREFIX = "cancelled_write_bytes: ";

private final Time time;
private final Path path;

private long lastUpdateMs = -1L;
private long cachedReadBytes = 0L;
private long cachedWriteBytes = 0L;
private long cachedRchar = 0L;
private long cachedWchar = 0L;
private long cachedSyscr = 0L;
private long cachedSyscw = 0L;
private long cachedCancelledWriteBytes = 0L;

public LinuxIoMetricsCollector(String procRoot, Time time) {
this.time = time;
Expand All @@ -68,6 +78,76 @@ public long writeBytes() {
}
}

/**
* Returns the total number of characters read (includes cached reads).
* This value represents all read operations, including those satisfied by the page cache.
*/
public long rchar() {
synchronized (this) {
long curMs = time.milliseconds();
if (curMs != lastUpdateMs) {
updateValues(curMs);
}
return cachedRchar;
}
}

/**
* Returns the total number of characters written (includes cached writes).
* This value represents all write operations, including those that may not have reached disk.
*/
public long wchar() {
synchronized (this) {
long curMs = time.milliseconds();
if (curMs != lastUpdateMs) {
updateValues(curMs);
}
return cachedWchar;
}
}

/**
* Returns the number of read system calls.
* This metric helps identify I/O patterns and syscall overhead.
*/
public long syscr() {
synchronized (this) {
long curMs = time.milliseconds();
if (curMs != lastUpdateMs) {
updateValues(curMs);
}
return cachedSyscr;
}
}

/**
* Returns the number of write system calls.
* This metric helps identify I/O patterns and syscall overhead.
*/
public long syscw() {
synchronized (this) {
long curMs = time.milliseconds();
if (curMs != lastUpdateMs) {
updateValues(curMs);
}
return cachedSyscw;
}
}

/**
* Returns the number of bytes that were cancelled before being written.
* This can occur when a write is truncated or cancelled.
*/
public long cancelledWriteBytes() {
synchronized (this) {
long curMs = time.milliseconds();
if (curMs != lastUpdateMs) {
updateValues(curMs);
}
return cachedCancelledWriteBytes;
}
}

/**
* Read /proc/self/io.
* Generally, each line in this file contains a prefix followed by a colon and a number.
Expand All @@ -85,12 +165,27 @@ private boolean updateValues(long now) {
try {
cachedReadBytes = -1L;
cachedWriteBytes = -1L;
cachedRchar = -1L;
cachedWchar = -1L;
cachedSyscr = -1L;
cachedSyscw = -1L;
cachedCancelledWriteBytes = -1L;
List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
for (String line : lines) {
if (line.startsWith(READ_BYTES_PREFIX)) {
cachedReadBytes = Long.parseLong(line.substring(READ_BYTES_PREFIX.length()));
} else if (line.startsWith(WRITE_BYTES_PREFIX)) {
cachedWriteBytes = Long.parseLong(line.substring(WRITE_BYTES_PREFIX.length()));
} else if (line.startsWith(RCHAR_PREFIX)) {
cachedRchar = Long.parseLong(line.substring(RCHAR_PREFIX.length()));
} else if (line.startsWith(WCHAR_PREFIX)) {
cachedWchar = Long.parseLong(line.substring(WCHAR_PREFIX.length()));
} else if (line.startsWith(SYSCR_PREFIX)) {
cachedSyscr = Long.parseLong(line.substring(SYSCR_PREFIX.length()));
} else if (line.startsWith(SYSCW_PREFIX)) {
cachedSyscw = Long.parseLong(line.substring(SYSCW_PREFIX.length()));
} else if (line.startsWith(CANCELLED_WRITE_BYTES_PREFIX)) {
cachedCancelledWriteBytes = Long.parseLong(line.substring(CANCELLED_WRITE_BYTES_PREFIX.length()));
}
}
lastUpdateMs = now;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,65 @@ public class LinuxIoMetricsCollectorTest {
public void testReadProcFile() throws IOException {
TestDirectory testDirectory = new TestDirectory();
Time time = new MockTime(0L, 100L, 1000L);
testDirectory.writeProcFile(123L, 456L);
testDirectory.writeProcFile(123L, 456L, 1000L, 2000L, 10L, 20L, 5L);
LinuxIoMetricsCollector collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath(), time);

// Test that we can read the values we wrote.
assertTrue(collector.usable());
assertEquals(123L, collector.readBytes());
assertEquals(456L, collector.writeBytes());
testDirectory.writeProcFile(124L, 457L);
assertEquals(1000L, collector.rchar());
assertEquals(2000L, collector.wchar());
assertEquals(10L, collector.syscr());
assertEquals(20L, collector.syscw());
assertEquals(5L, collector.cancelledWriteBytes());
testDirectory.writeProcFile(124L, 457L, 1001L, 2001L, 11L, 21L, 6L);

// The previous values should still be cached.
assertEquals(123L, collector.readBytes());
assertEquals(456L, collector.writeBytes());
assertEquals(1000L, collector.rchar());
assertEquals(2000L, collector.wchar());
assertEquals(10L, collector.syscr());
assertEquals(20L, collector.syscw());
assertEquals(5L, collector.cancelledWriteBytes());

// Update the time, and the values should be re-read.
time.sleep(1);
assertEquals(124L, collector.readBytes());
assertEquals(457L, collector.writeBytes());
assertEquals(1001L, collector.rchar());
assertEquals(2001L, collector.wchar());
assertEquals(11L, collector.syscr());
assertEquals(21L, collector.syscw());
assertEquals(6L, collector.cancelledWriteBytes());
}

@Test
public void testAllMetricsWithRealWorldValues() throws IOException {
TestDirectory testDirectory = new TestDirectory();
Time time = new MockTime(0L, 100L, 1000L);
// Simulate real-world values where rchar/wchar are much larger than read_bytes/write_bytes
// (due to page cache hits)
testDirectory.writeProcFile(
1048576L, // read_bytes: 1 MB actually read from disk
2097152L, // write_bytes: 2 MB actually written to disk
10485760L, // rchar: 10 MB total reads (9 MB from cache)
20971520L, // wchar: 20 MB total writes (18 MB cached)
150L, // syscr: 150 read syscalls
300L, // syscw: 300 write syscalls
524288L // cancelled_write_bytes: 512 KB cancelled
);
LinuxIoMetricsCollector collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath(), time);

assertTrue(collector.usable());
assertEquals(1048576L, collector.readBytes());
assertEquals(2097152L, collector.writeBytes());
assertEquals(10485760L, collector.rchar());
assertEquals(20971520L, collector.wchar());
assertEquals(150L, collector.syscr());
assertEquals(300L, collector.syscw());
assertEquals(524288L, collector.cancelledWriteBytes());
}

@Test
Expand All @@ -78,14 +120,15 @@ static class TestDirectory {
selfDir = Files.createDirectories(baseDir.toPath().resolve("self"));
}

void writeProcFile(long readBytes, long writeBytes) throws IOException {
String bld = "rchar: 0\n" +
"wchar: 0\n" +
"syschr: 0\n" +
"syscw: 0\n" +
void writeProcFile(long readBytes, long writeBytes, long rchar, long wchar,
long syscr, long syscw, long cancelledWriteBytes) throws IOException {
String bld = "rchar: " + rchar + "\n" +
"wchar: " + wchar + "\n" +
"syscr: " + syscr + "\n" +
"syscw: " + syscw + "\n" +
"read_bytes: " + readBytes + "\n" +
"write_bytes: " + writeBytes + "\n" +
"cancelled_write_bytes: 0\n";
"cancelled_write_bytes: " + cancelledWriteBytes + "\n";
Files.writeString(selfDir.resolve("io"), bld);
}
}
Expand Down