diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkFileSystemFileIO.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkFileSystemFileIO.java
new file mode 100644
index 000000000000..70826d640db4
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkFileSystemFileIO.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.DelegateFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link FileIO} implementation should be used when the FileSystem behind the Iceberg tables
+ * is also used for other purposes in a Flink job, like checkpoints/savepoints. Setting
+ * FlinkFileSystemFileIO prevents duplicated FileSystem related configurations and allows the usage
+ * of features already provided by the Flink FileSystem plugins, like Delegation Tokens.
+ *
+ *
The FlinkFileSystemFileIO should be set during catalog creation using the {@link
+ * org.apache.iceberg.CatalogProperties#FILE_IO_IMPL} property.
+ *
+ *
The FlinkFileSystemFileIO never should be set using table properties, as other engines will
+ * not be able to use the table in this case.
+ */
+public class FlinkFileSystemFileIO implements DelegateFileIO {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkFileSystemFileIO.class);
+ private static final String DELETE_FILE_PARALLELISM =
+ "iceberg.flinkfilesystem.delete-file-parallelism";
+ private static final String DELETE_FILE_POOL_NAME = "iceberg-flinkfilesystem-delete";
+ private static final int DELETE_RETRY_ATTEMPTS = 3;
+ private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+ private static volatile ExecutorService executorService;
+ private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of());
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return new FlinkInputFile(new Path(path));
+ }
+
+ @Override
+ public InputFile newInputFile(String path, long length) {
+ return new FlinkInputFile(new Path(path), length);
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ return new FlinkOutputFile(new Path(path));
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ Path toDelete = new Path(path);
+ try {
+ toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+ }
+ }
+
+ @Override
+ public Iterable listPrefix(String prefix) {
+ LOG.debug("Listing {}", prefix);
+ Path prefixToList = new Path(prefix);
+ try {
+ return listPrefix(prefixToList.getFileSystem(), prefixToList);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to listing prefix: %s", prefix), e);
+ }
+ }
+
+ @Override
+ public void deletePrefix(String prefix) {
+ Path prefixToDelete = new Path(prefix);
+
+ try {
+ prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to delete prefix: %s", prefix), e);
+ }
+ }
+
+ @Override
+ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException {
+ AtomicInteger failureCount = new AtomicInteger(0);
+ Tasks.foreach(pathsToDelete)
+ .executeWith(executorService())
+ .retry(DELETE_RETRY_ATTEMPTS)
+ .stopRetryOn(FileNotFoundException.class)
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (f, e) -> {
+ LOG.error("Failure during bulk delete on file: {} ", f, e);
+ failureCount.incrementAndGet();
+ })
+ .run(this::deleteFile);
+
+ if (failureCount.get() != 0) {
+ throw new BulkDeletionFailureException(failureCount.get());
+ }
+ }
+
+ @Override
+ public void initialize(Map props) {
+ this.properties = SerializableMap.copyOf(props);
+ }
+
+ @Override
+ public Map properties() {
+ return properties.immutableMap();
+ }
+
+ private Iterable listPrefix(FileSystem fs, Path fileName) {
+ try {
+ FileStatus[] statuses = fs.listStatus(fileName);
+ LOG.debug("Listing path {} {}", fileName, fs.listStatus(fileName));
+ if (statuses == null) {
+ // Check the file is there and ready. If so, then we can assume this is an empty dir.
+ fs.getFileStatus(fileName);
+ statuses = new FileStatus[0];
+ }
+
+ return Iterables.concat(
+ Arrays.stream(statuses)
+ .map(
+ fileStatus -> {
+ if (fileStatus.isDir()) {
+ return listPrefix(fs, fileStatus.getPath());
+ } else {
+ return Collections.singleton(
+ new FileInfo(
+ fileStatus.getPath().toString(),
+ fileStatus.getLen(),
+ fileStatus.getModificationTime()));
+ }
+ })
+ .collect(Collectors.toList()));
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to list path recursively: %s", fileName), e);
+ }
+ }
+
+ private ExecutorService executorService() {
+ if (executorService == null) {
+ synchronized (FlinkFileSystemFileIO.class) {
+ if (executorService == null) {
+ executorService = ThreadPools.newWorkerPool(DELETE_FILE_POOL_NAME, deleteThreads());
+ }
+ }
+ }
+
+ return executorService;
+ }
+
+ private int deleteThreads() {
+ int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE;
+ return properties.containsKey(DELETE_FILE_PARALLELISM)
+ ? Integer.parseInt(properties.get(DELETE_FILE_PARALLELISM))
+ : defaultValue;
+ }
+}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java
new file mode 100644
index 000000000000..051f426bf1dc
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.encryption.NativeFileCryptoParameters;
+import org.apache.iceberg.encryption.NativelyEncryptedFile;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.DelegatingInputStream;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkInputFile implements InputFile, NativelyEncryptedFile {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkInputFile.class);
+
+ private final Path path;
+ private final FileSystem fs;
+ private FileStatus stat = null;
+ private Long length = null;
+ private NativeFileCryptoParameters nativeDecryptionParameters;
+
+ public FlinkInputFile(Path path) {
+ this.path = path;
+ try {
+ this.fs = path.getFileSystem();
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to get file system for path: %s", path), e);
+ }
+ }
+
+ public FlinkInputFile(Path path, long length) {
+ this(path);
+ this.length = length;
+ }
+
+ @Override
+ public long getLength() {
+ if (length == null) {
+ this.length = lazyStat().getLen();
+ }
+
+ return length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ try {
+ return new FlinkSeekableInputStream(path.getFileSystem().open(path));
+ } catch (FileNotFoundException e) {
+ throw new NotFoundException(e, "Failed to open input stream for file: %s", path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to open input stream for file: %s", path), e);
+ }
+ }
+
+ @Override
+ public String location() {
+ return path.toString();
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ return lazyStat() != null;
+ } catch (NotFoundException e) {
+ return false;
+ }
+ }
+
+ private FileStatus lazyStat() {
+ if (stat == null) {
+ try {
+ this.stat = fs.getFileStatus(path);
+ } catch (FileNotFoundException e) {
+ throw new NotFoundException(e, "File does not exist: %s", path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to get status for file: %s", path), e);
+ }
+ }
+
+ return stat;
+ }
+
+ /** SeekableInputStream implementation for Flink FSDataInputStream. */
+ private static class FlinkSeekableInputStream extends SeekableInputStream
+ implements DelegatingInputStream {
+ private final FSDataInputStream stream;
+ private final StackTraceElement[] createStack;
+ private boolean closed;
+
+ FlinkSeekableInputStream(FSDataInputStream stream) {
+ this.stream = stream;
+ this.createStack = Thread.currentThread().getStackTrace();
+ this.closed = false;
+ }
+
+ @Override
+ public InputStream getDelegate() {
+ return stream;
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ this.closed = true;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ stream.seek(newPos);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return stream.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return stream.read(b, off, len);
+ }
+
+ @SuppressWarnings({"checkstyle:NoFinalizer", "Finalize", "deprecation"})
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if (!closed) {
+ close(); // releasing resources is more important than printing the warning
+ String trace =
+ Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
+ LOG.warn("Unclosed input stream created by:\n\t{}", trace);
+ }
+ }
+ }
+
+ @Override
+ public NativeFileCryptoParameters nativeCryptoParameters() {
+ return nativeDecryptionParameters;
+ }
+
+ @Override
+ public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) {
+ this.nativeDecryptionParameters = nativeCryptoParameters;
+ }
+}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkOutputFile.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkOutputFile.java
new file mode 100644
index 000000000000..7b538b7d0368
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkOutputFile.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.encryption.NativeFileCryptoParameters;
+import org.apache.iceberg.encryption.NativelyEncryptedFile;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.DelegatingOutputStream;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkOutputFile implements OutputFile, NativelyEncryptedFile {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkOutputFile.class);
+
+ private final FileSystem fs;
+ private final Path path;
+ private NativeFileCryptoParameters nativeDecryptionParameters;
+
+ public FlinkOutputFile(Path path) {
+ this.path = path;
+ try {
+ this.fs = path.getFileSystem();
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to get file system for path: %s", path), e);
+ }
+ }
+
+ @Override
+ public PositionOutputStream create() {
+ try {
+ // Flink's FileSystem plugins do not contractually guarantee a typed exception when a file
+ // already exists with WriteMode.NO_OVERWRITE, so the existence check is done explicitly to
+ // surface an Iceberg AlreadyExistsException without reaching for a Hadoop dependency.
+ if (fileExists()) {
+ throw new AlreadyExistsException("Path already exists: %s", path);
+ }
+
+ return new FlinkPositionOutputStream(fs.create(path, FileSystem.WriteMode.NO_OVERWRITE));
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to create file: %s", path), e);
+ }
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite() {
+ try {
+ return new FlinkPositionOutputStream(fs.create(path, FileSystem.WriteMode.OVERWRITE));
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to create file: %s", path), e);
+ }
+ }
+
+ @Override
+ public String location() {
+ return path.toString();
+ }
+
+ @Override
+ public InputFile toInputFile() {
+ return new FlinkInputFile(path);
+ }
+
+ private boolean fileExists() throws IOException {
+ try {
+ fs.getFileStatus(path);
+ return true;
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ }
+
+ /** PositionOutputStream implementation for FSDataOutputStream. */
+ private static class FlinkPositionOutputStream extends PositionOutputStream
+ implements DelegatingOutputStream {
+ private final FSDataOutputStream stream;
+ private final StackTraceElement[] createStack;
+ private boolean closed;
+
+ FlinkPositionOutputStream(FSDataOutputStream stream) {
+ this.stream = stream;
+ this.createStack = Thread.currentThread().getStackTrace();
+ this.closed = false;
+ }
+
+ @Override
+ public OutputStream getDelegate() {
+ return stream;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ stream.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ this.closed = true;
+ }
+
+ @SuppressWarnings({"checkstyle:NoFinalizer", "Finalize", "deprecation"})
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if (!closed) {
+ close(); // releasing resources is more important than printing the warning
+ String trace =
+ Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
+ LOG.warn("Unclosed output stream created by:\n\t{}", trace);
+ }
+ }
+ }
+
+ @Override
+ public NativeFileCryptoParameters nativeCryptoParameters() {
+ return nativeDecryptionParameters;
+ }
+
+ @Override
+ public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) {
+ this.nativeDecryptionParameters = nativeCryptoParameters;
+ }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFileSystemFileIO.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFileSystemFileIO.java
new file mode 100644
index 000000000000..bf6a4c08ee30
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFileSystemFileIO.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestFlinkFileSystemFileIO {
+ private final Random random = new Random(1);
+
+ private FileSystem fs;
+ private FlinkFileSystemFileIO flinkFileSystemFileIO;
+
+ @TempDir private File tempDir;
+
+ @BeforeEach
+ public void before() throws Exception {
+ fs = FileSystem.getLocalFileSystem();
+
+ flinkFileSystemFileIO = new FlinkFileSystemFileIO();
+ }
+
+ @Test
+ void testListPrefix() {
+ Path parent = new Path(tempDir.toURI());
+
+ List scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+ scaleSizes.parallelStream()
+ .forEach(
+ scale -> {
+ Path scalePath = new Path(parent, Integer.toString(scale));
+
+ createRandomFiles(scalePath, scale);
+ assertThat(
+ Streams.stream(flinkFileSystemFileIO.listPrefix(scalePath.toUri().toString()))
+ .count())
+ .isEqualTo((long) scale);
+ });
+
+ long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum();
+ assertThat(Streams.stream(flinkFileSystemFileIO.listPrefix(parent.toUri().toString())).count())
+ .isEqualTo(totalFiles);
+ }
+
+ @Test
+ void testFileExists() throws IOException {
+ Path parent = new Path(tempDir.toURI());
+ Path randomFilePath = new Path(parent, "random-file-" + UUID.randomUUID());
+ fs.create(randomFilePath, FileSystem.WriteMode.OVERWRITE);
+
+ // check existence of the created file
+ assertThat(flinkFileSystemFileIO.newInputFile(randomFilePath.toUri().toString()).exists())
+ .isTrue();
+ fs.delete(randomFilePath, false);
+ assertThat(flinkFileSystemFileIO.newInputFile(randomFilePath.toUri().toString()).exists())
+ .isFalse();
+ }
+
+ @Test
+ void testDeletePrefix() {
+ Path parent = new Path(tempDir.toURI());
+
+ List scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+ scaleSizes.parallelStream()
+ .forEach(
+ scale -> {
+ Path scalePath = new Path(parent, Integer.toString(scale));
+
+ createRandomFiles(scalePath, scale);
+ flinkFileSystemFileIO.deletePrefix(scalePath.toUri().toString());
+
+ // Hadoop filesystem will throw if the path does not exist
+ assertThatThrownBy(
+ () ->
+ flinkFileSystemFileIO.listPrefix(scalePath.toUri().toString()).iterator())
+ .isInstanceOf(UncheckedIOException.class)
+ .hasMessageContaining("Failed to list path recursively");
+ });
+
+ flinkFileSystemFileIO.deletePrefix(parent.toUri().toString());
+ // Hadoop filesystem will throw if the path does not exist
+ assertThatThrownBy(() -> flinkFileSystemFileIO.listPrefix(parent.toUri().toString()).iterator())
+ .isInstanceOf(UncheckedIOException.class)
+ .hasMessageContaining("Failed to list path recursively");
+ }
+
+ @Test
+ void testDeleteFiles() {
+ Path parent = new Path(tempDir.toURI());
+ List filesCreated = createRandomFiles(parent, 10);
+ flinkFileSystemFileIO.deleteFiles(
+ filesCreated.stream().map(Path::toString).collect(Collectors.toList()));
+ filesCreated.forEach(
+ file -> assertThat(flinkFileSystemFileIO.newInputFile(file.toString()).exists()).isFalse());
+ }
+
+ @Test
+ void testDeleteFilesErrorHandling() {
+ List filesCreated =
+ random.ints(2).mapToObj(x -> "fakefsnotreal://file-" + x).collect(Collectors.toList());
+ assertThatThrownBy(() -> flinkFileSystemFileIO.deleteFiles(filesCreated))
+ .isInstanceOf(BulkDeletionFailureException.class)
+ .hasMessage("Failed to delete 2 files");
+ }
+
+ @Test
+ void testFlinkFileIOReadWrite() throws IOException {
+ FileIO testFlinkFileIO = new FlinkFileSystemFileIO();
+
+ Path parent = new Path(tempDir.toURI());
+ Path randomFilePath = new Path(parent, "random-file-" + UUID.randomUUID());
+ byte[] expected = "DUMMY".getBytes(StandardCharsets.UTF_8);
+
+ // Write
+ OutputFile outputFile = testFlinkFileIO.newOutputFile(randomFilePath.getPath());
+ try (PositionOutputStream outputStream = outputFile.create()) {
+ outputStream.write(expected);
+ }
+
+ // Read
+ InputFile inputFile = testFlinkFileIO.newInputFile(randomFilePath.getPath());
+ try (SeekableInputStream inputStream = inputFile.newStream()) {
+ byte[] actual = new byte[(int) inputFile.getLength()];
+ assertThat(inputStream.read(actual)).isEqualTo(actual.length);
+ assertThat(actual).isEqualTo(expected);
+ }
+ }
+
+ @Test
+ void testFlinkFileIOKryoSerialization() throws IOException {
+ FlinkFileSystemFileIO testFlinkFileIO = new FlinkFileSystemFileIO();
+
+ // Flink fileIO should be serializable through Flink's own Kryo serializer, since Flink uses it
+ // to serialize operators and state. This also avoids depending on a specific esotericsoftware
+ // Kryo version, which differs across the supported Flink lines.
+ testFlinkFileIO.initialize(ImmutableMap.of("k1", "v1"));
+ FlinkFileSystemFileIO roundTripSerializedFileIO =
+ roundTripKryoSerialize(FlinkFileSystemFileIO.class, testFlinkFileIO);
+
+ assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testFlinkFileIO.properties());
+ }
+
+ @Test
+ void testFlinkFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+ FileIO testFlinkFileIO = new FlinkFileSystemFileIO();
+
+ // Flink fileIO should be serializable when properties are passed as immutable map
+ testFlinkFileIO.initialize(ImmutableMap.of("k1", "v1"));
+ FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testFlinkFileIO);
+
+ assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testFlinkFileIO.properties());
+ }
+
+ private List createRandomFiles(Path parent, int count) {
+ Vector paths = new Vector<>();
+ random
+ .ints(count)
+ .parallel()
+ .forEach(
+ i -> {
+ try {
+ Path path = new Path(parent, "file-" + i);
+ paths.add(path);
+ fs.create(path, FileSystem.WriteMode.OVERWRITE);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ return paths;
+ }
+}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkFileSystemFileIO.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkFileSystemFileIO.java
new file mode 100644
index 000000000000..70826d640db4
--- /dev/null
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkFileSystemFileIO.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.DelegateFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link FileIO} implementation should be used when the FileSystem behind the Iceberg tables
+ * is also used for other purposes in a Flink job, like checkpoints/savepoints. Setting
+ * FlinkFileSystemFileIO prevents duplicated FileSystem related configurations and allows the usage
+ * of features already provided by the Flink FileSystem plugins, like Delegation Tokens.
+ *
+ * The FlinkFileSystemFileIO should be set during catalog creation using the {@link
+ * org.apache.iceberg.CatalogProperties#FILE_IO_IMPL} property.
+ *
+ *
The FlinkFileSystemFileIO never should be set using table properties, as other engines will
+ * not be able to use the table in this case.
+ */
+public class FlinkFileSystemFileIO implements DelegateFileIO {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkFileSystemFileIO.class);
+ private static final String DELETE_FILE_PARALLELISM =
+ "iceberg.flinkfilesystem.delete-file-parallelism";
+ private static final String DELETE_FILE_POOL_NAME = "iceberg-flinkfilesystem-delete";
+ private static final int DELETE_RETRY_ATTEMPTS = 3;
+ private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+ private static volatile ExecutorService executorService;
+ private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of());
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return new FlinkInputFile(new Path(path));
+ }
+
+ @Override
+ public InputFile newInputFile(String path, long length) {
+ return new FlinkInputFile(new Path(path), length);
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ return new FlinkOutputFile(new Path(path));
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ Path toDelete = new Path(path);
+ try {
+ toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+ }
+ }
+
+ @Override
+ public Iterable listPrefix(String prefix) {
+ LOG.debug("Listing {}", prefix);
+ Path prefixToList = new Path(prefix);
+ try {
+ return listPrefix(prefixToList.getFileSystem(), prefixToList);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to listing prefix: %s", prefix), e);
+ }
+ }
+
+ @Override
+ public void deletePrefix(String prefix) {
+ Path prefixToDelete = new Path(prefix);
+
+ try {
+ prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to delete prefix: %s", prefix), e);
+ }
+ }
+
+ @Override
+ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException {
+ AtomicInteger failureCount = new AtomicInteger(0);
+ Tasks.foreach(pathsToDelete)
+ .executeWith(executorService())
+ .retry(DELETE_RETRY_ATTEMPTS)
+ .stopRetryOn(FileNotFoundException.class)
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (f, e) -> {
+ LOG.error("Failure during bulk delete on file: {} ", f, e);
+ failureCount.incrementAndGet();
+ })
+ .run(this::deleteFile);
+
+ if (failureCount.get() != 0) {
+ throw new BulkDeletionFailureException(failureCount.get());
+ }
+ }
+
+ @Override
+ public void initialize(Map props) {
+ this.properties = SerializableMap.copyOf(props);
+ }
+
+ @Override
+ public Map properties() {
+ return properties.immutableMap();
+ }
+
+ private Iterable listPrefix(FileSystem fs, Path fileName) {
+ try {
+ FileStatus[] statuses = fs.listStatus(fileName);
+ LOG.debug("Listing path {} {}", fileName, fs.listStatus(fileName));
+ if (statuses == null) {
+ // Check the file is there and ready. If so, then we can assume this is an empty dir.
+ fs.getFileStatus(fileName);
+ statuses = new FileStatus[0];
+ }
+
+ return Iterables.concat(
+ Arrays.stream(statuses)
+ .map(
+ fileStatus -> {
+ if (fileStatus.isDir()) {
+ return listPrefix(fs, fileStatus.getPath());
+ } else {
+ return Collections.singleton(
+ new FileInfo(
+ fileStatus.getPath().toString(),
+ fileStatus.getLen(),
+ fileStatus.getModificationTime()));
+ }
+ })
+ .collect(Collectors.toList()));
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to list path recursively: %s", fileName), e);
+ }
+ }
+
+ private ExecutorService executorService() {
+ if (executorService == null) {
+ synchronized (FlinkFileSystemFileIO.class) {
+ if (executorService == null) {
+ executorService = ThreadPools.newWorkerPool(DELETE_FILE_POOL_NAME, deleteThreads());
+ }
+ }
+ }
+
+ return executorService;
+ }
+
+ private int deleteThreads() {
+ int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE;
+ return properties.containsKey(DELETE_FILE_PARALLELISM)
+ ? Integer.parseInt(properties.get(DELETE_FILE_PARALLELISM))
+ : defaultValue;
+ }
+}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java
new file mode 100644
index 000000000000..051f426bf1dc
--- /dev/null
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.encryption.NativeFileCryptoParameters;
+import org.apache.iceberg.encryption.NativelyEncryptedFile;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.DelegatingInputStream;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkInputFile implements InputFile, NativelyEncryptedFile {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkInputFile.class);
+
+ private final Path path;
+ private final FileSystem fs;
+ private FileStatus stat = null;
+ private Long length = null;
+ private NativeFileCryptoParameters nativeDecryptionParameters;
+
+ public FlinkInputFile(Path path) {
+ this.path = path;
+ try {
+ this.fs = path.getFileSystem();
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to get file system for path: %s", path), e);
+ }
+ }
+
+ public FlinkInputFile(Path path, long length) {
+ this(path);
+ this.length = length;
+ }
+
+ @Override
+ public long getLength() {
+ if (length == null) {
+ this.length = lazyStat().getLen();
+ }
+
+ return length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ try {
+ return new FlinkSeekableInputStream(path.getFileSystem().open(path));
+ } catch (FileNotFoundException e) {
+ throw new NotFoundException(e, "Failed to open input stream for file: %s", path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to open input stream for file: %s", path), e);
+ }
+ }
+
+ @Override
+ public String location() {
+ return path.toString();
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ return lazyStat() != null;
+ } catch (NotFoundException e) {
+ return false;
+ }
+ }
+
+ private FileStatus lazyStat() {
+ if (stat == null) {
+ try {
+ this.stat = fs.getFileStatus(path);
+ } catch (FileNotFoundException e) {
+ throw new NotFoundException(e, "File does not exist: %s", path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to get status for file: %s", path), e);
+ }
+ }
+
+ return stat;
+ }
+
+ /** SeekableInputStream implementation for Flink FSDataInputStream. */
+ private static class FlinkSeekableInputStream extends SeekableInputStream
+ implements DelegatingInputStream {
+ private final FSDataInputStream stream;
+ private final StackTraceElement[] createStack;
+ private boolean closed;
+
+ FlinkSeekableInputStream(FSDataInputStream stream) {
+ this.stream = stream;
+ this.createStack = Thread.currentThread().getStackTrace();
+ this.closed = false;
+ }
+
+ @Override
+ public InputStream getDelegate() {
+ return stream;
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ this.closed = true;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ stream.seek(newPos);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return stream.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return stream.read(b, off, len);
+ }
+
+ @SuppressWarnings({"checkstyle:NoFinalizer", "Finalize", "deprecation"})
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if (!closed) {
+ close(); // releasing resources is more important than printing the warning
+ String trace =
+ Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
+ LOG.warn("Unclosed input stream created by:\n\t{}", trace);
+ }
+ }
+ }
+
+ @Override
+ public NativeFileCryptoParameters nativeCryptoParameters() {
+ return nativeDecryptionParameters;
+ }
+
+ @Override
+ public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) {
+ this.nativeDecryptionParameters = nativeCryptoParameters;
+ }
+}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkOutputFile.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkOutputFile.java
new file mode 100644
index 000000000000..7b538b7d0368
--- /dev/null
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkOutputFile.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.encryption.NativeFileCryptoParameters;
+import org.apache.iceberg.encryption.NativelyEncryptedFile;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.DelegatingOutputStream;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkOutputFile implements OutputFile, NativelyEncryptedFile {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkOutputFile.class);
+
+ private final FileSystem fs;
+ private final Path path;
+ private NativeFileCryptoParameters nativeDecryptionParameters;
+
+ public FlinkOutputFile(Path path) {
+ this.path = path;
+ try {
+ this.fs = path.getFileSystem();
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to get file system for path: %s", path), e);
+ }
+ }
+
+ @Override
+ public PositionOutputStream create() {
+ try {
+ // Flink's FileSystem plugins do not contractually guarantee a typed exception when a file
+ // already exists with WriteMode.NO_OVERWRITE, so the existence check is done explicitly to
+ // surface an Iceberg AlreadyExistsException without reaching for a Hadoop dependency.
+ if (fileExists()) {
+ throw new AlreadyExistsException("Path already exists: %s", path);
+ }
+
+ return new FlinkPositionOutputStream(fs.create(path, FileSystem.WriteMode.NO_OVERWRITE));
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to create file: %s", path), e);
+ }
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite() {
+ try {
+ return new FlinkPositionOutputStream(fs.create(path, FileSystem.WriteMode.OVERWRITE));
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to create file: %s", path), e);
+ }
+ }
+
+ @Override
+ public String location() {
+ return path.toString();
+ }
+
+ @Override
+ public InputFile toInputFile() {
+ return new FlinkInputFile(path);
+ }
+
+ private boolean fileExists() throws IOException {
+ try {
+ fs.getFileStatus(path);
+ return true;
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ }
+
+ /** PositionOutputStream implementation for FSDataOutputStream. */
+ private static class FlinkPositionOutputStream extends PositionOutputStream
+ implements DelegatingOutputStream {
+ private final FSDataOutputStream stream;
+ private final StackTraceElement[] createStack;
+ private boolean closed;
+
+ FlinkPositionOutputStream(FSDataOutputStream stream) {
+ this.stream = stream;
+ this.createStack = Thread.currentThread().getStackTrace();
+ this.closed = false;
+ }
+
+ @Override
+ public OutputStream getDelegate() {
+ return stream;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ stream.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ this.closed = true;
+ }
+
+ @SuppressWarnings({"checkstyle:NoFinalizer", "Finalize", "deprecation"})
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if (!closed) {
+ close(); // releasing resources is more important than printing the warning
+ String trace =
+ Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
+ LOG.warn("Unclosed output stream created by:\n\t{}", trace);
+ }
+ }
+ }
+
+ @Override
+ public NativeFileCryptoParameters nativeCryptoParameters() {
+ return nativeDecryptionParameters;
+ }
+
+ @Override
+ public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) {
+ this.nativeDecryptionParameters = nativeCryptoParameters;
+ }
+}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFileSystemFileIO.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFileSystemFileIO.java
new file mode 100644
index 000000000000..bf6a4c08ee30
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFileSystemFileIO.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestFlinkFileSystemFileIO {
+ private final Random random = new Random(1);
+
+ private FileSystem fs;
+ private FlinkFileSystemFileIO flinkFileSystemFileIO;
+
+ @TempDir private File tempDir;
+
+ @BeforeEach
+ public void before() throws Exception {
+ fs = FileSystem.getLocalFileSystem();
+
+ flinkFileSystemFileIO = new FlinkFileSystemFileIO();
+ }
+
+ @Test
+ void testListPrefix() {
+ Path parent = new Path(tempDir.toURI());
+
+ List scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+ scaleSizes.parallelStream()
+ .forEach(
+ scale -> {
+ Path scalePath = new Path(parent, Integer.toString(scale));
+
+ createRandomFiles(scalePath, scale);
+ assertThat(
+ Streams.stream(flinkFileSystemFileIO.listPrefix(scalePath.toUri().toString()))
+ .count())
+ .isEqualTo((long) scale);
+ });
+
+ long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum();
+ assertThat(Streams.stream(flinkFileSystemFileIO.listPrefix(parent.toUri().toString())).count())
+ .isEqualTo(totalFiles);
+ }
+
+ @Test
+ void testFileExists() throws IOException {
+ Path parent = new Path(tempDir.toURI());
+ Path randomFilePath = new Path(parent, "random-file-" + UUID.randomUUID());
+ fs.create(randomFilePath, FileSystem.WriteMode.OVERWRITE);
+
+ // check existence of the created file
+ assertThat(flinkFileSystemFileIO.newInputFile(randomFilePath.toUri().toString()).exists())
+ .isTrue();
+ fs.delete(randomFilePath, false);
+ assertThat(flinkFileSystemFileIO.newInputFile(randomFilePath.toUri().toString()).exists())
+ .isFalse();
+ }
+
+ @Test
+ void testDeletePrefix() {
+ Path parent = new Path(tempDir.toURI());
+
+ List scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+ scaleSizes.parallelStream()
+ .forEach(
+ scale -> {
+ Path scalePath = new Path(parent, Integer.toString(scale));
+
+ createRandomFiles(scalePath, scale);
+ flinkFileSystemFileIO.deletePrefix(scalePath.toUri().toString());
+
+ // Hadoop filesystem will throw if the path does not exist
+ assertThatThrownBy(
+ () ->
+ flinkFileSystemFileIO.listPrefix(scalePath.toUri().toString()).iterator())
+ .isInstanceOf(UncheckedIOException.class)
+ .hasMessageContaining("Failed to list path recursively");
+ });
+
+ flinkFileSystemFileIO.deletePrefix(parent.toUri().toString());
+ // Hadoop filesystem will throw if the path does not exist
+ assertThatThrownBy(() -> flinkFileSystemFileIO.listPrefix(parent.toUri().toString()).iterator())
+ .isInstanceOf(UncheckedIOException.class)
+ .hasMessageContaining("Failed to list path recursively");
+ }
+
+ @Test
+ void testDeleteFiles() {
+ Path parent = new Path(tempDir.toURI());
+ List filesCreated = createRandomFiles(parent, 10);
+ flinkFileSystemFileIO.deleteFiles(
+ filesCreated.stream().map(Path::toString).collect(Collectors.toList()));
+ filesCreated.forEach(
+ file -> assertThat(flinkFileSystemFileIO.newInputFile(file.toString()).exists()).isFalse());
+ }
+
+ @Test
+ void testDeleteFilesErrorHandling() {
+ List filesCreated =
+ random.ints(2).mapToObj(x -> "fakefsnotreal://file-" + x).collect(Collectors.toList());
+ assertThatThrownBy(() -> flinkFileSystemFileIO.deleteFiles(filesCreated))
+ .isInstanceOf(BulkDeletionFailureException.class)
+ .hasMessage("Failed to delete 2 files");
+ }
+
+ @Test
+ void testFlinkFileIOReadWrite() throws IOException {
+ FileIO testFlinkFileIO = new FlinkFileSystemFileIO();
+
+ Path parent = new Path(tempDir.toURI());
+ Path randomFilePath = new Path(parent, "random-file-" + UUID.randomUUID());
+ byte[] expected = "DUMMY".getBytes(StandardCharsets.UTF_8);
+
+ // Write
+ OutputFile outputFile = testFlinkFileIO.newOutputFile(randomFilePath.getPath());
+ try (PositionOutputStream outputStream = outputFile.create()) {
+ outputStream.write(expected);
+ }
+
+ // Read
+ InputFile inputFile = testFlinkFileIO.newInputFile(randomFilePath.getPath());
+ try (SeekableInputStream inputStream = inputFile.newStream()) {
+ byte[] actual = new byte[(int) inputFile.getLength()];
+ assertThat(inputStream.read(actual)).isEqualTo(actual.length);
+ assertThat(actual).isEqualTo(expected);
+ }
+ }
+
+ @Test
+ void testFlinkFileIOKryoSerialization() throws IOException {
+ FlinkFileSystemFileIO testFlinkFileIO = new FlinkFileSystemFileIO();
+
+ // Flink fileIO should be serializable through Flink's own Kryo serializer, since Flink uses it
+ // to serialize operators and state. This also avoids depending on a specific esotericsoftware
+ // Kryo version, which differs across the supported Flink lines.
+ testFlinkFileIO.initialize(ImmutableMap.of("k1", "v1"));
+ FlinkFileSystemFileIO roundTripSerializedFileIO =
+ roundTripKryoSerialize(FlinkFileSystemFileIO.class, testFlinkFileIO);
+
+ assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testFlinkFileIO.properties());
+ }
+
+ @Test
+ void testFlinkFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+ FileIO testFlinkFileIO = new FlinkFileSystemFileIO();
+
+ // Flink fileIO should be serializable when properties are passed as immutable map
+ testFlinkFileIO.initialize(ImmutableMap.of("k1", "v1"));
+ FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testFlinkFileIO);
+
+ assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testFlinkFileIO.properties());
+ }
+
+ private List createRandomFiles(Path parent, int count) {
+ Vector paths = new Vector<>();
+ random
+ .ints(count)
+ .parallel()
+ .forEach(
+ i -> {
+ try {
+ Path path = new Path(parent, "file-" + i);
+ paths.add(path);
+ fs.create(path, FileSystem.WriteMode.OVERWRITE);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ return paths;
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkFileSystemFileIO.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkFileSystemFileIO.java
new file mode 100644
index 000000000000..70826d640db4
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkFileSystemFileIO.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.DelegateFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link FileIO} implementation should be used when the FileSystem behind the Iceberg tables
+ * is also used for other purposes in a Flink job, like checkpoints/savepoints. Setting
+ * FlinkFileSystemFileIO prevents duplicated FileSystem related configurations and allows the usage
+ * of features already provided by the Flink FileSystem plugins, like Delegation Tokens.
+ *
+ * The FlinkFileSystemFileIO should be set during catalog creation using the {@link
+ * org.apache.iceberg.CatalogProperties#FILE_IO_IMPL} property.
+ *
+ *
The FlinkFileSystemFileIO never should be set using table properties, as other engines will
+ * not be able to use the table in this case.
+ */
+public class FlinkFileSystemFileIO implements DelegateFileIO {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkFileSystemFileIO.class);
+ private static final String DELETE_FILE_PARALLELISM =
+ "iceberg.flinkfilesystem.delete-file-parallelism";
+ private static final String DELETE_FILE_POOL_NAME = "iceberg-flinkfilesystem-delete";
+ private static final int DELETE_RETRY_ATTEMPTS = 3;
+ private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+ private static volatile ExecutorService executorService;
+ private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of());
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return new FlinkInputFile(new Path(path));
+ }
+
+ @Override
+ public InputFile newInputFile(String path, long length) {
+ return new FlinkInputFile(new Path(path), length);
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ return new FlinkOutputFile(new Path(path));
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ Path toDelete = new Path(path);
+ try {
+ toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+ }
+ }
+
+ @Override
+ public Iterable listPrefix(String prefix) {
+ LOG.debug("Listing {}", prefix);
+ Path prefixToList = new Path(prefix);
+ try {
+ return listPrefix(prefixToList.getFileSystem(), prefixToList);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to listing prefix: %s", prefix), e);
+ }
+ }
+
+ @Override
+ public void deletePrefix(String prefix) {
+ Path prefixToDelete = new Path(prefix);
+
+ try {
+ prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to delete prefix: %s", prefix), e);
+ }
+ }
+
+ @Override
+ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException {
+ AtomicInteger failureCount = new AtomicInteger(0);
+ Tasks.foreach(pathsToDelete)
+ .executeWith(executorService())
+ .retry(DELETE_RETRY_ATTEMPTS)
+ .stopRetryOn(FileNotFoundException.class)
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (f, e) -> {
+ LOG.error("Failure during bulk delete on file: {} ", f, e);
+ failureCount.incrementAndGet();
+ })
+ .run(this::deleteFile);
+
+ if (failureCount.get() != 0) {
+ throw new BulkDeletionFailureException(failureCount.get());
+ }
+ }
+
+ @Override
+ public void initialize(Map props) {
+ this.properties = SerializableMap.copyOf(props);
+ }
+
+ @Override
+ public Map properties() {
+ return properties.immutableMap();
+ }
+
+ private Iterable listPrefix(FileSystem fs, Path fileName) {
+ try {
+ FileStatus[] statuses = fs.listStatus(fileName);
+ LOG.debug("Listing path {} {}", fileName, fs.listStatus(fileName));
+ if (statuses == null) {
+ // Check the file is there and ready. If so, then we can assume this is an empty dir.
+ fs.getFileStatus(fileName);
+ statuses = new FileStatus[0];
+ }
+
+ return Iterables.concat(
+ Arrays.stream(statuses)
+ .map(
+ fileStatus -> {
+ if (fileStatus.isDir()) {
+ return listPrefix(fs, fileStatus.getPath());
+ } else {
+ return Collections.singleton(
+ new FileInfo(
+ fileStatus.getPath().toString(),
+ fileStatus.getLen(),
+ fileStatus.getModificationTime()));
+ }
+ })
+ .collect(Collectors.toList()));
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to list path recursively: %s", fileName), e);
+ }
+ }
+
+ private ExecutorService executorService() {
+ if (executorService == null) {
+ synchronized (FlinkFileSystemFileIO.class) {
+ if (executorService == null) {
+ executorService = ThreadPools.newWorkerPool(DELETE_FILE_POOL_NAME, deleteThreads());
+ }
+ }
+ }
+
+ return executorService;
+ }
+
+ private int deleteThreads() {
+ int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE;
+ return properties.containsKey(DELETE_FILE_PARALLELISM)
+ ? Integer.parseInt(properties.get(DELETE_FILE_PARALLELISM))
+ : defaultValue;
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java
new file mode 100644
index 000000000000..051f426bf1dc
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.encryption.NativeFileCryptoParameters;
+import org.apache.iceberg.encryption.NativelyEncryptedFile;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.DelegatingInputStream;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkInputFile implements InputFile, NativelyEncryptedFile {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkInputFile.class);
+
+ private final Path path;
+ private final FileSystem fs;
+ private FileStatus stat = null;
+ private Long length = null;
+ private NativeFileCryptoParameters nativeDecryptionParameters;
+
+ public FlinkInputFile(Path path) {
+ this.path = path;
+ try {
+ this.fs = path.getFileSystem();
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to get file system for path: %s", path), e);
+ }
+ }
+
+ public FlinkInputFile(Path path, long length) {
+ this(path);
+ this.length = length;
+ }
+
+ @Override
+ public long getLength() {
+ if (length == null) {
+ this.length = lazyStat().getLen();
+ }
+
+ return length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ try {
+ return new FlinkSeekableInputStream(path.getFileSystem().open(path));
+ } catch (FileNotFoundException e) {
+ throw new NotFoundException(e, "Failed to open input stream for file: %s", path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to open input stream for file: %s", path), e);
+ }
+ }
+
+ @Override
+ public String location() {
+ return path.toString();
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ return lazyStat() != null;
+ } catch (NotFoundException e) {
+ return false;
+ }
+ }
+
+ private FileStatus lazyStat() {
+ if (stat == null) {
+ try {
+ this.stat = fs.getFileStatus(path);
+ } catch (FileNotFoundException e) {
+ throw new NotFoundException(e, "File does not exist: %s", path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to get status for file: %s", path), e);
+ }
+ }
+
+ return stat;
+ }
+
+ /** SeekableInputStream implementation for Flink FSDataInputStream. */
+ private static class FlinkSeekableInputStream extends SeekableInputStream
+ implements DelegatingInputStream {
+ private final FSDataInputStream stream;
+ private final StackTraceElement[] createStack;
+ private boolean closed;
+
+ FlinkSeekableInputStream(FSDataInputStream stream) {
+ this.stream = stream;
+ this.createStack = Thread.currentThread().getStackTrace();
+ this.closed = false;
+ }
+
+ @Override
+ public InputStream getDelegate() {
+ return stream;
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ this.closed = true;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ stream.seek(newPos);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return stream.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return stream.read(b, off, len);
+ }
+
+ @SuppressWarnings({"checkstyle:NoFinalizer", "Finalize", "deprecation"})
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if (!closed) {
+ close(); // releasing resources is more important than printing the warning
+ String trace =
+ Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
+ LOG.warn("Unclosed input stream created by:\n\t{}", trace);
+ }
+ }
+ }
+
+ @Override
+ public NativeFileCryptoParameters nativeCryptoParameters() {
+ return nativeDecryptionParameters;
+ }
+
+ @Override
+ public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) {
+ this.nativeDecryptionParameters = nativeCryptoParameters;
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkOutputFile.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkOutputFile.java
new file mode 100644
index 000000000000..7b538b7d0368
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkOutputFile.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.encryption.NativeFileCryptoParameters;
+import org.apache.iceberg.encryption.NativelyEncryptedFile;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.DelegatingOutputStream;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkOutputFile implements OutputFile, NativelyEncryptedFile {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkOutputFile.class);
+
+ private final FileSystem fs;
+ private final Path path;
+ private NativeFileCryptoParameters nativeDecryptionParameters;
+
+ public FlinkOutputFile(Path path) {
+ this.path = path;
+ try {
+ this.fs = path.getFileSystem();
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to get file system for path: %s", path), e);
+ }
+ }
+
+ @Override
+ public PositionOutputStream create() {
+ try {
+ // Flink's FileSystem plugins do not contractually guarantee a typed exception when a file
+ // already exists with WriteMode.NO_OVERWRITE, so the existence check is done explicitly to
+ // surface an Iceberg AlreadyExistsException without reaching for a Hadoop dependency.
+ if (fileExists()) {
+ throw new AlreadyExistsException("Path already exists: %s", path);
+ }
+
+ return new FlinkPositionOutputStream(fs.create(path, FileSystem.WriteMode.NO_OVERWRITE));
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to create file: %s", path), e);
+ }
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite() {
+ try {
+ return new FlinkPositionOutputStream(fs.create(path, FileSystem.WriteMode.OVERWRITE));
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to create file: %s", path), e);
+ }
+ }
+
+ @Override
+ public String location() {
+ return path.toString();
+ }
+
+ @Override
+ public InputFile toInputFile() {
+ return new FlinkInputFile(path);
+ }
+
+ private boolean fileExists() throws IOException {
+ try {
+ fs.getFileStatus(path);
+ return true;
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ }
+
+ /** PositionOutputStream implementation for FSDataOutputStream. */
+ private static class FlinkPositionOutputStream extends PositionOutputStream
+ implements DelegatingOutputStream {
+ private final FSDataOutputStream stream;
+ private final StackTraceElement[] createStack;
+ private boolean closed;
+
+ FlinkPositionOutputStream(FSDataOutputStream stream) {
+ this.stream = stream;
+ this.createStack = Thread.currentThread().getStackTrace();
+ this.closed = false;
+ }
+
+ @Override
+ public OutputStream getDelegate() {
+ return stream;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ stream.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ this.closed = true;
+ }
+
+ @SuppressWarnings({"checkstyle:NoFinalizer", "Finalize", "deprecation"})
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if (!closed) {
+ close(); // releasing resources is more important than printing the warning
+ String trace =
+ Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
+ LOG.warn("Unclosed output stream created by:\n\t{}", trace);
+ }
+ }
+ }
+
+ @Override
+ public NativeFileCryptoParameters nativeCryptoParameters() {
+ return nativeDecryptionParameters;
+ }
+
+ @Override
+ public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) {
+ this.nativeDecryptionParameters = nativeCryptoParameters;
+ }
+}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFileSystemFileIO.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFileSystemFileIO.java
new file mode 100644
index 000000000000..bf6a4c08ee30
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFileSystemFileIO.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestFlinkFileSystemFileIO {
+ private final Random random = new Random(1);
+
+ private FileSystem fs;
+ private FlinkFileSystemFileIO flinkFileSystemFileIO;
+
+ @TempDir private File tempDir;
+
+ @BeforeEach
+ public void before() throws Exception {
+ fs = FileSystem.getLocalFileSystem();
+
+ flinkFileSystemFileIO = new FlinkFileSystemFileIO();
+ }
+
+ @Test
+ void testListPrefix() {
+ Path parent = new Path(tempDir.toURI());
+
+ List scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+ scaleSizes.parallelStream()
+ .forEach(
+ scale -> {
+ Path scalePath = new Path(parent, Integer.toString(scale));
+
+ createRandomFiles(scalePath, scale);
+ assertThat(
+ Streams.stream(flinkFileSystemFileIO.listPrefix(scalePath.toUri().toString()))
+ .count())
+ .isEqualTo((long) scale);
+ });
+
+ long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum();
+ assertThat(Streams.stream(flinkFileSystemFileIO.listPrefix(parent.toUri().toString())).count())
+ .isEqualTo(totalFiles);
+ }
+
+ @Test
+ void testFileExists() throws IOException {
+ Path parent = new Path(tempDir.toURI());
+ Path randomFilePath = new Path(parent, "random-file-" + UUID.randomUUID());
+ fs.create(randomFilePath, FileSystem.WriteMode.OVERWRITE);
+
+ // check existence of the created file
+ assertThat(flinkFileSystemFileIO.newInputFile(randomFilePath.toUri().toString()).exists())
+ .isTrue();
+ fs.delete(randomFilePath, false);
+ assertThat(flinkFileSystemFileIO.newInputFile(randomFilePath.toUri().toString()).exists())
+ .isFalse();
+ }
+
+ @Test
+ void testDeletePrefix() {
+ Path parent = new Path(tempDir.toURI());
+
+ List scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+ scaleSizes.parallelStream()
+ .forEach(
+ scale -> {
+ Path scalePath = new Path(parent, Integer.toString(scale));
+
+ createRandomFiles(scalePath, scale);
+ flinkFileSystemFileIO.deletePrefix(scalePath.toUri().toString());
+
+ // Hadoop filesystem will throw if the path does not exist
+ assertThatThrownBy(
+ () ->
+ flinkFileSystemFileIO.listPrefix(scalePath.toUri().toString()).iterator())
+ .isInstanceOf(UncheckedIOException.class)
+ .hasMessageContaining("Failed to list path recursively");
+ });
+
+ flinkFileSystemFileIO.deletePrefix(parent.toUri().toString());
+ // Hadoop filesystem will throw if the path does not exist
+ assertThatThrownBy(() -> flinkFileSystemFileIO.listPrefix(parent.toUri().toString()).iterator())
+ .isInstanceOf(UncheckedIOException.class)
+ .hasMessageContaining("Failed to list path recursively");
+ }
+
+ @Test
+ void testDeleteFiles() {
+ Path parent = new Path(tempDir.toURI());
+ List filesCreated = createRandomFiles(parent, 10);
+ flinkFileSystemFileIO.deleteFiles(
+ filesCreated.stream().map(Path::toString).collect(Collectors.toList()));
+ filesCreated.forEach(
+ file -> assertThat(flinkFileSystemFileIO.newInputFile(file.toString()).exists()).isFalse());
+ }
+
+ @Test
+ void testDeleteFilesErrorHandling() {
+ List filesCreated =
+ random.ints(2).mapToObj(x -> "fakefsnotreal://file-" + x).collect(Collectors.toList());
+ assertThatThrownBy(() -> flinkFileSystemFileIO.deleteFiles(filesCreated))
+ .isInstanceOf(BulkDeletionFailureException.class)
+ .hasMessage("Failed to delete 2 files");
+ }
+
+ @Test
+ void testFlinkFileIOReadWrite() throws IOException {
+ FileIO testFlinkFileIO = new FlinkFileSystemFileIO();
+
+ Path parent = new Path(tempDir.toURI());
+ Path randomFilePath = new Path(parent, "random-file-" + UUID.randomUUID());
+ byte[] expected = "DUMMY".getBytes(StandardCharsets.UTF_8);
+
+ // Write
+ OutputFile outputFile = testFlinkFileIO.newOutputFile(randomFilePath.getPath());
+ try (PositionOutputStream outputStream = outputFile.create()) {
+ outputStream.write(expected);
+ }
+
+ // Read
+ InputFile inputFile = testFlinkFileIO.newInputFile(randomFilePath.getPath());
+ try (SeekableInputStream inputStream = inputFile.newStream()) {
+ byte[] actual = new byte[(int) inputFile.getLength()];
+ assertThat(inputStream.read(actual)).isEqualTo(actual.length);
+ assertThat(actual).isEqualTo(expected);
+ }
+ }
+
+ @Test
+ void testFlinkFileIOKryoSerialization() throws IOException {
+ FlinkFileSystemFileIO testFlinkFileIO = new FlinkFileSystemFileIO();
+
+ // Flink fileIO should be serializable through Flink's own Kryo serializer, since Flink uses it
+ // to serialize operators and state. This also avoids depending on a specific esotericsoftware
+ // Kryo version, which differs across the supported Flink lines.
+ testFlinkFileIO.initialize(ImmutableMap.of("k1", "v1"));
+ FlinkFileSystemFileIO roundTripSerializedFileIO =
+ roundTripKryoSerialize(FlinkFileSystemFileIO.class, testFlinkFileIO);
+
+ assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testFlinkFileIO.properties());
+ }
+
+ @Test
+ void testFlinkFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+ FileIO testFlinkFileIO = new FlinkFileSystemFileIO();
+
+ // Flink fileIO should be serializable when properties are passed as immutable map
+ testFlinkFileIO.initialize(ImmutableMap.of("k1", "v1"));
+ FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testFlinkFileIO);
+
+ assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testFlinkFileIO.properties());
+ }
+
+ private List createRandomFiles(Path parent, int count) {
+ Vector paths = new Vector<>();
+ random
+ .ints(count)
+ .parallel()
+ .forEach(
+ i -> {
+ try {
+ Path path = new Path(parent, "file-" + i);
+ paths.add(path);
+ fs.create(path, FileSystem.WriteMode.OVERWRITE);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ return paths;
+ }
+}