diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..299973742e 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -138,7 +138,12 @@ impl SnapshotProduceOperation for FastAppendOperation { Ok(manifest_list .entries() .iter() - .filter(|entry| entry.has_added_files() || entry.has_existing_files()) + .filter(|entry| { + // Keep delete-only manifests too: they record which files were removed and + // must persist across snapshots until `expire_snapshots` cleans them up. + // Dropping them lets the removed files reappear as live data (see #2148). + entry.has_added_files() || entry.has_existing_files() || entry.has_deleted_files() + }) .cloned() .collect()) } @@ -147,14 +152,212 @@ impl SnapshotProduceOperation for FastAppendOperation { #[cfg(test)] mod tests { use std::collections::HashMap; + use std::fs; use std::sync::Arc; + use minijinja::{AutoEscape, Environment, Value, context}; + use tempfile::TempDir; + use uuid::Uuid; + + use crate::io::FileIO; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct, + DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, ManifestEntry, + ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata, }; + use crate::table::Table; + use crate::test_utils::test_runtime; use crate::transaction::tests::make_v2_minimal_table; use crate::transaction::{Transaction, TransactionAction}; - use crate::{TableRequirement, TableUpdate}; + use crate::{TableIdent, TableRequirement, TableUpdate}; + + fn render_template(template: &str, ctx: Value) -> String { + let mut env = Environment::new(); + env.set_auto_escape_callback(|_| AutoEscape::None); + env.render_str(template, ctx).unwrap() + } + + /// Builds a table whose current snapshot's manifest list contains a data manifest + /// followed by a delete-only manifest (one entry with `ManifestStatus::Deleted`, + /// so `deleted_files_count > 0` while `added_files_count == existing_files_count == 0`). + /// + /// Returns the table plus the `manifest_path` of the delete-only manifest so callers + /// can assert whether a subsequent append carries it forward. + async fn make_table_with_delete_only_manifest() -> (Table, TempDir, String) { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().join("table1"); + let manifest_list_location = table_location.join("metadata/manifests_list_1.avro"); + let table_metadata_location = table_location.join("metadata/v1.json"); + + let file_io = FileIO::new_with_fs(); + + let template = fs::read_to_string(format!( + "{}/testdata/example_table_metadata_v2.json", + env!("CARGO_MANIFEST_DIR") + )) + .unwrap(); + // The template has two snapshots; point the current one at our manifest list. + let metadata_json = render_template(&template, context! { + table_location => &table_location, + manifest_list_1_location => &manifest_list_location, + manifest_list_2_location => &manifest_list_location, + table_metadata_1_location => &table_metadata_location, + }); + let table_metadata = serde_json::from_str::(&metadata_json).unwrap(); + + let table = Table::builder() + .metadata(table_metadata) + .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) + .file_io(file_io) + .metadata_location(table_metadata_location.to_str().unwrap()) + .runtime(test_runtime()) + .build() + .unwrap(); + + let current_snapshot = table.metadata().current_snapshot().unwrap(); + let schema = current_snapshot.schema(table.metadata()).unwrap(); + let partition_spec = table.metadata().default_partition_spec(); + + let next_manifest_file = |location: &str| { + table + .file_io() + .new_output(format!( + "{}/metadata/manifest_{}.avro", + location, + Uuid::new_v4() + )) + .unwrap() + }; + let table_location_str = table_location.to_str().unwrap().to_string(); + + // Data manifest: one Added data file. + let mut data_writer = ManifestWriterBuilder::new( + next_manifest_file(&table_location_str), + Some(current_snapshot.snapshot_id()), + None, + schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_data(); + data_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(format!("{table_location_str}/data.parquet")) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + let data_manifest = data_writer.write_manifest_file().await.unwrap(); + + // Delete-only manifest: a single Deleted entry, nothing added or existing. + let mut delete_writer = ManifestWriterBuilder::new( + next_manifest_file(&table_location_str), + Some(current_snapshot.snapshot_id()), + None, + schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_data(); + delete_writer + .add_delete_entry( + ManifestEntry::builder() + .status(ManifestStatus::Deleted) + .sequence_number(0) + .file_sequence_number(0) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(format!("{table_location_str}/removed.parquet")) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + let delete_manifest = delete_writer.write_manifest_file().await.unwrap(); + let delete_manifest_path = delete_manifest.manifest_path.clone(); + + // Sanity: the delete manifest really is delete-only. + assert!(delete_manifest.has_deleted_files()); + assert!(!delete_manifest.has_added_files()); + assert!(!delete_manifest.has_existing_files()); + + let mut manifest_list_write = ManifestListWriter::v2( + table + .file_io() + .new_output(current_snapshot.manifest_list()) + .unwrap(), + current_snapshot.snapshot_id(), + current_snapshot.parent_snapshot_id(), + current_snapshot.sequence_number(), + ); + manifest_list_write + .add_manifests(vec![data_manifest, delete_manifest].into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + + (table, tmp_dir, delete_manifest_path) + } + + /// Regression test for #2148: a `fast_append` must carry delete-only manifests + /// forward into the new snapshot. Dropping them lets the files they mark as + /// removed reappear as live data on the next append. + #[tokio::test] + async fn test_fast_append_preserves_delete_only_manifest() { + let (table, _tmp_dir, delete_manifest_path) = make_table_with_delete_only_manifest().await; + + // Append a new data file via the public transaction API. + let new_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/appended.parquet", table.metadata().location())) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(); + + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![new_file]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + unreachable!("first update of a fast append should be AddSnapshot") + }; + + let manifest_list = new_snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + assert!( + manifest_list + .entries() + .iter() + .any(|m| m.manifest_path == delete_manifest_path), + "delete-only manifest {delete_manifest_path} was dropped from the new snapshot's \ + manifest list; the files it removed would reappear as live data" + ); + } #[tokio::test] async fn test_empty_data_append_action() {