Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -48,7 +49,7 @@ static int extractSegmentId(final Path p) {

static Stream<Path> listFiles(Path path, String suffix) throws IOException {
try(final Stream<Path> files = Files.list(path)) {
return files.filter(p -> p.toString().endsWith(suffix))
return files.filter(p -> p.getFileName().toString().endsWith(suffix))
.collect(Collectors.toList()).stream();
}
}
Expand All @@ -57,9 +58,42 @@ static Stream<Path> listSegmentPaths(Path path) throws IOException {
return listFiles(path, ".log");
}

static Stream<Path> listSegmentPathsSortedBySegmentId(Path path) throws IOException {
return listSegmentPaths(path)
.sorted(Comparator.comparingInt(DeadLetterQueueUtils::extractSegmentId));
/**
* Single-pass scan using OS-level glob filtering; avoids materializing
* all paths into a collection when only the numeric maximum is needed.
Comment thread
mashhurs marked this conversation as resolved.
Outdated
Comment thread
mashhurs marked this conversation as resolved.
Outdated
*/
static int maxSegmentId(Path path) throws IOException {
int max = 0;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.log")) {
for (Path p : stream) {
max = Math.max(max, extractSegmentId(p));
}
}
return max;
}

/**
* Single-pass scan that finds the segment path with the smallest segment ID,
* optionally skipping segments with size less than or equal to {@code minFileSize}
* when {@code minFileSize} is greater than zero.
* Returns empty when no matching segment exists.
*/
static Optional<Path> oldestSegmentPath(Path path, long minFileSize) throws IOException {
Path oldest = null;
int oldestId = Integer.MAX_VALUE;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.log")) {
for (Path p : stream) {
int id = extractSegmentId(p);
if (id < oldestId) {
if (minFileSize > 0 && Files.size(p) <= minFileSize) {
continue;
}
Comment thread
andsel marked this conversation as resolved.
oldestId = id;
oldest = p;
}
}
}
return Optional.ofNullable(oldest);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@
import org.logstash.FileLockFactory;
import org.logstash.Timestamp;

import static org.logstash.common.io.DeadLetterQueueUtils.listFiles;
import static org.logstash.common.io.DeadLetterQueueUtils.listSegmentPaths;
import static org.logstash.common.io.DeadLetterQueueUtils.listSegmentPathsSortedBySegmentId;
import static org.logstash.common.io.RecordIOReader.SegmentStatus;
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;
Expand Down Expand Up @@ -303,10 +300,7 @@ private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, f

cleanupTempFiles();
updateOldestSegmentReference();
currentSegmentIndex = listSegmentPaths(queuePath)
.map(s -> s.getFileName().toString().split("\\.")[0])
.mapToInt(Integer::parseInt)
.max().orElse(0);
currentSegmentIndex = DeadLetterQueueUtils.maxSegmentId(queuePath);
nextWriter();
Comment thread
mashhurs marked this conversation as resolved.
this.lastEntryTimestamp = Timestamp.now();
this.flusherService = flusherService;
Expand Down Expand Up @@ -577,9 +571,7 @@ private long deleteTailSegment(Path segment, String motivation) throws IOExcepti
// package-private for testing
void updateOldestSegmentReference() throws IOException {
final Optional<Path> previousOldestSegmentPath = oldestSegmentPath;
oldestSegmentPath = listSegmentPathsSortedBySegmentId(this.queuePath)
.filter(p -> p.toFile().length() > 1) // take the files that have content to process
.findFirst();
oldestSegmentPath = DeadLetterQueueUtils.oldestSegmentPath(this.queuePath, 1);
if (!oldestSegmentPath.isPresent()) {
oldestSegmentTimestamp = Optional.empty();
return;
Expand Down Expand Up @@ -634,7 +626,7 @@ static Optional<Timestamp> readTimestampOfLastEventInSegment(Path segmentPath) t
// package-private for testing
void dropTailSegment() throws IOException {
// remove oldest segment
final Optional<Path> oldestSegment = listSegmentPathsSortedBySegmentId(queuePath).findFirst();
final Optional<Path> oldestSegment = DeadLetterQueueUtils.oldestSegmentPath(queuePath, 0);
if (oldestSegment.isPresent()) {
final Path beheadedSegment = oldestSegment.get();
deleteTailSegment(beheadedSegment, "dead letter queue size exceeded dead_letter_queue.max_bytes size(" + maxQueueSize + ")");
Expand All @@ -656,6 +648,7 @@ private static boolean alreadyProcessed(final Event event) {
return event.includes(DEAD_LETTER_QUEUE_METADATA_KEY);
}

// main method for flush scheduler
private void scheduledFlushCheck() {
logger.trace("Running scheduled check");
lock.lock();
Expand Down Expand Up @@ -716,7 +709,7 @@ private void sealSegment(int segmentIndex, SealReason motivation) throws IOExcep
}

private long computeQueueSize() throws IOException {
return listSegmentPaths(this.queuePath)
return DeadLetterQueueUtils.listSegmentPaths(this.queuePath)
.mapToLong(DeadLetterQueueWriter::safeFileSize)
.sum();
}
Expand Down Expand Up @@ -753,12 +746,12 @@ private void nextWriter() throws IOException {
// segment file with the same base name exists, or rename the
// temp file to the segment file, which can happen when a process ends abnormally
private void cleanupTempFiles() throws IOException {
listFiles(queuePath, ".log.tmp")
DeadLetterQueueUtils.listFiles(queuePath, ".log.tmp")
.forEach(this::cleanupTempFile);
}

// check if there is a corresponding .log file - if yes delete the temp file, if no atomic move the
// temp file to be a new segment file..
// temp file to be a new segment file.
private void cleanupTempFile(final Path tempFile) {
String segmentName = tempFile.getFileName().toString().split("\\.")[0];
Path segmentFile = queuePath.resolve(String.format("%s.log", segmentName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1124,26 +1124,4 @@ public void testReaderLockProhibitMultipleInstances() throws IOException {
}
}
}

@Test
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to dedicated DeadLetterQueueUtilsTest.java space

public void testExtractSegmentIdWithValidFileName() {
Path validPath = Paths.get("123.log");
assertEquals(123, DeadLetterQueueUtils.extractSegmentId(validPath));

Path singleDigitPath = Paths.get("1.log");
assertEquals(1, DeadLetterQueueUtils.extractSegmentId(singleDigitPath));

Path largeNumberPath = Paths.get("999999.log");
assertEquals(999999, DeadLetterQueueUtils.extractSegmentId(largeNumberPath));
}

@Test
public void testExtractSegmentIdWithNoLogExtensionThrowsException() {
Path noExtensionPath = Paths.get("123.txt");
IllegalArgumentException exception = Assert.assertThrows(
IllegalArgumentException.class,
() -> DeadLetterQueueUtils.extractSegmentId(noExtensionPath)
);
assertThat(exception.getMessage(), containsString("Invalid segment file name"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash.common.io;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class DeadLetterQueueUtilsTest {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

private Path dir;

@Before
public void setUp() throws Exception {
dir = temporaryFolder.newFolder().toPath();
}

private void createSegmentFile(int id, int size) throws IOException {
Files.write(dir.resolve(id + ".log"), new byte[size]);
}

private void createSegmentFile(int id) throws IOException {
createSegmentFile(id, 1024);
}

// --- extractSegmentId ---

@Test
public void testExtractSegmentIdWithValidFileName() {
assertEquals(123, DeadLetterQueueUtils.extractSegmentId(Paths.get("123.log")));
assertEquals(1, DeadLetterQueueUtils.extractSegmentId(Paths.get("1.log")));
assertEquals(999999, DeadLetterQueueUtils.extractSegmentId(Paths.get("999999.log")));
}

@Test
public void testExtractSegmentIdWithNoLogExtensionThrowsException() {
IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> DeadLetterQueueUtils.extractSegmentId(Paths.get("123.txt"))
);
assertThat(exception.getMessage(), containsString("Invalid segment file name"));
}

// --- maxSegmentId ---

@Test
public void testMaxSegmentIdEmptyDirectory() throws IOException {
assertEquals(0, DeadLetterQueueUtils.maxSegmentId(dir));
}

@Test
public void testMaxSegmentIdSingleSegment() throws IOException {
createSegmentFile(5);
assertEquals(5, DeadLetterQueueUtils.maxSegmentId(dir));
}

@Test
public void testMaxSegmentIdMultipleSegments() throws IOException {
createSegmentFile(1);
createSegmentFile(3);
createSegmentFile(7);
createSegmentFile(2);
assertEquals(7, DeadLetterQueueUtils.maxSegmentId(dir));
}

@Test
public void testMaxSegmentIdNonContiguousIds() throws IOException {
createSegmentFile(10);
createSegmentFile(500);
createSegmentFile(42);
assertEquals(500, DeadLetterQueueUtils.maxSegmentId(dir));
}

@Test
public void testMaxSegmentIdIgnoresNonLogFiles() throws IOException {
createSegmentFile(3);
Files.write(dir.resolve("5.log.tmp"), new byte[100]);
Files.write(dir.resolve("notes.txt"), new byte[100]);
assertEquals(3, DeadLetterQueueUtils.maxSegmentId(dir));
}

// --- oldestSegmentPath (no minFileSize) ---

@Test
public void testOldestSegmentPathEmptyDirectory() throws IOException {
Optional<Path> result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0);
assertFalse(result.isPresent());
}

@Test
public void testOldestSegmentPathSingleSegment() throws IOException {
createSegmentFile(5);
Optional<Path> result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0);
assertTrue(result.isPresent());
assertEquals("5.log", result.get().getFileName().toString());
}

@Test
public void testOldestSegmentPathMultipleSegments() throws IOException {
createSegmentFile(3);
createSegmentFile(1);
createSegmentFile(7);
Optional<Path> result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0);
assertTrue(result.isPresent());
assertEquals("1.log", result.get().getFileName().toString());
}

@Test
public void testOldestSegmentPathNonContiguousIds() throws IOException {
createSegmentFile(100);
createSegmentFile(42);
createSegmentFile(999);
Optional<Path> result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0);
assertTrue(result.isPresent());
assertEquals("42.log", result.get().getFileName().toString());
}

@Test
public void testOldestSegmentPathIgnoresNonLogFiles() throws IOException {
createSegmentFile(10);
Files.write(dir.resolve("1.log.tmp"), new byte[100]);
Files.write(dir.resolve("data.txt"), new byte[100]);
Optional<Path> result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0);
assertTrue(result.isPresent());
assertEquals("10.log", result.get().getFileName().toString());
}

// --- oldestSegmentPath (with minFileSize) ---

@Test
public void testOldestSegmentPathWithMinSizeSkipsSmallFiles() throws IOException {
createSegmentFile(1, 0);
createSegmentFile(2, 1);
createSegmentFile(3, 100);
Optional<Path> result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1);
assertTrue(result.isPresent());
assertEquals("3.log", result.get().getFileName().toString());
}

@Test
public void testOldestSegmentPathWithMinSizeReturnsSmallestQualifyingId() throws IOException {
createSegmentFile(5, 0);
createSegmentFile(10, 512);
createSegmentFile(3, 512);
createSegmentFile(7, 0);
Optional<Path> result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1);
assertTrue(result.isPresent());
assertEquals("3.log", result.get().getFileName().toString());
}

@Test
public void testOldestSegmentPathWithMinSizeAllTooSmall() throws IOException {
createSegmentFile(1, 0);
createSegmentFile(2, 1);
createSegmentFile(3, 0);
Optional<Path> result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1);
assertFalse(result.isPresent());
}

@Test
public void testOldestSegmentPathWithMinSizeEmptyDirectory() throws IOException {
Optional<Path> result = DeadLetterQueueUtils.oldestSegmentPath(dir, 1);
assertFalse(result.isPresent());
}

@Test
public void testOldestSegmentPathWithMinSizeZeroBehavesAsNoFilter() throws IOException {
createSegmentFile(5, 0);
createSegmentFile(2, 0);
Optional<Path> result = DeadLetterQueueUtils.oldestSegmentPath(dir, 0);
assertTrue(result.isPresent());
assertEquals("2.log", result.get().getFileName().toString());
}

}
Loading