From b0eb041bb4db8229d594e3ef45fa34613ff6045e Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Thu, 16 Apr 2026 10:41:34 -0700 Subject: [PATCH] Optimize DLQ segment directory scans with single-pass logic. (#18970) * Optimize DLQ segment directory scans with single-pass DirectoryStream lookups Before this change, listing segment files and finding max segment ID logic was using plain Java stream to list all files, then filter by size and sort. With this PR change, we optimize DLQ segment file lookups to use single-pass directory scans. Use DirectoryStream with OS-level glob instead of listing all files, find the min or max segment. There are use-cases which require size > 0 when updating oldest file segment and no size check when removing oldest segment file which will be handled in a single logic. * Move file size condition after the extract segment ID. * Add unit tests * Update logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply suggestions from code review Refine the code comment. Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> * When removing the segment, track DLQ currentQueueSize incrementally instead of rescanning filesystem * Update logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java Apply Java doc suggestion, provides clearer signal. Co-authored-by: Andrea Selva --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Andrea Selva (cherry picked from commit 894ca214c91bf9eabeb80a8f5c8956ee0dfc6b5c) --- .../common/io/DeadLetterQueueUtils.java | 43 +++- .../common/io/DeadLetterQueueWriter.java | 26 +-- .../common/io/DeadLetterQueueReaderTest.java | 22 -- .../common/io/DeadLetterQueueUtilsTest.java | 209 ++++++++++++++++++ 4 files changed, 256 insertions(+), 44 deletions(-) create mode 100644 logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueUtilsTest.java diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java index 06a6a507462..fbd1dc314bd 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java @@ -24,10 +24,11 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.Comparator; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -48,7 +49,7 @@ static int extractSegmentId(final Path p) { static Stream listFiles(Path path, String suffix) throws IOException { try(final Stream files = Files.list(path)) { - return files.filter(p -> p.toString().endsWith(suffix)) + return files.filter(p -> p.getFileName().toString().endsWith(suffix)) .collect(Collectors.toList()).stream(); } } @@ -57,9 +58,41 @@ static Stream listSegmentPaths(Path path) throws IOException { return listFiles(path, ".log"); } - static Stream listSegmentPathsSortedBySegmentId(Path path) throws IOException { - return listSegmentPaths(path) - .sorted(Comparator.comparingInt(DeadLetterQueueUtils::extractSegmentId)); + /** +Finds the max segment ID between the `.log` file segments using OS-level glob filtering. + */ + static int maxSegmentId(Path path) throws IOException { + int max = 0; + try (DirectoryStream stream = Files.newDirectoryStream(path, "*.log")) { + for (Path p : stream) { + max = Math.max(max, extractSegmentId(p)); + } + } + return max; + } + + /** + * Single-pass scan that finds the segment path with the smallest segment ID, + * optionally skipping segments with size less than or equal to {@code minFileSize} + * when {@code minFileSize} is greater than zero. + * Returns empty when no matching segment exists. + */ + static Optional oldestSegmentPath(Path path, long minFileSize) throws IOException { + Path oldest = null; + int oldestId = Integer.MAX_VALUE; + try (DirectoryStream stream = Files.newDirectoryStream(path, "*.log")) { + for (Path p : stream) { + int id = extractSegmentId(p); + if (id < oldestId) { + if (minFileSize > 0 && Files.size(p) <= minFileSize) { + continue; + } + oldestId = id; + oldest = p; + } + } + } + return Optional.ofNullable(oldest); } /** diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index 94a25ad93d1..23c4e7e8f29 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -73,9 +73,6 @@ import org.logstash.FileLockFactory; import org.logstash.Timestamp; -import static org.logstash.common.io.DeadLetterQueueUtils.listFiles; -import static org.logstash.common.io.DeadLetterQueueUtils.listSegmentPaths; -import static org.logstash.common.io.DeadLetterQueueUtils.listSegmentPathsSortedBySegmentId; import static org.logstash.common.io.RecordIOReader.SegmentStatus; import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE; import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE; @@ -303,10 +300,7 @@ private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, f cleanupTempFiles(); updateOldestSegmentReference(); - currentSegmentIndex = listSegmentPaths(queuePath) - .map(s -> s.getFileName().toString().split("\\.")[0]) - .mapToInt(Integer::parseInt) - .max().orElse(0); + currentSegmentIndex = DeadLetterQueueUtils.maxSegmentId(queuePath); nextWriter(); this.lastEntryTimestamp = Timestamp.now(); this.flusherService = flusherService; @@ -547,8 +541,6 @@ private void deleteExpiredSegments() throws IOException { updateOldestSegmentReference(); cleanNextSegment = isOldestSegmentExpired(); } while (cleanNextSegment); - - this.currentQueueSize.set(computeQueueSize()); } /** @@ -563,8 +555,10 @@ private void deleteExpiredSegments() throws IOException { * */ private long deleteTailSegment(Path segment, String motivation) throws IOException { try { + long segmentSize = Files.size(segment); long eventsInSegment = DeadLetterQueueUtils.countEventsInSegment(segment); Files.delete(segment); + currentQueueSize.addAndGet(-segmentSize); logger.debug("Removed segment file {} due to {}", segment, motivation); return eventsInSegment; } catch (NoSuchFileException nsfex) { @@ -577,9 +571,7 @@ private long deleteTailSegment(Path segment, String motivation) throws IOExcepti // package-private for testing void updateOldestSegmentReference() throws IOException { final Optional previousOldestSegmentPath = oldestSegmentPath; - oldestSegmentPath = listSegmentPathsSortedBySegmentId(this.queuePath) - .filter(p -> p.toFile().length() > 1) // take the files that have content to process - .findFirst(); + oldestSegmentPath = DeadLetterQueueUtils.oldestSegmentPath(this.queuePath, 1); if (!oldestSegmentPath.isPresent()) { oldestSegmentTimestamp = Optional.empty(); return; @@ -634,14 +626,13 @@ static Optional readTimestampOfLastEventInSegment(Path segmentPath) t // package-private for testing void dropTailSegment() throws IOException { // remove oldest segment - final Optional oldestSegment = listSegmentPathsSortedBySegmentId(queuePath).findFirst(); + final Optional oldestSegment = DeadLetterQueueUtils.oldestSegmentPath(queuePath, 0); if (oldestSegment.isPresent()) { final Path beheadedSegment = oldestSegment.get(); deleteTailSegment(beheadedSegment, "dead letter queue size exceeded dead_letter_queue.max_bytes size(" + maxQueueSize + ")"); } else { logger.info("Queue size {} exceeded, but no complete DLQ segments found", maxQueueSize); } - this.currentQueueSize.set(computeQueueSize()); } /** @@ -656,6 +647,7 @@ private static boolean alreadyProcessed(final Event event) { return event.includes(DEAD_LETTER_QUEUE_METADATA_KEY); } + // main method for flush scheduler private void scheduledFlushCheck() { logger.trace("Running scheduled check"); lock.lock(); @@ -716,7 +708,7 @@ private void sealSegment(int segmentIndex, SealReason motivation) throws IOExcep } private long computeQueueSize() throws IOException { - return listSegmentPaths(this.queuePath) + return DeadLetterQueueUtils.listSegmentPaths(this.queuePath) .mapToLong(DeadLetterQueueWriter::safeFileSize) .sum(); } @@ -753,12 +745,12 @@ private void nextWriter() throws IOException { // segment file with the same base name exists, or rename the // temp file to the segment file, which can happen when a process ends abnormally private void cleanupTempFiles() throws IOException { - listFiles(queuePath, ".log.tmp") + DeadLetterQueueUtils.listFiles(queuePath, ".log.tmp") .forEach(this::cleanupTempFile); } // check if there is a corresponding .log file - if yes delete the temp file, if no atomic move the - // temp file to be a new segment file.. + // temp file to be a new segment file. private void cleanupTempFile(final Path tempFile) { String segmentName = tempFile.getFileName().toString().split("\\.")[0]; Path segmentFile = queuePath.resolve(String.format("%s.log", segmentName)); diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index 90f53df7739..da8ae7e91eb 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -1124,26 +1124,4 @@ public void testReaderLockProhibitMultipleInstances() throws IOException { } } } - - @Test - public void testExtractSegmentIdWithValidFileName() { - Path validPath = Paths.get("123.log"); - assertEquals(123, DeadLetterQueueUtils.extractSegmentId(validPath)); - - Path singleDigitPath = Paths.get("1.log"); - assertEquals(1, DeadLetterQueueUtils.extractSegmentId(singleDigitPath)); - - Path largeNumberPath = Paths.get("999999.log"); - assertEquals(999999, DeadLetterQueueUtils.extractSegmentId(largeNumberPath)); - } - - @Test - public void testExtractSegmentIdWithNoLogExtensionThrowsException() { - Path noExtensionPath = Paths.get("123.txt"); - IllegalArgumentException exception = Assert.assertThrows( - IllegalArgumentException.class, - () -> DeadLetterQueueUtils.extractSegmentId(noExtensionPath) - ); - assertThat(exception.getMessage(), containsString("Invalid segment file name")); - } } diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueUtilsTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueUtilsTest.java new file mode 100644 index 00000000000..6e7d9bdee22 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueUtilsTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common.io; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class DeadLetterQueueUtilsTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path dir; + + @Before + public void setUp() throws Exception { + dir = temporaryFolder.newFolder().toPath(); + } + + private void createSegmentFile(int id, int size) throws IOException { + Files.write(dir.resolve(id + ".log"), new byte[size]); + } + + private void createSegmentFile(int id) throws IOException { + createSegmentFile(id, 1024); + } + + // --- extractSegmentId --- + + @Test + public void testExtractSegmentIdWithValidFileName() { + assertEquals(123, DeadLetterQueueUtils.extractSegmentId(Paths.get("123.log"))); + assertEquals(1, DeadLetterQueueUtils.extractSegmentId(Paths.get("1.log"))); + assertEquals(999999, DeadLetterQueueUtils.extractSegmentId(Paths.get("999999.log"))); + } + + @Test + public void testExtractSegmentIdWithNoLogExtensionThrowsException() { + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> DeadLetterQueueUtils.extractSegmentId(Paths.get("123.txt")) + ); + assertThat(exception.getMessage(), containsString("Invalid segment file name")); + } + + // --- maxSegmentId --- + + @Test + public void testMaxSegmentIdEmptyDirectory() throws IOException { + assertEquals(0, DeadLetterQueueUtils.maxSegmentId(dir)); + } + + @Test + public void testMaxSegmentIdSingleSegment() throws IOException { + createSegmentFile(5); + assertEquals(5, DeadLetterQueueUtils.maxSegmentId(dir)); + } + + @Test + public void testMaxSegmentIdMultipleSegments() throws IOException { + createSegmentFile(1); + createSegmentFile(3); + createSegmentFile(7); + createSegmentFile(2); + assertEquals(7, DeadLetterQueueUtils.maxSegmentId(dir)); + } + + @Test + public void testMaxSegmentIdNonContiguousIds() throws IOException { + createSegmentFile(10); + createSegmentFile(500); + createSegmentFile(42); + assertEquals(500, DeadLetterQueueUtils.maxSegmentId(dir)); + } + + @Test + public void testMaxSegmentIdIgnoresNonLogFiles() throws IOException { + createSegmentFile(3); + Files.write(dir.resolve("5.log.tmp"), new byte[100]); + Files.write(dir.resolve("notes.txt"), new byte[100]); + assertEquals(3, DeadLetterQueueUtils.maxSegmentId(dir)); + } + + // --- oldestSegmentPath (no minFileSize) --- + + @Test + public void testOldestSegmentPathEmptyDirectory() throws IOException { + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertFalse(result.isPresent()); + } + + @Test + public void testOldestSegmentPathSingleSegment() throws IOException { + createSegmentFile(5); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertTrue(result.isPresent()); + assertEquals("5.log", result.get().getFileName().toString()); + } + + @Test + public void testOldestSegmentPathMultipleSegments() throws IOException { + createSegmentFile(3); + createSegmentFile(1); + createSegmentFile(7); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertTrue(result.isPresent()); + assertEquals("1.log", result.get().getFileName().toString()); + } + + @Test + public void testOldestSegmentPathNonContiguousIds() throws IOException { + createSegmentFile(100); + createSegmentFile(42); + createSegmentFile(999); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertTrue(result.isPresent()); + assertEquals("42.log", result.get().getFileName().toString()); + } + + @Test + public void testOldestSegmentPathIgnoresNonLogFiles() throws IOException { + createSegmentFile(10); + Files.write(dir.resolve("1.log.tmp"), new byte[100]); + Files.write(dir.resolve("data.txt"), new byte[100]); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertTrue(result.isPresent()); + assertEquals("10.log", result.get().getFileName().toString()); + } + + // --- oldestSegmentPath (with minFileSize) --- + + @Test + public void testOldestSegmentPathWithMinSizeSkipsSmallFiles() throws IOException { + createSegmentFile(1, 0); + createSegmentFile(2, 1); + createSegmentFile(3, 100); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1); + assertTrue(result.isPresent()); + assertEquals("3.log", result.get().getFileName().toString()); + } + + @Test + public void testOldestSegmentPathWithMinSizeReturnsSmallestQualifyingId() throws IOException { + createSegmentFile(5, 0); + createSegmentFile(10, 512); + createSegmentFile(3, 512); + createSegmentFile(7, 0); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1); + assertTrue(result.isPresent()); + assertEquals("3.log", result.get().getFileName().toString()); + } + + @Test + public void testOldestSegmentPathWithMinSizeAllTooSmall() throws IOException { + createSegmentFile(1, 0); + createSegmentFile(2, 1); + createSegmentFile(3, 0); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1); + assertFalse(result.isPresent()); + } + + @Test + public void testOldestSegmentPathWithMinSizeEmptyDirectory() throws IOException { + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1); + assertFalse(result.isPresent()); + } + + @Test + public void testOldestSegmentPathWithMinSizeZeroBehavesAsNoFilter() throws IOException { + createSegmentFile(5, 0); + createSegmentFile(2, 0); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertTrue(result.isPresent()); + assertEquals("2.log", result.get().getFileName().toString()); + } + +}