Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -48,7 +49,7 @@ static int extractSegmentId(final Path p) {

static Stream<Path> listFiles(Path path, String suffix) throws IOException {
try(final Stream<Path> files = Files.list(path)) {
return files.filter(p -> p.toString().endsWith(suffix))
return files.filter(p -> p.getFileName().toString().endsWith(suffix))
.collect(Collectors.toList()).stream();
}
}
Expand All @@ -57,9 +58,41 @@ static Stream<Path> listSegmentPaths(Path path) throws IOException {
return listFiles(path, ".log");
}

static Stream<Path> listSegmentPathsSortedBySegmentId(Path path) throws IOException {
return listSegmentPaths(path)
.sorted(Comparator.comparingInt(DeadLetterQueueUtils::extractSegmentId));
/**
Finds the max segment ID between the `.log` file segments using OS-level glob filtering.
*/
static int maxSegmentId(Path path) throws IOException {
int max = 0;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.log")) {
for (Path p : stream) {
max = Math.max(max, extractSegmentId(p));
}
}
return max;
}

/**
* Single-pass scan that finds the segment path with the smallest segment ID,
* optionally skipping segments with size less than or equal to {@code minFileSize}
* when {@code minFileSize} is greater than zero.
* Returns empty when no matching segment exists.
*/
static Optional<Path> oldestSegmentPath(Path path, long minFileSize) throws IOException {
Path oldest = null;
int oldestId = Integer.MAX_VALUE;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.log")) {
for (Path p : stream) {
int id = extractSegmentId(p);
if (id < oldestId) {
if (minFileSize > 0 && Files.size(p) <= minFileSize) {
continue;
}
oldestId = id;
oldest = p;
}
}
}
return Optional.ofNullable(oldest);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@
import org.logstash.FileLockFactory;
import org.logstash.Timestamp;

import static org.logstash.common.io.DeadLetterQueueUtils.listFiles;
import static org.logstash.common.io.DeadLetterQueueUtils.listSegmentPaths;
import static org.logstash.common.io.DeadLetterQueueUtils.listSegmentPathsSortedBySegmentId;
import static org.logstash.common.io.RecordIOReader.SegmentStatus;
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;
Expand Down Expand Up @@ -303,10 +300,7 @@ private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, f

cleanupTempFiles();
updateOldestSegmentReference();
currentSegmentIndex = listSegmentPaths(queuePath)
.map(s -> s.getFileName().toString().split("\\.")[0])
.mapToInt(Integer::parseInt)
.max().orElse(0);
currentSegmentIndex = DeadLetterQueueUtils.maxSegmentId(queuePath);
nextWriter();
this.lastEntryTimestamp = Timestamp.now();
this.flusherService = flusherService;
Expand Down Expand Up @@ -547,8 +541,6 @@ private void deleteExpiredSegments() throws IOException {
updateOldestSegmentReference();
cleanNextSegment = isOldestSegmentExpired();
} while (cleanNextSegment);

this.currentQueueSize.set(computeQueueSize());
}

/**
Expand All @@ -563,8 +555,10 @@ private void deleteExpiredSegments() throws IOException {
* */
private long deleteTailSegment(Path segment, String motivation) throws IOException {
try {
long segmentSize = Files.size(segment);
long eventsInSegment = DeadLetterQueueUtils.countEventsInSegment(segment);
Files.delete(segment);
currentQueueSize.addAndGet(-segmentSize);
logger.debug("Removed segment file {} due to {}", segment, motivation);
return eventsInSegment;
} catch (NoSuchFileException nsfex) {
Expand All @@ -577,9 +571,7 @@ private long deleteTailSegment(Path segment, String motivation) throws IOExcepti
// package-private for testing
void updateOldestSegmentReference() throws IOException {
final Optional<Path> previousOldestSegmentPath = oldestSegmentPath;
oldestSegmentPath = listSegmentPathsSortedBySegmentId(this.queuePath)
.filter(p -> p.toFile().length() > 1) // take the files that have content to process
.findFirst();
oldestSegmentPath = DeadLetterQueueUtils.oldestSegmentPath(this.queuePath, 1);
if (!oldestSegmentPath.isPresent()) {
oldestSegmentTimestamp = Optional.empty();
return;
Expand Down Expand Up @@ -634,14 +626,13 @@ static Optional<Timestamp> readTimestampOfLastEventInSegment(Path segmentPath) t
// package-private for testing
void dropTailSegment() throws IOException {
// remove oldest segment
final Optional<Path> oldestSegment = listSegmentPathsSortedBySegmentId(queuePath).findFirst();
final Optional<Path> oldestSegment = DeadLetterQueueUtils.oldestSegmentPath(queuePath, 0);
if (oldestSegment.isPresent()) {
final Path beheadedSegment = oldestSegment.get();
deleteTailSegment(beheadedSegment, "dead letter queue size exceeded dead_letter_queue.max_bytes size(" + maxQueueSize + ")");
} else {
logger.info("Queue size {} exceeded, but no complete DLQ segments found", maxQueueSize);
}
this.currentQueueSize.set(computeQueueSize());
}

/**
Expand All @@ -656,6 +647,7 @@ private static boolean alreadyProcessed(final Event event) {
return event.includes(DEAD_LETTER_QUEUE_METADATA_KEY);
}

// main method for flush scheduler
private void scheduledFlushCheck() {
logger.trace("Running scheduled check");
lock.lock();
Expand Down Expand Up @@ -716,7 +708,7 @@ private void sealSegment(int segmentIndex, SealReason motivation) throws IOExcep
}

private long computeQueueSize() throws IOException {
return listSegmentPaths(this.queuePath)
return DeadLetterQueueUtils.listSegmentPaths(this.queuePath)
.mapToLong(DeadLetterQueueWriter::safeFileSize)
.sum();
}
Expand Down Expand Up @@ -753,12 +745,12 @@ private void nextWriter() throws IOException {
// segment file with the same base name exists, or rename the
// temp file to the segment file, which can happen when a process ends abnormally
private void cleanupTempFiles() throws IOException {
listFiles(queuePath, ".log.tmp")
DeadLetterQueueUtils.listFiles(queuePath, ".log.tmp")
.forEach(this::cleanupTempFile);
}

// check if there is a corresponding .log file - if yes delete the temp file, if no atomic move the
// temp file to be a new segment file..
// temp file to be a new segment file.
private void cleanupTempFile(final Path tempFile) {
String segmentName = tempFile.getFileName().toString().split("\\.")[0];
Path segmentFile = queuePath.resolve(String.format("%s.log", segmentName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1124,26 +1124,4 @@ public void testReaderLockProhibitMultipleInstances() throws IOException {
}
}
}

@Test
public void testExtractSegmentIdWithValidFileName() {
Path validPath = Paths.get("123.log");
assertEquals(123, DeadLetterQueueUtils.extractSegmentId(validPath));

Path singleDigitPath = Paths.get("1.log");
assertEquals(1, DeadLetterQueueUtils.extractSegmentId(singleDigitPath));

Path largeNumberPath = Paths.get("999999.log");
assertEquals(999999, DeadLetterQueueUtils.extractSegmentId(largeNumberPath));
}

@Test
public void testExtractSegmentIdWithNoLogExtensionThrowsException() {
Path noExtensionPath = Paths.get("123.txt");
IllegalArgumentException exception = Assert.assertThrows(
IllegalArgumentException.class,
() -> DeadLetterQueueUtils.extractSegmentId(noExtensionPath)
);
assertThat(exception.getMessage(), containsString("Invalid segment file name"));
}
}
Loading
Loading