// risingwave_frontend/optimizer/rule/iceberg_intermediate_scan_rule.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This rule materializes a `LogicalIcebergIntermediateScan` to the final
16//! `LogicalIcebergScan` with delete file anti-joins.
17//!
18//! This is the final step in the Iceberg scan optimization pipeline:
19//! 1. `LogicalSource` -> `LogicalIcebergIntermediateScan`
20//! 2. Predicate pushdown and column pruning on `LogicalIcebergIntermediateScan`
21//! 3. `LogicalIcebergIntermediateScan` -> `LogicalIcebergScan` (this rule)
22//!
23//! At this point, the intermediate scan has accumulated:
24//! - The predicate to be pushed down to Iceberg
25//! - The output column indices for projection
26//!
27//! This rule:
28//! 1. Reads file scan tasks from Iceberg (data files and delete files)
29//! 2. Creates the `LogicalIcebergScan` for data files with pre-computed splits
30//! 3. Creates anti-joins for equality delete and position delete files
31//! 4. Adds a project if output columns differ from scan columns
32
33use std::collections::HashMap;
34
35use anyhow::Context;
36use iceberg::scan::FileScanTask;
37use iceberg::spec::{DataContentType, FormatVersion};
38use risingwave_common::catalog::{
39    ColumnCatalog, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME,
40    ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
41};
42use risingwave_common::util::iter_util::ZipEqFast;
43use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergSplitEnumerator};
44use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
45
46use super::prelude::{PlanRef, *};
47use crate::error::Result;
48use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
49use crate::optimizer::plan_node::generic::GenericPlanRef;
50use crate::optimizer::plan_node::{
51    Logical, LogicalIcebergIntermediateScan, LogicalIcebergScan, LogicalJoin, LogicalProject,
52    LogicalValues,
53};
54use crate::optimizer::rule::{ApplyResult, FallibleRule};
55use crate::utils::{ColIndexMapping, Condition, FRONTEND_RUNTIME};
56
57pub struct IcebergIntermediateScanRule;
58
impl FallibleRule<Logical> for IcebergIntermediateScanRule {
    /// Materializes a `LogicalIcebergIntermediateScan` into a
    /// `LogicalIcebergScan` over the table's data files, plus `LeftAnti`
    /// joins against equality/position delete files and an optional trailing
    /// projection that restores the requested output columns.
    ///
    /// Returns `NotApplicable` for any other plan node, or when the source
    /// is not an Iceberg connector.
    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
        // Only fires on the intermediate Iceberg scan node.
        let scan: &LogicalIcebergIntermediateScan =
            match plan.as_logical_iceberg_intermediate_scan() {
                Some(s) => s,
                None => return ApplyResult::NotApplicable,
            };

        let Some(catalog) = scan.source_catalog() else {
            return ApplyResult::NotApplicable;
        };

        // Create the IcebergSplitEnumerator to get file scan tasks.
        // Non-Iceberg connector properties leave the plan untouched.
        let enumerator = if let ConnectorProperties::Iceberg(prop) =
            ConnectorProperties::extract(catalog.with_properties.clone(), false)?
        {
            IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
        } else {
            return ApplyResult::NotApplicable;
        };

        // Listing scan tasks blocks on a runtime below, which is not
        // available under madsim, so reject the query outright there.
        #[cfg(madsim)]
        return ApplyResult::Err(
            crate::error::ErrorCode::BindError(
                "iceberg_scan can't be used in the madsim mode".to_string(),
            )
            .into(),
        );

