risingwave_meta/manager/iceberg_v3_sink/
backfill.rs1use std::collections::HashMap;
16
17use anyhow::Result;
18use iceberg::spec::DataFile;
19
20pub fn backfill_dv_partitions(
21 writer_files: &[DataFile],
22 dv_delete_files: &mut [DataFile],
23) -> Result<()> {
24 if dv_delete_files.is_empty() {
25 return Ok(());
26 }
27 let mut waiting_delete_files = dv_delete_files
28 .iter_mut()
29 .filter(|dv| dv.partition().fields().is_empty())
30 .map(|dv| (dv.referenced_data_file().unwrap(), dv))
31 .collect::<HashMap<_, _>>();
32 if waiting_delete_files.is_empty() {
33 return Ok(());
34 }
35
36 for f in writer_files {
37 if let Some(dv) = waiting_delete_files.remove(f.file_path()) {
38 dv.set_partition(f.partition().clone());
39 if waiting_delete_files.is_empty() {
40 return Ok(());
41 }
42 }
43 }
44
45 if !waiting_delete_files.is_empty() {
46 anyhow::bail!(
47 "backfill iceberg v3 sink delete files failed, missing referenced data files: {:?}",
48 waiting_delete_files.keys().collect::<Vec<_>>()
49 );
50 }
51 Ok(())
52}
53
#[cfg(test)]
mod tests {
    use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct};

    use super::*;

    /// Builds a minimal Parquet data file at `path` carrying `partition` (spec id 0).
    fn writer_file(path: &str, partition: Struct) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(path.to_owned())
            .file_format(DataFileFormat::Parquet)
            .partition(partition)
            .partition_spec_id(0)
            .record_count(1)
            .file_size_in_bytes(1)
            .build()
            .expect("writer file build")
    }

    /// Builds a minimal Puffin position-delete file at `path` that references the data
    /// file `referenced` and carries `partition` (spec id 0).
    fn dv_file(path: &str, referenced: &str, partition: Struct) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::PositionDeletes)
            .file_path(path.to_owned())
            .file_format(DataFileFormat::Puffin)
            .partition(partition)
            .partition_spec_id(0)
            .record_count(1)
            .file_size_in_bytes(1)
            .referenced_data_file(Some(referenced.to_owned()))
            .build()
            .expect("dv file build")
    }

    /// A single-field partition struct holding one `int` literal.
    fn struct_with_int(value: i32) -> Struct {
        Struct::from_iter([Some(Literal::int(value))])
    }

    #[test]
    fn empty_inputs_noop() -> Result<()> {
        let mut dvs: Vec<DataFile> = vec![];
        backfill_dv_partitions(&[], &mut dvs)?;
        assert!(dvs.is_empty());

        let writers = [writer_file("data/a.parquet", struct_with_int(7))];
        let mut dvs: Vec<DataFile> = vec![];
        backfill_dv_partitions(&writers, &mut dvs)?;

        assert!(dvs.is_empty());
        Ok(())
    }

    #[test]
    fn dv_referencing_same_epoch_writer_gets_partition() -> Result<()> {
        let writers = [writer_file("data/a.parquet", struct_with_int(42))];
        let mut dvs = vec![dv_file("dv/a.puff", "data/a.parquet", Struct::empty())];

        backfill_dv_partitions(&writers, &mut dvs)?;

        assert_eq!(dvs[0].partition(), &struct_with_int(42));
        Ok(())
    }

    #[test]
    fn dv_referencing_older_snapshot_left_alone() -> Result<()> {
        let writers = [writer_file("data/a.parquet", struct_with_int(1))];
        // The DV already carries a partition, so it is not waiting for backfill even
        // though its referenced data file is absent from `writers`.
        let dv_partition = struct_with_int(99);
        let mut dvs = vec![dv_file(
            "dv/old.puff",
            "data/somewhere_old.parquet",
            dv_partition.clone(),
        )];

        backfill_dv_partitions(&writers, &mut dvs)?;

        assert_eq!(dvs[0].partition(), &dv_partition, "must not overwrite");
        Ok(())
    }

    #[test]
    fn mixed_set_partial_backfill() -> Result<()> {
        let writers = [
            writer_file("data/a.parquet", struct_with_int(1)),
            writer_file("data/b.parquet", struct_with_int(2)),
        ];
        let mut dvs = vec![
            dv_file("dv/a.puff", "data/a.parquet", Struct::empty()),
            dv_file("dv/old.puff", "data/c.parquet", struct_with_int(99)),
            dv_file("dv/b.puff", "data/b.parquet", Struct::empty()),
        ];

        backfill_dv_partitions(&writers, &mut dvs)?;

        assert_eq!(dvs[0].partition(), &struct_with_int(1));
        assert_eq!(
            dvs[1].partition(),
            &struct_with_int(99),
            "older snapshot DV untouched"
        );
        assert_eq!(dvs[2].partition(), &struct_with_int(2));
        Ok(())
    }

    #[test]
    fn dv_referencing_unknown_data_file_errors() {
        let writers = [writer_file("data/a.parquet", struct_with_int(1))];
        let mut dvs = vec![dv_file(
            "dv/missing.puff",
            "data/missing.parquet",
            Struct::empty(),
        )];

        let err = backfill_dv_partitions(&writers, &mut dvs)
            .expect_err("a waiting DV referencing an unknown data file must fail");

        assert!(
            err.to_string().contains("data/missing.parquet"),
            "error should name the missing data file: {err}"
        );
    }

    #[test]
    fn waiting_dv_without_writer_files_errors() {
        let mut dvs = vec![dv_file("dv/a.puff", "data/a.parquet", Struct::empty())];

        assert!(backfill_dv_partitions(&[], &mut dvs).is_err());
    }
}