Skip to main content

risingwave_meta/manager/iceberg_v3_sink/
backfill.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::Result;
18use iceberg::spec::DataFile;
19
20pub fn backfill_dv_partitions(
21    writer_files: &[DataFile],
22    dv_delete_files: &mut [DataFile],
23) -> Result<()> {
24    if dv_delete_files.is_empty() {
25        return Ok(());
26    }
27    let mut waiting_delete_files = dv_delete_files
28        .iter_mut()
29        .filter(|dv| dv.partition().fields().is_empty())
30        .map(|dv| (dv.referenced_data_file().unwrap(), dv))
31        .collect::<HashMap<_, _>>();
32    if waiting_delete_files.is_empty() {
33        return Ok(());
34    }
35
36    for f in writer_files {
37        if let Some(dv) = waiting_delete_files.remove(f.file_path()) {
38            dv.set_partition(f.partition().clone());
39            if waiting_delete_files.is_empty() {
40                return Ok(());
41            }
42        }
43    }
44
45    if !waiting_delete_files.is_empty() {
46        anyhow::bail!(
47            "backfill iceberg v3 sink delete files failed, missing referenced data files: {:?}",
48            waiting_delete_files.keys().collect::<Vec<_>>()
49        );
50    }
51    Ok(())
52}
53
#[cfg(test)]
mod tests {
    use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct};

    use super::*;

    /// Builds a minimal Parquet data file at `path` carrying `partition`.
    fn writer_file(path: &str, partition: Struct) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(path.to_owned())
            .file_format(DataFileFormat::Parquet)
            .partition(partition)
            .partition_spec_id(0)
            .record_count(1)
            .file_size_in_bytes(1)
            .build()
            .expect("writer file build")
    }

    /// Builds a minimal Puffin position-delete file at `path` that references
    /// the data file `referenced` and carries `partition`.
    fn dv_file(path: &str, referenced: &str, partition: Struct) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::PositionDeletes)
            .file_path(path.to_owned())
            .file_format(DataFileFormat::Puffin)
            .partition(partition)
            .partition_spec_id(0)
            .record_count(1)
            .file_size_in_bytes(1)
            .referenced_data_file(Some(referenced.to_owned()))
            .build()
            .expect("dv file build")
    }

    /// A single-field partition struct holding the int `value`.
    fn struct_with_int(value: i32) -> Struct {
        Struct::from_iter([Some(Literal::int(value))])
    }

    #[test]
    fn empty_inputs_noop() -> Result<()> {
        // No writer files and no delete files.
        let mut dvs: Vec<DataFile> = vec![];
        backfill_dv_partitions(&[], &mut dvs)?;
        assert!(dvs.is_empty());

        // Writer files present, but nothing to backfill.
        let writers = [writer_file("data/a.parquet", struct_with_int(7))];
        let mut dvs: Vec<DataFile> = vec![];
        backfill_dv_partitions(&writers, &mut dvs)?;
        assert!(dvs.is_empty());
        Ok(())
    }

    #[test]
    fn dv_referencing_same_epoch_writer_gets_partition() -> Result<()> {
        let writers = [writer_file("data/a.parquet", struct_with_int(42))];
        let mut dvs = vec![dv_file("dv/a.puff", "data/a.parquet", Struct::empty())];

        backfill_dv_partitions(&writers, &mut dvs)?;

        assert_eq!(dvs[0].partition(), &struct_with_int(42));
        Ok(())
    }

    #[test]
    fn dv_referencing_older_snapshot_left_alone() -> Result<()> {
        // DV references a data file NOT in the uncommitted writer set — its
        // partition was correctly populated by DvMerger from the older
        // snapshot and must not be overwritten.
        let writers = [writer_file("data/a.parquet", struct_with_int(1))];
        let dv_partition = struct_with_int(99);
        let mut dvs = vec![dv_file(
            "dv/old.puff",
            "data/somewhere_old.parquet",
            dv_partition.clone(),
        )];

        backfill_dv_partitions(&writers, &mut dvs)?;

        assert_eq!(dvs[0].partition(), &dv_partition, "must not overwrite");
        Ok(())
    }

    #[test]
    fn mixed_set_partial_backfill() -> Result<()> {
        let writers = [
            writer_file("data/a.parquet", struct_with_int(1)),
            writer_file("data/b.parquet", struct_with_int(2)),
        ];
        let mut dvs = vec![
            dv_file("dv/a.puff", "data/a.parquet", Struct::empty()),
            dv_file("dv/old.puff", "data/c.parquet", struct_with_int(99)),
            dv_file("dv/b.puff", "data/b.parquet", Struct::empty()),
        ];

        backfill_dv_partitions(&writers, &mut dvs)?;

        assert_eq!(dvs[0].partition(), &struct_with_int(1));
        assert_eq!(
            dvs[1].partition(),
            &struct_with_int(99),
            "older snapshot DV untouched"
        );
        assert_eq!(dvs[2].partition(), &struct_with_int(2));
        Ok(())
    }

    #[test]
    fn dv_without_partition_and_unknown_reference_fails() {
        // A DV that still needs a partition but references a data file that is
        // not among the writer files cannot be backfilled: the call must fail
        // and name the missing path, leaving the DV's partition empty.
        let writers = [writer_file("data/a.parquet", struct_with_int(1))];
        let mut dvs = vec![dv_file(
            "dv/orphan.puff",
            "data/missing.parquet",
            Struct::empty(),
        )];

        let err = backfill_dv_partitions(&writers, &mut dvs)
            .expect_err("unresolvable DV must be reported");

        assert!(
            err.to_string().contains("data/missing.parquet"),
            "error should name the missing data file, got: {err}"
        );
        assert!(dvs[0].partition().fields().is_empty());
    }
}