        #[cfg(not(madsim))]
        {
            use risingwave_connector::source::iceberg::IcebergListResult;

            // Run the async task listing synchronously on the frontend
            // runtime; `block_in_place` keeps the current tokio worker legal.
            let list_result =
                tokio::task::block_in_place(|| {
                    FRONTEND_RUNTIME.block_on(enumerator.list_scan_tasks(
                        Some(scan.time_travel_info.clone()),
                        scan.predicate.clone(),
                    ))
                })?;
            // `None` from the listing means there is no valid snapshot to
            // read, so the scan degenerates to an empty `Values`.
            let Some(IcebergListResult {
                mut data_files,
                mut equality_delete_files,
                mut position_delete_files,
                equality_delete_columns,
                format_version,
            }) = list_result
            else {
                tracing::info!(
                    "There is no valid snapshot for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            };
            if data_files.is_empty() {
                tracing::info!(
                    "There is no data file for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            }

            // Build the data file scan with pre-computed splits.
            // The data scan must read the requested output columns plus any
            // columns referenced by equality deletes (they are the join keys).
            let schema = data_files[0].schema.clone();
            let mut projection_columns: Vec<&str> = scan
                .output_columns
                .iter()
                .chain(&equality_delete_columns)
                .map(|col| col.as_str())
                .collect();
            // Sort by field id first: `dedup` only removes *consecutive*
            // duplicates, and the projection order becomes deterministic.
            projection_columns.sort_unstable_by_key(|&s| schema.field_id_by_name(s));
            projection_columns.dedup();
            set_project_field_ids(&mut data_files, &schema, projection_columns.iter())?;
            match format_version {
                FormatVersion::V1 | FormatVersion::V2 => {
                    // Pre-V3: all deletes are applied via the explicit
                    // anti-joins built below, so drop per-task delete lists.
                    for file in &mut data_files {
                        file.deletes.clear();
                    }
                }
                FormatVersion::V3 => {
                    // V3: keep position deletes attached to each task (no
                    // position-delete join is built for V3 — see
                    // `use_position_delete_join` below); equality deletes are
                    // still handled via an anti-join.
                    for file in &mut data_files {
                        file.deletes.retain(|delete| {
                            delete.data_file_content == DataContentType::PositionDeletes
                        });
                    }
                }
            }

            let column_catalog_map: HashMap<&str, &ColumnCatalog> = catalog
                .columns
                .iter()
                .map(|c| (c.column_desc.name.as_str(), c))
                .collect();
            // Equality-delete joins compare sequence numbers, so the data
            // scan must also expose the hidden sequence-number column.
            if !equality_delete_files.is_empty() {
                projection_columns.push(ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
            }
            // Position deletes are only joined for pre-V3 tables; the join
            // key is the hidden (file path, row position) column pair.
            let use_position_delete_join =
                !position_delete_files.is_empty() && format_version < FormatVersion::V3;
            if use_position_delete_join {
                projection_columns.push(ICEBERG_FILE_PATH_COLUMN_NAME);
                projection_columns.push(ICEBERG_FILE_POS_COLUMN_NAME);
            }
            let column_catalogs =
                build_column_catalogs(projection_columns.iter(), &column_catalog_map)?;
            let core = scan.core.clone_with_column_catalog(column_catalogs);
            let mut plan: PlanRef =
                LogicalIcebergScan::new(core, IcebergFileScanTask::Data(data_files)).into();

            // Add anti-join for equality delete files
            if !equality_delete_files.is_empty() {
                set_project_field_ids(
                    &mut equality_delete_files,
                    &schema,
                    equality_delete_columns.iter(),
                )?;
                plan = build_equality_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    equality_delete_files,
                    equality_delete_columns,
                )?;
            }

            // Add anti-join for position delete files
            if use_position_delete_join {
                // Position deletes are matched purely on hidden metadata
                // columns, so no data-file columns are projected here.
                set_project_field_ids(
                    &mut position_delete_files,
                    &schema,
                    std::iter::empty::<&str>(),
                )?;
                plan = build_position_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    position_delete_files,
                )?;
            }

            // Add projection if output columns differ from scan columns
            // (hidden/join-key columns must not leak into the rule's output).
            let schema_len = plan.schema().len();
            let schema_names = plan.schema().fields.iter().map(|f| f.name.as_str());
            let output_names = scan.output_columns.iter().map(|s| s.as_str());
            if schema_len != scan.output_columns.len()
                || !itertools::equal(schema_names, output_names)
            {
                let col_map: HashMap<&str, usize> = plan
                    .schema()
                    .fields
                    .iter()
                    .enumerate()
                    .map(|(idx, field)| (field.name.as_str(), idx))
                    .collect();
                let output_col_idx: Vec<_> = scan
                    .output_columns
                    .iter()
                    .map(|col| {
                        col_map.get(col.as_str()).copied().with_context(|| {
                            format!("Output column {} not found in scan schema", col)
                        })
                    })
                    .try_collect()?;
                let mapping = ColIndexMapping::with_remaining_columns(&output_col_idx, schema_len);
                plan = LogicalProject::with_mapping(plan, mapping).into();
            }

            ApplyResult::Ok(plan)
        }
    }
}
227
228impl IcebergIntermediateScanRule {
229    pub fn create() -> BoxedRule {
230        Box::new(IcebergIntermediateScanRule)
231    }
232}
233
234/// Returns an empty table plan with the same schema as the scan.
235fn empty_table_plan(plan: &PlanRef, scan: &LogicalIcebergIntermediateScan) -> PlanRef {
236    LogicalValues::new(vec![], scan.schema().clone(), plan.ctx()).into()
237}
238
239/// Builds a mapping of column names to their catalogs by looking them up from a catalog map.
240fn build_column_catalogs(
241    column_names: impl Iterator<Item = impl AsRef<str>>,
242    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
243) -> Result<Vec<ColumnCatalog>> {
244    let res = column_names
245        .map(|name| {
246            let name = name.as_ref();
247            column_catalog_map
248                .get(name)
249                .map(|&c| c.clone())
250                .with_context(|| format!("Column catalog not found for column {}", name))
251        })
252        .try_collect()?;
253    Ok(res)
254}
255
256/// Sets the project field IDs for a list of files based on column names.
257fn set_project_field_ids(
258    files: &mut [FileScanTask],
259    schema: &iceberg::spec::Schema,
260    column_names: impl Iterator<Item = impl AsRef<str>>,
261) -> Result<()> {
262    let project_field_ids: Vec<i32> = column_names
263        .map(|name| {
264            let name = name.as_ref();
265            schema
266                .field_id_by_name(name)
267                .with_context(|| format!("Column {} not found in data file schema", name))
268        })
269        .try_collect()?;
270    for file in files {
271        file.project_field_ids = project_field_ids.clone();
272    }
273    Ok(())
274}
275
276/// Builds equality conditions between two sets of input references.
277fn build_equal_conditions(
278    left_inputs: Vec<InputRef>,
279    right_inputs: Vec<InputRef>,
280) -> Result<Vec<ExprImpl>> {
281    left_inputs
282        .into_iter()
283        .zip_eq_fast(right_inputs.into_iter())
284        .map(|(left, right)| {
285            Ok(FunctionCall::new(ExprType::Equal, vec![left.into(), right.into()])?.into())
286        })
287        .collect()
288}
289
/// Wraps `child` (the data-file scan side) in a `LeftAnti` join against a
/// scan of the equality delete files, removing rows matched by a delete.
///
/// Join condition: every equality-delete column is pairwise equal AND the
/// data row's sequence number is strictly less than the delete row's, so a
/// delete only eliminates rows written before it.
pub fn build_equality_delete_hashjoin_scan(
    scan: &LogicalIcebergIntermediateScan,
    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
    child: PlanRef,
    equality_delete_files: Vec<FileScanTask>,
    equality_delete_columns: Vec<String>,
) -> Result<PlanRef> {
    // The delete-side scan reads the equality columns plus the hidden
    // sequence-number column used in the join condition.
    let column_names = equality_delete_columns
        .iter()
        .map(|s| s.as_str())
        .chain(std::iter::once(ICEBERG_SEQUENCE_NUM_COLUMN_NAME));
    let column_catalogs = build_column_catalogs(column_names, column_catalog_map)?;
    let source = scan.core.clone_with_column_catalog(column_catalogs);

    let equality_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
        source,
        IcebergFileScanTask::EqualityDelete(equality_delete_files),
    )
    .into();

    let data_columns_len = child.schema().len();
    // Build join condition: equality delete columns are equal AND sequence number is less than.
    // Join type is LeftAnti to exclude rows that match delete records.
    //
    // `offset` shifts indices into the join's combined schema: 0 for the left
    // (data) side, `data_columns_len` for the right (delete) side.
    let build_inputs = |scan: &PlanRef, offset: usize| -> Result<(Vec<InputRef>, InputRef)> {
        // Map column name -> (index, data type) within this side's schema.
        let delete_column_index_map = scan
            .schema()
            .fields()
            .iter()
            .enumerate()
            .map(|(index, data_column)| (&data_column.name, (index, &data_column.data_type)))
            .collect::<std::collections::HashMap<_, _>>();
        // Input refs for the equality-delete key columns, in key order.
        let delete_column_inputs = equality_delete_columns
            .iter()
            .map(|name| {
                let (index, data_type) = delete_column_index_map
                    .get(name)
                    .with_context(|| format!("Delete column {} not found in scan schema", name))?;
                Ok(InputRef {
                    index: offset + index,
                    data_type: (*data_type).clone(),
                })
            })
            .collect::<Result<Vec<InputRef>>>()?;
        // Input ref for the hidden sequence-number column on this side.
        let seq_num_inputs = InputRef {
            index: scan
                .schema()
                .fields()
                .iter()
                .position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
                .context("Sequence number column not found in scan schema")?
                + offset,
            data_type: risingwave_common::types::DataType::Int64,
        };
        Ok((delete_column_inputs, seq_num_inputs))
    };
    let (left_delete_column_inputs, left_seq_num_input) = build_inputs(&child, 0)?;
    let (right_delete_column_inputs, right_seq_num_input) =
        build_inputs(&equality_delete_iceberg_scan, data_columns_len)?;

    let mut eq_join_expr =
        build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
    // data.seq_num < delete.seq_num: the delete applies only to older rows.
    eq_join_expr.push(
        FunctionCall::new(
            ExprType::LessThan,
            vec![left_seq_num_input.into(), right_seq_num_input.into()],
        )?
        .into(),
    );
    let on = Condition {
        conjunctions: eq_join_expr,
    };
    let join = LogicalJoin::new(
        child,
        equality_delete_iceberg_scan,
        risingwave_pb::plan_common::JoinType::LeftAnti,
        on,
    );
    Ok(join.into())
}
369
370pub fn build_position_delete_hashjoin_scan(
371    scan: &LogicalIcebergIntermediateScan,
372    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
373    child: PlanRef,
374    position_delete_files: Vec<FileScanTask>,
375) -> Result<PlanRef> {
376    // Position delete files use file path and position to identify deleted rows.
377    let delete_column_names = [ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME];
378    let column_catalogs = build_column_catalogs(delete_column_names.iter(), column_catalog_map)?;
379    let position_delete_source = scan.core.clone_with_column_catalog(column_catalogs);
380
381    let position_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
382        position_delete_source,
383        IcebergFileScanTask::PositionDelete(position_delete_files),
384    )
385    .into();
386    let data_columns_len = child.schema().len();
387
388    let build_inputs = |scan: &PlanRef, offset: usize| {
389        scan.schema()
390            .fields()
391            .iter()
392            .enumerate()
393            .filter_map(|(index, data_column)| {
394                if data_column.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
395                    || data_column.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
396                {
397                    Some(InputRef {
398                        index: offset + index,
399                        data_type: data_column.data_type(),
400                    })
401                } else {
402                    None
403                }
404            })
405            .collect::<Vec<InputRef>>()
406    };
407    let left_delete_column_inputs = build_inputs(&child, 0);
408    let right_delete_column_inputs = build_inputs(&position_delete_iceberg_scan, data_columns_len);
409    let eq_join_expr =
410        build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
411    let on = Condition {
412        conjunctions: eq_join_expr,
413    };
414    let join = LogicalJoin::new(
415        child,
416        position_delete_iceberg_scan,
417        risingwave_pb::plan_common::JoinType::LeftAnti,
418        on,
419    );
420    Ok(join.into())
421}