Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 186 additions & 3 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,12 @@ impl SnapshotProduceOperation for FastAppendOperation {
Ok(manifest_list
.entries()
.iter()
.filter(|entry| entry.has_added_files() || entry.has_existing_files())
.filter(|entry| {
// Keep delete-only manifests too: they record which files were removed and
// must persist across snapshots until `expire_snapshots` cleans them up.
// Dropping them lets the removed files reappear as live data (see #2148).
entry.has_added_files() || entry.has_existing_files() || entry.has_deleted_files()
})
.cloned()
.collect())
}
Expand All @@ -147,14 +152,192 @@ impl SnapshotProduceOperation for FastAppendOperation {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;

use minijinja::{AutoEscape, Environment, Value, context};
use tempfile::TempDir;
use uuid::Uuid;

use super::FastAppendOperation;
use crate::io::FileIO;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, ManifestEntry,
ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata,
};
use crate::table::Table;
use crate::test_utils::test_runtime;
use crate::transaction::snapshot::{SnapshotProduceOperation, SnapshotProducer};
use crate::transaction::tests::make_v2_minimal_table;
use crate::transaction::{Transaction, TransactionAction};
use crate::{TableRequirement, TableUpdate};
use crate::{TableIdent, TableRequirement, TableUpdate};

fn render_template(template: &str, ctx: Value) -> String {
let mut env = Environment::new();
env.set_auto_escape_callback(|_| AutoEscape::None);
env.render_str(template, ctx).unwrap()
}

/// Builds a table whose current snapshot's manifest list contains a data manifest
/// followed by a delete-only manifest (one entry with `ManifestStatus::Deleted`,
/// so `deleted_files_count > 0` while `added_files_count == existing_files_count == 0`).
///
/// Returns the table plus the `manifest_path` of the delete-only manifest so callers
/// can assert whether `existing_manifest()` carries it forward.
async fn make_table_with_delete_only_manifest() -> (Table, TempDir, String) {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let manifest_list_location = table_location.join("metadata/manifests_list_1.avro");
let table_metadata_location = table_location.join("metadata/v1.json");

let file_io = FileIO::new_with_fs();

let template = fs::read_to_string(format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
// The template has two snapshots; point the current one at our manifest list.
let metadata_json = render_template(&template, context! {
table_location => &table_location,
manifest_list_1_location => &manifest_list_location,
manifest_list_2_location => &manifest_list_location,
table_metadata_1_location => &table_metadata_location,
});
let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_json).unwrap();

let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io)
.metadata_location(table_metadata_location.to_str().unwrap())
.runtime(test_runtime())
.build()
.unwrap();

let current_snapshot = table.metadata().current_snapshot().unwrap();
let schema = current_snapshot.schema(table.metadata()).unwrap();
let partition_spec = table.metadata().default_partition_spec();

let next_manifest_file = |location: &str| {
table
.file_io()
.new_output(format!(
"{}/metadata/manifest_{}.avro",
location,
Uuid::new_v4()
))
.unwrap()
};
let table_location_str = table_location.to_str().unwrap().to_string();

// Data manifest: one Added data file.
let mut data_writer = ManifestWriterBuilder::new(
next_manifest_file(&table_location_str),
Some(current_snapshot.snapshot_id()),
None,
schema.clone(),
partition_spec.as_ref().clone(),
)
.build_v2_data();
data_writer
.add_entry(
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{table_location_str}/data.parquet"))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build(),
)
.unwrap();
let data_manifest = data_writer.write_manifest_file().await.unwrap();

// Delete-only manifest: a single Deleted entry, nothing added or existing.
let mut delete_writer = ManifestWriterBuilder::new(
next_manifest_file(&table_location_str),
Some(current_snapshot.snapshot_id()),
None,
schema.clone(),
partition_spec.as_ref().clone(),
)
.build_v2_data();
delete_writer
.add_delete_entry(
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.sequence_number(0)
.file_sequence_number(0)
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{table_location_str}/removed.parquet"))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build(),
)
.unwrap();
let delete_manifest = delete_writer.write_manifest_file().await.unwrap();
let delete_manifest_path = delete_manifest.manifest_path.clone();

// Sanity: the delete manifest really is delete-only.
assert!(delete_manifest.has_deleted_files());
assert!(!delete_manifest.has_added_files());
assert!(!delete_manifest.has_existing_files());

let mut manifest_list_write = ManifestListWriter::v2(
table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
.add_manifests(vec![data_manifest, delete_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();

(table, tmp_dir, delete_manifest_path)
}

/// Regression test for #2148: `FastAppendOperation::existing_manifest()` must carry
/// delete-only manifests forward into the new snapshot. Dropping them lets the files
/// they mark as removed reappear as live data on the next append.
#[tokio::test]
async fn test_existing_manifest_preserves_delete_only_manifest() {
let (table, _tmp_dir, delete_manifest_path) = make_table_with_delete_only_manifest().await;

let producer = SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![]);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to use the FastAppend tx api rather using the low level api for tests, you can find examples in other uts.


let carried_forward = FastAppendOperation
.existing_manifest(&producer)
.await
.unwrap();

assert!(
carried_forward
.iter()
.any(|m| m.manifest_path == delete_manifest_path),
"delete-only manifest {delete_manifest_path} was dropped from the new snapshot's \
manifest list; the files it removed would reappear as live data"
);
}

#[tokio::test]
async fn test_empty_data_append_action() {
Expand Down
Loading