diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java index 06a6a507462..fbd1dc314bd 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java @@ -24,10 +24,11 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.Comparator; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -48,7 +49,7 @@ static int extractSegmentId(final Path p) { static Stream listFiles(Path path, String suffix) throws IOException { try(final Stream files = Files.list(path)) { - return files.filter(p -> p.toString().endsWith(suffix)) + return files.filter(p -> p.getFileName().toString().endsWith(suffix)) .collect(Collectors.toList()).stream(); } } @@ -57,9 +58,41 @@ static Stream listSegmentPaths(Path path) throws IOException { return listFiles(path, ".log"); } - static Stream listSegmentPathsSortedBySegmentId(Path path) throws IOException { - return listSegmentPaths(path) - .sorted(Comparator.comparingInt(DeadLetterQueueUtils::extractSegmentId)); + /** +Finds the max segment ID between the `.log` file segments using OS-level glob filtering. + */ + static int maxSegmentId(Path path) throws IOException { + int max = 0; + try (DirectoryStream stream = Files.newDirectoryStream(path, "*.log")) { + for (Path p : stream) { + max = Math.max(max, extractSegmentId(p)); + } + } + return max; + } + + /** + * Single-pass scan that finds the segment path with the smallest segment ID, + * optionally skipping segments with size less than or equal to {@code minFileSize} + * when {@code minFileSize} is greater than zero. + * Returns empty when no matching segment exists. + */ + static Optional oldestSegmentPath(Path path, long minFileSize) throws IOException { + Path oldest = null; + int oldestId = Integer.MAX_VALUE; + try (DirectoryStream stream = Files.newDirectoryStream(path, "*.log")) { + for (Path p : stream) { + int id = extractSegmentId(p); + if (id < oldestId) { + if (minFileSize > 0 && Files.size(p) <= minFileSize) { + continue; + } + oldestId = id; + oldest = p; + } + } + } + return Optional.ofNullable(oldest); } /** diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index 94a25ad93d1..23c4e7e8f29 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -73,9 +73,6 @@ import org.logstash.FileLockFactory; import org.logstash.Timestamp; -import static org.logstash.common.io.DeadLetterQueueUtils.listFiles; -import static org.logstash.common.io.DeadLetterQueueUtils.listSegmentPaths; -import static org.logstash.common.io.DeadLetterQueueUtils.listSegmentPathsSortedBySegmentId; import static org.logstash.common.io.RecordIOReader.SegmentStatus; import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE; import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE; @@ -303,10 +300,7 @@ private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, f cleanupTempFiles(); updateOldestSegmentReference(); - currentSegmentIndex = listSegmentPaths(queuePath) - .map(s -> s.getFileName().toString().split("\\.")[0]) - .mapToInt(Integer::parseInt) - .max().orElse(0); + currentSegmentIndex = DeadLetterQueueUtils.maxSegmentId(queuePath); nextWriter(); this.lastEntryTimestamp = Timestamp.now(); this.flusherService = flusherService; @@ -547,8 +541,6 @@ private void deleteExpiredSegments() throws IOException { updateOldestSegmentReference(); cleanNextSegment = isOldestSegmentExpired(); } while (cleanNextSegment); - - this.currentQueueSize.set(computeQueueSize()); } /** @@ -563,8 +555,10 @@ private void deleteExpiredSegments() throws IOException { * */ private long deleteTailSegment(Path segment, String motivation) throws IOException { try { + long segmentSize = Files.size(segment); long eventsInSegment = DeadLetterQueueUtils.countEventsInSegment(segment); Files.delete(segment); + currentQueueSize.addAndGet(-segmentSize); logger.debug("Removed segment file {} due to {}", segment, motivation); return eventsInSegment; } catch (NoSuchFileException nsfex) { @@ -577,9 +571,7 @@ private long deleteTailSegment(Path segment, String motivation) throws IOExcepti // package-private for testing void updateOldestSegmentReference() throws IOException { final Optional previousOldestSegmentPath = oldestSegmentPath; - oldestSegmentPath = listSegmentPathsSortedBySegmentId(this.queuePath) - .filter(p -> p.toFile().length() > 1) // take the files that have content to process - .findFirst(); + oldestSegmentPath = DeadLetterQueueUtils.oldestSegmentPath(this.queuePath, 1); if (!oldestSegmentPath.isPresent()) { oldestSegmentTimestamp = Optional.empty(); return; @@ -634,14 +626,13 @@ static Optional readTimestampOfLastEventInSegment(Path segmentPath) t // package-private for testing void dropTailSegment() throws IOException { // remove oldest segment - final Optional oldestSegment = listSegmentPathsSortedBySegmentId(queuePath).findFirst(); + final Optional oldestSegment = DeadLetterQueueUtils.oldestSegmentPath(queuePath, 0); if (oldestSegment.isPresent()) { final Path beheadedSegment = oldestSegment.get(); deleteTailSegment(beheadedSegment, "dead letter queue size exceeded dead_letter_queue.max_bytes size(" + maxQueueSize + ")"); } else { logger.info("Queue size {} exceeded, but no complete DLQ segments found", maxQueueSize); } - this.currentQueueSize.set(computeQueueSize()); } /** @@ -656,6 +647,7 @@ private static boolean alreadyProcessed(final Event event) { return event.includes(DEAD_LETTER_QUEUE_METADATA_KEY); } + // main method for flush scheduler private void scheduledFlushCheck() { logger.trace("Running scheduled check"); lock.lock(); @@ -716,7 +708,7 @@ private void sealSegment(int segmentIndex, SealReason motivation) throws IOExcep } private long computeQueueSize() throws IOException { - return listSegmentPaths(this.queuePath) + return DeadLetterQueueUtils.listSegmentPaths(this.queuePath) .mapToLong(DeadLetterQueueWriter::safeFileSize) .sum(); } @@ -753,12 +745,12 @@ private void nextWriter() throws IOException { // segment file with the same base name exists, or rename the // temp file to the segment file, which can happen when a process ends abnormally private void cleanupTempFiles() throws IOException { - listFiles(queuePath, ".log.tmp") + DeadLetterQueueUtils.listFiles(queuePath, ".log.tmp") .forEach(this::cleanupTempFile); } // check if there is a corresponding .log file - if yes delete the temp file, if no atomic move the - // temp file to be a new segment file.. + // temp file to be a new segment file. private void cleanupTempFile(final Path tempFile) { String segmentName = tempFile.getFileName().toString().split("\\.")[0]; Path segmentFile = queuePath.resolve(String.format("%s.log", segmentName)); diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index 90f53df7739..da8ae7e91eb 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -1124,26 +1124,4 @@ public void testReaderLockProhibitMultipleInstances() throws IOException { } } } - - @Test - public void testExtractSegmentIdWithValidFileName() { - Path validPath = Paths.get("123.log"); - assertEquals(123, DeadLetterQueueUtils.extractSegmentId(validPath)); - - Path singleDigitPath = Paths.get("1.log"); - assertEquals(1, DeadLetterQueueUtils.extractSegmentId(singleDigitPath)); - - Path largeNumberPath = Paths.get("999999.log"); - assertEquals(999999, DeadLetterQueueUtils.extractSegmentId(largeNumberPath)); - } - - @Test - public void testExtractSegmentIdWithNoLogExtensionThrowsException() { - Path noExtensionPath = Paths.get("123.txt"); - IllegalArgumentException exception = Assert.assertThrows( - IllegalArgumentException.class, - () -> DeadLetterQueueUtils.extractSegmentId(noExtensionPath) - ); - assertThat(exception.getMessage(), containsString("Invalid segment file name")); - } } diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueUtilsTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueUtilsTest.java new file mode 100644 index 00000000000..6e7d9bdee22 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueUtilsTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common.io; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class DeadLetterQueueUtilsTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path dir; + + @Before + public void setUp() throws Exception { + dir = temporaryFolder.newFolder().toPath(); + } + + private void createSegmentFile(int id, int size) throws IOException { + Files.write(dir.resolve(id + ".log"), new byte[size]); + } + + private void createSegmentFile(int id) throws IOException { + createSegmentFile(id, 1024); + } + + // --- extractSegmentId --- + + @Test + public void testExtractSegmentIdWithValidFileName() { + assertEquals(123, DeadLetterQueueUtils.extractSegmentId(Paths.get("123.log"))); + assertEquals(1, DeadLetterQueueUtils.extractSegmentId(Paths.get("1.log"))); + assertEquals(999999, DeadLetterQueueUtils.extractSegmentId(Paths.get("999999.log"))); + } + + @Test + public void testExtractSegmentIdWithNoLogExtensionThrowsException() { + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> DeadLetterQueueUtils.extractSegmentId(Paths.get("123.txt")) + ); + assertThat(exception.getMessage(), containsString("Invalid segment file name")); + } + + // --- maxSegmentId --- + + @Test + public void testMaxSegmentIdEmptyDirectory() throws IOException { + assertEquals(0, DeadLetterQueueUtils.maxSegmentId(dir)); + } + + @Test + public void testMaxSegmentIdSingleSegment() throws IOException { + createSegmentFile(5); + assertEquals(5, DeadLetterQueueUtils.maxSegmentId(dir)); + } + + @Test + public void testMaxSegmentIdMultipleSegments() throws IOException { + createSegmentFile(1); + createSegmentFile(3); + createSegmentFile(7); + createSegmentFile(2); + assertEquals(7, DeadLetterQueueUtils.maxSegmentId(dir)); + } + + @Test + public void testMaxSegmentIdNonContiguousIds() throws IOException { + createSegmentFile(10); + createSegmentFile(500); + createSegmentFile(42); + assertEquals(500, DeadLetterQueueUtils.maxSegmentId(dir)); + } + + @Test + public void testMaxSegmentIdIgnoresNonLogFiles() throws IOException { + createSegmentFile(3); + Files.write(dir.resolve("5.log.tmp"), new byte[100]); + Files.write(dir.resolve("notes.txt"), new byte[100]); + assertEquals(3, DeadLetterQueueUtils.maxSegmentId(dir)); + } + + // --- oldestSegmentPath (no minFileSize) --- + + @Test + public void testOldestSegmentPathEmptyDirectory() throws IOException { + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertFalse(result.isPresent()); + } + + @Test + public void testOldestSegmentPathSingleSegment() throws IOException { + createSegmentFile(5); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertTrue(result.isPresent()); + assertEquals("5.log", result.get().getFileName().toString()); + } + + @Test + public void testOldestSegmentPathMultipleSegments() throws IOException { + createSegmentFile(3); + createSegmentFile(1); + createSegmentFile(7); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertTrue(result.isPresent()); + assertEquals("1.log", result.get().getFileName().toString()); + } + + @Test + public void testOldestSegmentPathNonContiguousIds() throws IOException { + createSegmentFile(100); + createSegmentFile(42); + createSegmentFile(999); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertTrue(result.isPresent()); + assertEquals("42.log", result.get().getFileName().toString()); + } + + @Test + public void testOldestSegmentPathIgnoresNonLogFiles() throws IOException { + createSegmentFile(10); + Files.write(dir.resolve("1.log.tmp"), new byte[100]); + Files.write(dir.resolve("data.txt"), new byte[100]); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertTrue(result.isPresent()); + assertEquals("10.log", result.get().getFileName().toString()); + } + + // --- oldestSegmentPath (with minFileSize) --- + + @Test + public void testOldestSegmentPathWithMinSizeSkipsSmallFiles() throws IOException { + createSegmentFile(1, 0); + createSegmentFile(2, 1); + createSegmentFile(3, 100); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1); + assertTrue(result.isPresent()); + assertEquals("3.log", result.get().getFileName().toString()); + } + + @Test + public void testOldestSegmentPathWithMinSizeReturnsSmallestQualifyingId() throws IOException { + createSegmentFile(5, 0); + createSegmentFile(10, 512); + createSegmentFile(3, 512); + createSegmentFile(7, 0); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1); + assertTrue(result.isPresent()); + assertEquals("3.log", result.get().getFileName().toString()); + } + + @Test + public void testOldestSegmentPathWithMinSizeAllTooSmall() throws IOException { + createSegmentFile(1, 0); + createSegmentFile(2, 1); + createSegmentFile(3, 0); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1); + assertFalse(result.isPresent()); + } + + @Test + public void testOldestSegmentPathWithMinSizeEmptyDirectory() throws IOException { + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1); + assertFalse(result.isPresent()); + } + + @Test + public void testOldestSegmentPathWithMinSizeZeroBehavesAsNoFilter() throws IOException { + createSegmentFile(5, 0); + createSegmentFile(2, 0); + Optional result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0); + assertTrue(result.isPresent()); + assertEquals("2.log", result.get().getFileName().toString()); + } + +}