future : futures) {
+ future.get(10, TimeUnit.SECONDS);
+ }
+
+ assertEquals(0, ioControl.used());
+ assertEquals(0, ioControl.iocbGet());
+ assertEquals(0, ioControl.iocbPut());
+ assertTrue(ioControl.isValid());
+ } finally {
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+ }
+ }
+}
diff --git a/artemis-journal/pom.xml b/artemis-journal/pom.xml
index 5edc4c9ea8a..46516d095dd 100644
--- a/artemis-journal/pom.xml
+++ b/artemis-journal/pom.xml
@@ -92,4 +92,56 @@
test
+
+
+ jdk24onwards
+
+ [24,)
+
+
+
+ org.apache.artemis
+ artemis-ffm
+ ${project.version}
+ compile
+
+
+
+
+
+ maven-compiler-plugin
+
+
+ java24-compile
+ compile
+
+ compile
+
+
+ 24
+ ${project.basedir}/src/main/java24
+ true
+
+
+
+
+
+ maven-jar-plugin
+
+
+
+ default-jar
+ process-test-classes
+
+ jar
+
+
+
+
+
+
+
+
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
index c999d936f48..f7c0d00204b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
@@ -34,6 +34,18 @@ default IOCriticalErrorListener getCriticalErrorListener() {
default void setCriticalErrorListener(IOCriticalErrorListener listener) {
}
+ default SequentialFileFactory disableBufferReuse() {
+ return this;
+ }
+
+ default ByteBuffer newNativeBuffer(int size, int alignment) {
+ throw new UnsupportedOperationException();
+ }
+
+ default void freeNativeBuffer(ByteBuffer buffer) {
+ throw new UnsupportedOperationException();
+ }
+
default CriticalAnalyzer getCriticalAnalyzer() {
return null;
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index 40dabfc2350..09b699e3536 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -132,8 +132,10 @@ public void enableBufferReuse() {
this.reuseBuffers = true;
}
- public void disableBufferReuse() {
+ @Override
+ public AIOSequentialFileFactory disableBufferReuse() {
this.reuseBuffers = false;
+ return this;
}
@Override
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio2/AIO2Helper.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio2/AIO2Helper.java
new file mode 100644
index 00000000000..f0df184b2f8
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio2/AIO2Helper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io.aio2;
+
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AIO2 helper for JDK less than version 22.
+ * This version uses stub implementations that throw UnsupportedOperationException.
+ * For JDK 22+, see the real implementation in src/main/java22.
+ */
+public class AIO2Helper {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static boolean isSupported() {
+ logger.info("AIO2Helper from earlier JDKs being used");
+ return false;
+ }
+
+ public static long getTotalMaxIO() {
+ return 0;
+ }
+
+ public static SequentialFileFactory getAIO2SequentialFileFactory(File journalDir, int maxIO) {
+ return null;
+ }
+
+ public static SequentialFileFactory getAIO2SequentialFileFactory(File journalDir,
+ IOCriticalErrorListener listener,
+ int maxIO) {
+ return null;
+ }
+
+ public static SequentialFileFactory getAIO2SequentialFileFactory(File journalDir,
+ int bufferSize,
+ int bufferTimeout,
+ int maxIO,
+ boolean logRates) {
+ return null;
+ }
+
+ public static SequentialFileFactory getAIO2SequentialFileFactory(File journalDir,
+ int bufferSize,
+ int bufferTimeout,
+ int maxIO,
+ boolean logRates,
+ IOCriticalErrorListener listener,
+ CriticalAnalyzer analyzer) {
+ return null;
+ }
+
+}
\ No newline at end of file
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
index 8f144598c04..e9381d3d4aa 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -98,6 +98,7 @@ public MappedSequentialFileFactory enableBufferReuse() {
return this;
}
+ @Override
public MappedSequentialFileFactory disableBufferReuse() {
this.bufferPooling = false;
return this;
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
index ead344b8f03..c176f680d5b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
@@ -110,8 +110,10 @@ public void enableBufferReuse() {
this.bufferPooling = true;
}
- public void disableBufferReuse() {
+ @Override
+ public NIOSequentialFileFactory disableBufferReuse() {
this.bufferPooling = false;
+ return this;
}
@Override
diff --git a/artemis-journal/src/main/java24/org/apache/activemq/artemis/core/io/aio2/AIO2Helper.java b/artemis-journal/src/main/java24/org/apache/activemq/artemis/core/io/aio2/AIO2Helper.java
new file mode 100644
index 00000000000..e2b41afb72e
--- /dev/null
+++ b/artemis-journal/src/main/java24/org/apache/activemq/artemis/core/io/aio2/AIO2Helper.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io.aio2;
+
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.apache.artemis.nativo.jlibaio.LibaioContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AIO2 helper for JDK 22+.
+ * This version uses the real AIO2SequentialFileFactory implementation with Panama FFM support.
+ * For JDK < 22, see the stub version in src/main/java.
+ */
+public class AIO2Helper {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static boolean isSupported() {
+ return AIO2SequentialFileFactory.isSupported();
+ }
+
+ public static long getTotalMaxIO() {
+ return 0;
+ }
+
+ public static SequentialFileFactory getAIO2SequentialFileFactory(File journalDir, int maxIO) {
+ try {
+ return new AIO2SequentialFileFactory(journalDir, maxIO);
+ } catch (UnsupportedOperationException | LinkageError e) {
+ logger.debug("AIO2 not available: {}", e.getMessage(), e);
+ return null;
+ }
+ }
+
+ public static SequentialFileFactory getAIO2SequentialFileFactory(File journalDir,
+ IOCriticalErrorListener listener,
+ int maxIO) {
+ try {
+ return new AIO2SequentialFileFactory(journalDir, listener, maxIO);
+ } catch (UnsupportedOperationException | LinkageError e) {
+ logger.debug("AIO2 not available: {}", e.getMessage(), e);
+ return null;
+ }
+ }
+
+ public static SequentialFileFactory getAIO2SequentialFileFactory(File journalDir,
+ int bufferSize,
+ int bufferTimeout,
+ int maxIO,
+ boolean logRates) {
+ try {
+ return new AIO2SequentialFileFactory(journalDir, bufferSize, bufferTimeout, maxIO, logRates);
+ } catch (UnsupportedOperationException | LinkageError e) {
+ logger.debug("AIO2 not available: {}", e.getMessage(), e);
+ return null;
+ }
+ }
+
+ public static SequentialFileFactory getAIO2SequentialFileFactory(File journalDir,
+ int bufferSize,
+ int bufferTimeout,
+ int maxIO,
+ boolean logRates,
+ IOCriticalErrorListener listener,
+ CriticalAnalyzer analyzer) {
+ try {
+ return new AIO2SequentialFileFactory(journalDir, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer);
+ } catch (UnsupportedOperationException | LinkageError e) {
+ logger.debug("AIO2 not available: {}", e.getMessage(), e);
+ return null;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/artemis-journal/src/main/java24/org/apache/activemq/artemis/core/io/aio2/AIO2SequentialFile.java b/artemis-journal/src/main/java24/org/apache/activemq/artemis/core/io/aio2/AIO2SequentialFile.java
new file mode 100644
index 00000000000..034f2e3a746
--- /dev/null
+++ b/artemis-journal/src/main/java24/org/apache/activemq/artemis/core/io/aio2/AIO2SequentialFile.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.aio2;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQNativeIOError;
+import org.apache.activemq.artemis.core.io.AbstractSequentialFile;
+import org.apache.activemq.artemis.core.io.DummyCallback;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
+import org.apache.artemis.nativo.jlibaio.LibaioFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is implementing Runnable to reuse a callback to close it.
+ */
+public class AIO2SequentialFile extends AbstractSequentialFile {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private boolean opened = false;
+
+ private LibaioFile aioFile;
+
+ private final AIO2SequentialFileFactory aioFactory;
+
+ /**
+ * Used to determine the next writing sequence
+ */
+ private final AtomicLong nextWritingSequence = new AtomicLong(0);
+
+ /**
+ * AIO can't guarantee ordering over callbacks.
+ *
+ * We use this {@link PriorityQueue} to hold values until they are in order
+ */
+ final PriorityQueue pendingCallbackList = new PriorityQueue<>();
+
+ /**
+ * Used to determine the next writing sequence. This is accessed from a single thread (the Poller Thread)
+ */
+ private long nextReadSequence = 0;
+
+ public AIO2SequentialFile(final AIO2SequentialFileFactory factory,
+ final int bufferSize,
+ final long bufferTimeoutMilliseconds,
+ final File directory,
+ final String fileName) {
+ super(directory, fileName, factory);
+ this.aioFactory = factory;
+ }
+
+ @Override
+ public ByteBuffer map(int position, long size) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return opened;
+ }
+
+ @Override
+ public int calculateBlockStart(final int position) {
+ return factory.calculateBlockSize(position);
+ }
+
+ @Override
+ public SequentialFile cloneFile() {
+ return new AIO2SequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName());
+ }
+
+ @Override
+ public void close() throws IOException, InterruptedException, ActiveMQException {
+ close(true, true);
+ }
+
+ @Override
+ public synchronized void close(boolean waitSync,
+ boolean blockOnWait) throws IOException, InterruptedException, ActiveMQException {
+ // a double call on close, should result on it waitingNotPending before another close is called
+ if (!opened) {
+ return;
+ }
+
+ aioFactory.beforeClose();
+
+ super.close();
+ opened = false;
+ this.timedBuffer = null;
+
+ try {
+ aioFile.close();
+ } catch (Throwable e) {
+ // an exception here would means a double
+ logger.debug("Exeption while closing file", e);
+ } finally {
+ aioFile = null;
+ aioFactory.afterClose();
+ }
+ }
+
+ @Override
+ public synchronized void fill(final int size) throws Exception {
+ logger.trace("Filling file: {}", getFileName());
+
+ checkOpened();
+ aioFile.fill(aioFactory.getAlignment(), size);
+
+ fileSize = aioFile.getSize();
+ }
+
+ @Override
+ public void open() throws Exception {
+ open(aioFactory.getMaxIO(), true);
+ }
+
+ @Override
+ public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException, IOException {
+ // in case we are opening a file that was just closed, we need to wait previous executions to be done
+ if (opened) {
+ return;
+ }
+ opened = true;
+
+ logger.trace("Opening file: {}", getFileName());
+
+ try {
+ aioFile = aioFactory.libaioContext.openFile(getFile(), factory.isDatasync());
+ } catch (IOException e) {
+ logger.error("Error opening file: {}", getFileName());
+ factory.onIOError(e, e.getMessage(), this);
+ throw new ActiveMQNativeIOError(e.getMessage(), e);
+ }
+
+ position.set(0);
+
+ fileSize = aioFile.getSize();
+ }
+
+ @Override
+ public int read(final ByteBuffer bytes, final IOCallback callback) throws ActiveMQException {
+ checkOpened();
+ int bytesToRead = bytes.limit();
+ long positionToRead = position.getAndAdd(bytesToRead);
+
+ bytes.rewind();
+
+ try {
+ // We don't send the buffer to the callback on read,
+ // because we want the buffer available.
+ // Sending it through the callback would make it released
+ aioFile.read(positionToRead, bytesToRead, bytes, getCallback(callback, null));
+ } catch (IOException e) {
+ logger.error("IOError reading file: {}", getFileName(), e);
+ factory.onIOError(e, e.getMessage(), this);
+ throw new ActiveMQNativeIOError(e.getMessage(), e);
+ }
+
+ return bytesToRead;
+ }
+
+ @Override
+ public int read(final ByteBuffer bytes) throws Exception {
+ SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback();
+
+ int bytesRead = read(bytes, waitCompletion);
+
+ waitCompletion.waitCompletion();
+
+ return bytesRead;
+ }
+
+ @Override
+ public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Write Direct, Sync: {} File: {}", sync, getFileName());
+ }
+
+ if (sync) {
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+ writeDirect(bytes, true, completion);
+
+ completion.waitCompletion();
+ } else {
+ writeDirect(bytes, false, DummyCallback.getInstance());
+ }
+ }
+
+ @Override
+ public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
+ logger.trace("Write Direct, Sync: true File: {}", getFileName());
+
+ final SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+ try {
+ checkOpened();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ completion.onError(-1, e.getClass() + " during blocking write direct: " + e.getMessage());
+ return;
+ }
+
+ final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
+
+ final long positionToWrite = position.getAndAdd(bytesToWrite);
+
+ final AIO2SequentialFileFactory.AIO2SequentialCallback runnableCallback = getCallback(completion, bytes, releaseBuffer);
+ runnableCallback.initWrite(positionToWrite, bytesToWrite);
+ runnableCallback.run();
+
+ completion.waitCompletion();
+ }
+
+ /**
+ * Note: Parameter sync is not used on AIO
+ */
+ @Override
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) {
+ try {
+ checkOpened();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ callback.onError(-1, e.getClass() + " during write direct: " + e.getMessage());
+ return;
+ }
+
+ final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
+
+ final long positionToWrite = position.getAndAdd(bytesToWrite);
+
+ AIO2SequentialFileFactory.AIO2SequentialCallback runnableCallback = getCallback(callback, bytes);
+ runnableCallback.initWrite(positionToWrite, bytesToWrite);
+ runnableCallback.run();
+ }
+
+ AIO2SequentialFileFactory.AIO2SequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) {
+ return getCallback(originalCallback, buffer, true);
+ }
+
+ AIO2SequentialFileFactory.AIO2SequentialCallback getCallback(IOCallback originalCallback,
+ ByteBuffer buffer,
+ boolean releaseBuffer) {
+ AIO2SequentialFileFactory.AIO2SequentialCallback callback = aioFactory.getCallback();
+ callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer, releaseBuffer);
+ return callback;
+ }
+
+ void done(AIO2SequentialFileFactory.AIO2SequentialCallback callback) {
+ if (callback.writeSequence == -1) {
+ callback.sequentialDone();
+ }
+
+ if (callback.writeSequence == nextReadSequence) {
+ nextReadSequence++;
+ try {
+ callback.sequentialDone();
+ } finally {
+ flushCallbacks();
+ }
+ } else {
+ pendingCallbackList.add(callback);
+ }
+
+ }
+
+ private void flushCallbacks() {
+ while (!pendingCallbackList.isEmpty() && pendingCallbackList.peek().writeSequence == nextReadSequence) {
+ AIO2SequentialFileFactory.AIO2SequentialCallback callback = pendingCallbackList.poll();
+ try {
+ callback.sequentialDone();
+ } finally {
+ nextReadSequence++;
+ }
+ }
+ }
+
+ @Override
+ public void sync() {
+ throw new UnsupportedOperationException("This method is not supported on AIO");
+ }
+
+ @Override
+ public long size() throws Exception {
+ if (aioFile == null) {
+ return getFile().length();
+ } else {
+ return aioFile.getSize();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AIOSequentialFile{" + getFileName() + ", opened=" + opened + '}';
+ }
+
+ private void checkOpened() {
+ if (aioFile == null || !opened) {
+ throw new NullPointerException("File not opened, file=null on fileName = " + getFileName());
+ }
+ }
+
+}
diff --git a/artemis-journal/src/main/java24/org/apache/activemq/artemis/core/io/aio2/AIO2SequentialFileFactory.java b/artemis-journal/src/main/java24/org/apache/activemq/artemis/core/io/aio2/AIO2SequentialFileFactory.java
new file mode 100644
index 00000000000..0c628303087
--- /dev/null
+++ b/artemis-journal/src/main/java24/org/apache/activemq/artemis/core/io/aio2/AIO2SequentialFileFactory.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.aio2;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.foreign.MemorySegment;
+import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.artemis.nativo.jlibaio.LibaioContext;
+import org.apache.artemis.nativo.jlibaio.LibaioFile;
+import org.apache.artemis.nativo.jlibaio.SubmitInfo;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.PowerOf2Util;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.jctools.queues.MpmcArrayQueue;
+import org.jctools.queues.atomic.MpmcAtomicArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AIO2SequentialFileFactory extends AbstractSequentialFileFactory {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // This is useful in cases where you want to disable loading the native library. (e.g. testsuite)
+ private static final boolean DISABLED = System.getProperty(AIO2SequentialFileFactory.class.getName() + ".DISABLED") != null;
+
+ static {
+ // This is usually only used on testsuite.
+ // In case it's used, I would rather have it on the loggers so we know what's happening
+ if (DISABLED) {
+
+ // This is only used in tests, hence I'm not creating a Logger for this
+ logger.info("{}.DISABLED = true", AIO2SequentialFileFactory.class.getName());
+ }
+ }
+
+ private final ReuseBuffersController buffersControl = new ReuseBuffersController();
+
+ private volatile boolean reuseBuffers = true;
+
+ private Thread pollerThread;
+
+ volatile LibaioContext libaioContext;
+
+ private final Queue callbackPool;
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+
+ private static final String AIO_TEST_FILE = ".aio-test";
+
+ @Override
+ public boolean isSyncSupported() {
+ return false;
+ }
+
+ public void beforeClose() {
+ }
+
+ public void afterClose() {
+ }
+
+ public AIO2SequentialFileFactory(final File journalDir, int maxIO) {
+ this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, null, null);
+ }
+
+ public AIO2SequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, int maxIO) {
+ this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, listener, null);
+ }
+
+ public AIO2SequentialFileFactory(final File journalDir,
+ final int bufferSize,
+ final int bufferTimeout,
+ final int maxIO,
+ final boolean logRates) {
+ this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null, null);
+ }
+
+ public AIO2SequentialFileFactory(final File journalDir,
+ final int bufferSize,
+ final int bufferTimeout,
+ final int maxIO,
+ final boolean logRates,
+ final IOCriticalErrorListener listener,
+ final CriticalAnalyzer analyzer) {
+ super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer);
+ logger.debug("CONSTRUCTOR: bufferSize={}, DEFAULT={}", this.bufferSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO);
+ if (maxIO == 1) {
+ logger.warn("Using journal-max-io 1 isn't a proper use of ASYNCIO journal: consider rise this value or use NIO.");
+ }
+ final int adjustedMaxIO = Math.max(2, maxIO);
+ callbackPool = PlatformDependent.hasUnsafe() ? new MpmcArrayQueue<>(adjustedMaxIO) : new MpmcAtomicArrayQueue<>(adjustedMaxIO);
+ logger.trace("New AIO File Created");
+ }
+
+ public AIO2SequentialCallback getCallback() {
+ AIO2SequentialCallback callback = callbackPool.poll();
+ if (callback == null) {
+ callback = new AIO2SequentialCallback();
+ }
+
+ return callback;
+ }
+
+ public void enableBufferReuse() {
+ this.reuseBuffers = true;
+ }
+
+ @Override
+ public AIO2SequentialFileFactory disableBufferReuse() {
+ this.reuseBuffers = false;
+ return this;
+ }
+
+ @Override
+ public SequentialFile createSequentialFile(final String fileName) {
+ return new AIO2SequentialFile(this, bufferSize, bufferTimeout, journalDir, fileName);
+ }
+
+ @Override
+ public boolean isSupportsCallbacks() {
+ return true;
+ }
+
+ public static boolean isSupported() {
+ return !DISABLED && LibaioContext.isLoaded();
+ }
+
+ public static boolean isSupported(File journalPath) {
+ if (!isSupported()) {
+ return false;
+ }
+
+ File aioTestFile = new File(journalPath, AIO_TEST_FILE);
+ try {
+ int fd = LibaioContext.open(aioTestFile.getAbsolutePath(), true);
+ LibaioContext.close(fd);
+ aioTestFile.delete();
+ } catch (Exception e) {
+ // try to handle the file using plain Java
+ // return false if and only if we can create/remove the file using
+ // plain Java but not using AIO
+ try {
+ if (!aioTestFile.exists()) {
+ if (!aioTestFile.createNewFile()) {
+ return true;
+ }
+ }
+ if (!aioTestFile.delete()) {
+ return true;
+ }
+ } catch (Exception ie) {
+ // we can not even create the test file using plain java
+ return true;
+ }
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public ByteBuffer allocateDirectBuffer(final int size) {
+
+ final int alignedSize = calculateBlockSize(size);
+
+ // The buffer on AIO has to be a multiple of getAlignment()
+ ByteBuffer buffer = LibaioContext.newAlignedBuffer(alignedSize, getAlignment()).asByteBuffer();
+
+ buffer.limit(size);
+
+ return buffer;
+ }
+
+ @Override
+ public void releaseDirectBuffer(final ByteBuffer buffer) {
+ buffer.clear();
+ LibaioContext.freeBuffer(MemorySegment.ofBuffer(buffer));
+ }
+
+ @Override
+ public ByteBuffer newBuffer(int size) {
+ return newBuffer(size, true);
+ }
+
+ @Override
+ public ByteBuffer newBuffer(int size, boolean zeroed) {
+ final int alignedSize = calculateBlockSize(size);
+ return buffersControl.newBuffer(alignedSize, zeroed);
+ }
+
+ @Override
+ public void clearBuffer(final ByteBuffer directByteBuffer) {
+ directByteBuffer.position(0);
+ if (PlatformDependent.hasUnsafe()) {
+ // that's the same semantic of libaioContext.memsetBuffer: it hasn't any JNI cost
+ ByteUtil.zeros(directByteBuffer, 0, directByteBuffer.limit());
+ } else {
+ // JNI cost
+ libaioContext.memsetBuffer(directByteBuffer);
+ }
+ }
+
+ @Override
+ public int getAlignment() {
+ if (alignment < 0) {
+ alignment = calculateAlignment(journalDir);
+ }
+ return alignment;
+ }
+
+ @Override
+ public ByteBuffer newNativeBuffer(int size, int alignment) {
+ return LibaioContext.newAlignedBuffer(size, alignment).asByteBuffer();
+ }
+
+
+ @Override
+ public void freeNativeBuffer(ByteBuffer buffer) {
+ LibaioContext.freeBuffer(MemorySegment.ofBuffer(buffer));
+ }
+
+ private static int calculateAlignment(File journalDir) {
+ File checkFile = null;
+ int alignment;
+ try {
+ journalDir.mkdirs();
+ checkFile = File.createTempFile("journalCheck", ".tmp", journalDir);
+ checkFile.mkdirs();
+ checkFile.createNewFile();
+ alignment = LibaioContext.getBlockSize(checkFile);
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ alignment = 512;
+ } finally {
+ if (checkFile != null) {
+ checkFile.delete();
+ }
+ }
+ return alignment;
+ }
+
+ // For tests only
+ @Override
+ public ByteBuffer wrapBuffer(final byte[] bytes) {
+ ByteBuffer newbuffer = newBuffer(bytes.length);
+ newbuffer.put(bytes);
+ return newbuffer;
+ }
+
+ @Override
+ public int calculateBlockSize(final int position) {
+ final int alignment = getAlignment();
+ if (!PowerOf2Util.isPowOf2(alignment)) {
+ return align(position, alignment);
+ } else {
+ return PowerOf2Util.align(position, alignment);
+ }
+ }
+
+ /**
+ * It can be used to align {@code size} if alignment is not a power of 2: otherwise better to use
+ * {@link PowerOf2Util#align(int, int)} instead.
+ */
+ private static int align(int size, int alignment) {
+ return (size / alignment + (size % alignment != 0 ? 1 : 0)) * alignment;
+ }
+
+ @Override
+ public synchronized void releaseBuffer(final ByteBuffer buffer) {
+ // resetting buffer offsets to original
+ buffer.clear();
+ LibaioContext.freeBuffer(MemorySegment.ofBuffer(buffer));
+ }
+
+ @Override
+ public void start() {
+ if (running.compareAndSet(false, true)) {
+ super.start();
+
+ this.libaioContext = new LibaioContext(maxIO, true, dataSync);
+
+ this.running.set(true);
+
+ pollerThread = new PollerThread();
+ pollerThread.start();
+ }
+
+ }
+
+ @Override
+ public void stop() {
+ if (this.running.compareAndSet(true, false)) {
+ buffersControl.stop();
+
+ libaioContext.close();
+ libaioContext = null;
+
+ if (pollerThread != null) {
+ try {
+ pollerThread.join(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT * 1000);
+
+ if (pollerThread.isAlive()) {
+ ActiveMQJournalLogger.LOGGER.timeoutOnPollerShutdown(new Exception("trace"));
+ }
+ } catch (InterruptedException e) {
+ throw new ActiveMQInterruptedException(e);
+ }
+ }
+
+ super.stop();
+ }
+ }
+
+ /**
+ * The same callback is used for Runnable executor. This way we can save some memory over the pool.
+ */
+ public class AIO2SequentialCallback implements SubmitInfo, Runnable, Comparable {
+
+ IOCallback callback;
+ boolean error = false;
+ AIO2SequentialFile sequentialFile;
+ ByteBuffer buffer;
+ LibaioFile libaioFile;
+ String errorMessage;
+ int errorCode = -1;
+ long writeSequence;
+ boolean releaseBuffer;
+ long position;
+ int bytes;
+
+ @Override
+ public String toString() {
+ return "AIOSequentialCallback{" + "error=" + error + ", errorMessage='" + errorMessage + '\'' + ", errorCode=" + errorCode + ", writeSequence=" + writeSequence + ", releaseBuffer=" + releaseBuffer + ", position=" + position + '}';
+ }
+
+ public AIO2SequentialCallback initWrite(long positionToWrite, int bytesToWrite) {
+ this.position = positionToWrite;
+ this.bytes = bytesToWrite;
+ return this;
+ }
+
+ @Override
+ public void run() {
+ try {
+ libaioFile.write(position, bytes, buffer, this);
+ } catch (IOException e) {
+ callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getClass() + " during write to " + sequentialFile.getFileName() + ": " + e.getMessage());
+ onIOError(e, "Failed to write to file", sequentialFile);
+ }
+ }
+
+ @Override
+ public int compareTo(AIO2SequentialCallback other) {
+ if (this == other || this.writeSequence == other.writeSequence) {
+ return 0;
+ } else if (other.writeSequence < this.writeSequence) {
+ return 1;
+ } else {
+ return -1;
+ }
+ }
+
+ public AIO2SequentialCallback init(long writeSequence,
+ IOCallback IOCallback,
+ LibaioFile libaioFile,
+ AIO2SequentialFile sequentialFile,
+ ByteBuffer usedBuffer,
+ boolean releaseBuffer) {
+ this.callback = IOCallback;
+ this.sequentialFile = sequentialFile;
+ this.error = false;
+ this.buffer = usedBuffer;
+ this.libaioFile = libaioFile;
+ this.writeSequence = writeSequence;
+ this.errorMessage = null;
+ this.releaseBuffer = releaseBuffer;
+ return this;
+ }
+
+ @Override
+ public void onError(int errno, String message) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("AIO on error issued. Error(code: {} msg: {})", errno, message);
+ }
+
+ this.error = true;
+ this.errorCode = errno;
+ this.errorMessage = message;
+ }
+
+ /**
+ * this is called by libaio.
+ */
+ @Override
+ public void done() {
+ this.sequentialFile.done(this);
+ }
+
+ /**
+ * This is callbed by the AIOSequentialFile, after determined the callbacks were returned in sequence
+ */
+ public void sequentialDone() {
+
+ if (error) {
+ if (callback != null) {
+ callback.onError(errorCode, errorMessage);
+ }
+ onIOError(new ActiveMQException(errorCode, errorMessage), errorMessage);
+ errorMessage = null;
+ } else {
+ if (callback != null) {
+ callback.done();
+ }
+
+ if (buffer != null && reuseBuffers && releaseBuffer) {
+ buffersControl.bufferDone(buffer);
+ }
+
+ callbackPool.offer(AIO2SequentialCallback.this);
+ }
+ }
+ }
+
+ private class PollerThread extends Thread {
+
+ private PollerThread() {
+ super("activemq-libaio-poller");
+ }
+
+ @Override
+ public void run() {
+ while (running.get()) {
+ // To optimize performance, libaioContext.poll should always be invoked from the same thread.
+ // This approach leverages kernel-level efficiencies in context switching.
+ // Consistent polling from a dedicated thread will yield substantial performance gains.
+ try {
+ libaioContext.poll();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ onIOError(new ActiveMQException("Error on libaio poll"), e.getMessage());
+ }
+ }
+ }
+ }
+
+ /**
+ * Class that will control buffer-reuse
+ */
+ private class ReuseBuffersController {
+
+ private volatile long bufferReuseLastTime = System.currentTimeMillis();
+
+ private final Queue reuseBuffersQueue = new ConcurrentLinkedQueue<>();
+
+ private boolean stopped = false;
+
+ private int alignedBufferSize = 0;
+
+ private int getAlignedBufferSize() {
+ if (alignedBufferSize == 0) {
+ alignedBufferSize = calculateBlockSize(bufferSize);
+ }
+
+ return alignedBufferSize;
+ }
+
+ public ByteBuffer newBuffer(final int size, final boolean zeroed) {
+ // if a new buffer wasn't requested in 10 seconds, we clear the queue
+ // This is being done this way as we don't need another Timeout Thread
+ // just to cleanup this
+ if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Clearing reuse buffers queue with {} elements", reuseBuffersQueue.size());
+ }
+
+ bufferReuseLastTime = System.currentTimeMillis();
+
+ clearPoll();
+ }
+
+ // if a buffer is bigger than the configured-bufferSize, we just create a new
+ // buffer.
+ if (size > getAlignedBufferSize()) {
+ return LibaioContext.newAlignedBuffer(size, getAlignment()).asByteBuffer();
+ } else {
+ // We need to allocate buffers following the rules of the storage
+ // being used (AIO/NIO)
+ final int alignedSize;
+
+ if (size < getAlignedBufferSize()) {
+ alignedSize = getAlignedBufferSize();
+ } else {
+ alignedSize = calculateBlockSize(size);
+ }
+
+ // Try getting a buffer from the queue...
+ ByteBuffer buffer = reuseBuffersQueue.poll();
+
+ if (buffer == null) {
+ // if empty create a new one.
+ buffer = LibaioContext.newAlignedBuffer(alignedSize, getAlignment()).asByteBuffer();
+
+ buffer.limit(calculateBlockSize(size));
+ } else {
+ if (zeroed) {
+ clearBuffer(buffer);
+ } else {
+ buffer.position(0);
+ }
+
+ // set the limit of the buffer to the bufferSize being required
+ buffer.limit(calculateBlockSize(size));
+ }
+
+ buffer.rewind();
+
+ return buffer;
+ }
+ }
+
+ public synchronized void stop() {
+ stopped = true;
+ clearPoll();
+ }
+
+ public synchronized void clearPoll() {
+ ByteBuffer reusedBuffer;
+
+ while ((reusedBuffer = reuseBuffersQueue.poll()) != null) {
+ releaseBuffer(reusedBuffer);
+ }
+ }
+
+ public void bufferDone(final ByteBuffer buffer) {
+ synchronized (this) {
+
+ if (stopped) {
+ releaseBuffer(buffer);
+ } else {
+ bufferReuseLastTime = System.currentTimeMillis();
+
+ // If a buffer has any other than the configured bufferSize, the buffer
+ // will be just sent to GC
+ if (buffer.capacity() == getAlignedBufferSize()) {
+ reuseBuffersQueue.offer(buffer);
+ } else {
+ releaseBuffer(buffer);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return AIO2SequentialFileFactory.class.getSimpleName() + "(buffersControl.stopped=" + buffersControl.stopped + "):" + super.toString();
+ }
+}
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index 875ca798626..32e2677d920 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -382,5 +382,20 @@