risingwave_frontend/optimizer/rule/iceberg_intermediate_scan_rule.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This rule materializes a `LogicalIcebergIntermediateScan` to the final
16//! `LogicalIcebergScan` with delete file anti-joins.
17//!
18//! This is the final step in the Iceberg scan optimization pipeline:
19//! 1. `LogicalSource` -> `LogicalIcebergIntermediateScan`
20//! 2. Predicate pushdown and column pruning on `LogicalIcebergIntermediateScan`
21//! 3. `LogicalIcebergIntermediateScan` -> `LogicalIcebergScan` (this rule)
22//!
23//! At this point, the intermediate scan has accumulated:
24//! - The predicate to be pushed down to Iceberg
25//! - The output column indices for projection
26//!
27//! This rule:
28//! 1. Reads file scan tasks from Iceberg (data files and delete files)
29//! 2. Creates the `LogicalIcebergScan` for data files with pre-computed splits
30//! 3. Creates anti-joins for equality delete and position delete files
31//! 4. Adds a project if output columns differ from scan columns
32
33use std::collections::HashMap;
34
35use anyhow::Context;
36use iceberg::scan::FileScanTask;
37use iceberg::spec::{DataContentType, FormatVersion};
38use risingwave_common::catalog::{
39    ColumnCatalog, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME,
40    ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
41};
42use risingwave_common::util::iter_util::ZipEqFast;
43use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergSplitEnumerator};
44use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
45
46use super::prelude::{PlanRef, *};
47use crate::error::Result;
48use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
49use crate::optimizer::plan_node::generic::GenericPlanRef;
50use crate::optimizer::plan_node::{
51    Logical, LogicalIcebergIntermediateScan, LogicalIcebergScan, LogicalJoin, LogicalProject,
52    LogicalValues,
53};
54use crate::optimizer::rule::{ApplyResult, FallibleRule};
55use crate::utils::{ColIndexMapping, Condition, FRONTEND_RUNTIME};
56
/// Optimizer rule that materializes a `LogicalIcebergIntermediateScan` into the
/// final `LogicalIcebergScan` plan (see the module-level docs for the pipeline).
pub struct IcebergIntermediateScanRule;
58
impl FallibleRule<Logical> for IcebergIntermediateScanRule {
    /// Materializes a `LogicalIcebergIntermediateScan` into a `LogicalIcebergScan`
    /// with pre-computed file scan tasks, plus anti-joins for delete files and an
    /// optional projection/cast layer on top.
    ///
    /// Returns `NotApplicable` when `plan` is not an intermediate Iceberg scan,
    /// has no source catalog, or the connector properties are not Iceberg.
    /// Errors from property extraction, file listing, and plan construction are
    /// propagated via `ApplyResult::Err`.
    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
        let scan: &LogicalIcebergIntermediateScan =
            match plan.as_logical_iceberg_intermediate_scan() {
                Some(s) => s,
                None => return ApplyResult::NotApplicable,
            };

        let Some(catalog) = scan.source_catalog() else {
            return ApplyResult::NotApplicable;
        };

        // Create the IcebergSplitEnumerator to get file scan tasks
        let enumerator = if let ConnectorProperties::Iceberg(prop) =
            ConnectorProperties::extract(catalog.with_properties.clone(), false)?
        {
            IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
        } else {
            return ApplyResult::NotApplicable;
        };

        // Under madsim this rule cannot run (it blocks on real async I/O below),
        // so reject the query outright with a bind error.
        #[cfg(madsim)]
        return ApplyResult::Err(
            crate::error::ErrorCode::BindError(
                "iceberg_scan can't be used in the madsim mode".to_string(),
            )
            .into(),
        );

        #[cfg(not(madsim))]
        {
            use risingwave_connector::source::iceberg::IcebergListResult;

            // Synchronously list data and delete files from Iceberg, pushing down
            // the accumulated predicate and time-travel info. `block_in_place`
            // lets us block on the frontend runtime without stalling the current
            // tokio worker's other tasks.
            let list_result = tokio::task::block_in_place(|| {
                FRONTEND_RUNTIME.block_on(enumerator.list_scan_tasks(
                    Some(scan.time_travel_info.clone()),
                    scan.iceberg_predicate.clone(),
                ))
            })?;
            // `None` list result: the table has no valid snapshot to read.
            let Some(IcebergListResult {
                mut data_files,
                mut equality_delete_files,
                mut position_delete_files,
                equality_delete_columns,
                format_version,
                schema: table_schema,
            }) = list_result
            else {
                tracing::info!(
                    "There is no valid snapshot for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            };
            if data_files.is_empty() {
                tracing::info!(
                    "There is no data file for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            }

            // Build the data file scan with pre-computed splits.
            // The data scan must emit the scan's output columns plus any columns
            // referenced by the equality-delete join. Sort by field id and dedup
            // so the projection order is deterministic regardless of origin.
            let mut projection_columns: Vec<&str> = scan
                .output_columns()
                .chain(equality_delete_columns.iter().map(|s| s.as_str()))
                .collect();
            projection_columns.sort_unstable_by_key(|&s| table_schema.field_id_by_name(s));
            projection_columns.dedup();
            set_project_field_ids(
                &mut data_files,
                table_schema.as_ref(),
                projection_columns.iter(),
            )?;
            // Decide how per-file delete entries are handled:
            // - V1/V2: deletes are applied via explicit anti-joins below, so the
            //   per-file delete lists are cleared entirely.
            // - V3: only position deletes are kept attached to the data-file
            //   tasks (no position-delete join is built for V3 — see
            //   `use_position_delete_join` below; presumably the data-file
            //   reader applies them directly — TODO confirm).
            match format_version {
                FormatVersion::V1 | FormatVersion::V2 => {
                    for file in &mut data_files {
                        file.deletes.clear();
                    }
                }
                FormatVersion::V3 => {
                    for file in &mut data_files {
                        file.deletes.retain(|delete| {
                            delete.data_file_content == DataContentType::PositionDeletes
                        });
                    }
                }
            }

            // Name -> catalog lookup used when building scan output schemas.
            let column_catalog_map: HashMap<&str, &ColumnCatalog> = catalog
                .columns
                .iter()
                .map(|c| (c.column_desc.name.as_str(), c))
                .collect();
            // The equality-delete join compares sequence numbers, and the
            // position-delete join matches on (file path, file pos); the data
            // scan must emit those hidden columns whenever the joins are built.
            if !equality_delete_files.is_empty() {
                projection_columns.push(ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
            }
            let use_position_delete_join =
                !position_delete_files.is_empty() && format_version < FormatVersion::V3;
            if use_position_delete_join {
                projection_columns.push(ICEBERG_FILE_PATH_COLUMN_NAME);
                projection_columns.push(ICEBERG_FILE_POS_COLUMN_NAME);
            }
            let column_catalogs =
                build_column_catalogs(projection_columns.iter(), &column_catalog_map)?;
            let core = scan.core.clone_with_column_catalog(column_catalogs);
            let mut plan: PlanRef =
                LogicalIcebergScan::new(core, IcebergFileScanTask::Data(data_files)).into();

            // Add anti-join for equality delete files
            if !equality_delete_files.is_empty() {
                set_project_field_ids(
                    &mut equality_delete_files,
                    table_schema.as_ref(),
                    equality_delete_columns.iter(),
                )?;
                plan = build_equality_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    equality_delete_files,
                    equality_delete_columns,
                )?;
            }

            // Add anti-join for position delete files
            if use_position_delete_join {
                // Position-delete files carry no projected data columns, hence
                // the empty column iterator.
                set_project_field_ids(
                    &mut position_delete_files,
                    table_schema.as_ref(),
                    std::iter::empty::<&str>(),
                )?;
                plan = build_position_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    position_delete_files,
                )?;
            }

            // Add projection if output columns differ from scan columns
            // (hidden join columns were appended, or the order diverged).
            let schema_len = plan.schema().len();
            let schema_names = plan.schema().fields.iter().map(|f| f.name.as_str());
            let output_columns = scan.output_columns();
            if schema_len != output_columns.len() || !itertools::equal(schema_names, output_columns)
            {
                let col_map: HashMap<&str, usize> = plan
                    .schema()
                    .fields
                    .iter()
                    .enumerate()
                    .map(|(idx, field)| (field.name.as_str(), idx))
                    .collect();
                let output_col_idx: Vec<_> = scan
                    .output_columns()
                    .map(|col| {
                        col_map.get(col).copied().with_context(|| {
                            format!("Output column {} not found in scan schema", col)
                        })
                    })
                    .try_collect()?;
                let mapping = ColIndexMapping::with_remaining_columns(&output_col_idx, schema_len);
                plan = LogicalProject::with_mapping(plan, mapping).into();
            }

            // For Iceberg engine tables, the intermediate scan's output schema has
            // Hummock types (via table_column_type_mapping), but the LogicalIcebergScan
            // reads from Iceberg with Iceberg types. Add casts to match the expected
            // Hummock output types.
            if scan.has_type_mapping() {
                let cast_exprs: Vec<ExprImpl> = plan
                    .schema()
                    .fields
                    .iter()
                    .enumerate()
                    .map(|(i, field)| {
                        let input_ref: ExprImpl = InputRef::new(i, field.data_type.clone()).into();
                        if let Some(target_type) = scan.table_column_type_mapping.get(&field.name) {
                            if &field.data_type != target_type {
                                // NOTE(review): a failed explicit cast silently
                                // falls back to the uncast column instead of
                                // erroring — confirm this is intended.
                                match input_ref.cast_explicit(target_type) {
                                    Ok(casted) => casted,
                                    Err(_) => InputRef::new(i, field.data_type.clone()).into(),
                                }
                            } else {
                                input_ref
                            }
                        } else {
                            input_ref
                        }
                    })
                    .collect();
                plan = LogicalProject::create(plan, cast_exprs);
            }

            ApplyResult::Ok(plan)
        }
    }
}
255
256impl IcebergIntermediateScanRule {
257    pub fn create() -> BoxedRule {
258        Box::new(IcebergIntermediateScanRule)
259    }
260}
261
262/// Returns an empty table plan with the same schema as the scan.
263fn empty_table_plan(plan: &PlanRef, scan: &LogicalIcebergIntermediateScan) -> PlanRef {
264    LogicalValues::new(vec![], scan.schema().clone(), plan.ctx()).into()
265}
266
267/// Builds a mapping of column names to their catalogs by looking them up from a catalog map.
268fn build_column_catalogs(
269    column_names: impl Iterator<Item = impl AsRef<str>>,
270    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
271) -> Result<Vec<ColumnCatalog>> {
272    let res = column_names
273        .map(|name| {
274            let name = name.as_ref();
275            column_catalog_map
276                .get(name)
277                .map(|&c| c.clone())
278                .with_context(|| format!("Column catalog not found for column {}", name))
279        })
280        .try_collect()?;
281    Ok(res)
282}
283
284/// Sets the project field IDs for a list of files based on column names.
285fn set_project_field_ids(
286    files: &mut [FileScanTask],
287    schema: &iceberg::spec::Schema,
288    column_names: impl Iterator<Item = impl AsRef<str>>,
289) -> Result<()> {
290    let project_field_ids: Vec<i32> = column_names
291        .map(|name| {
292            let name = name.as_ref();
293            schema
294                .field_id_by_name(name)
295                .with_context(|| format!("Column {} not found in table schema", name))
296        })
297        .try_collect()?;
298    for file in files {
299        file.project_field_ids = project_field_ids.clone();
300    }
301    Ok(())
302}
303
304/// Builds equality conditions between two sets of input references.
305fn build_equal_conditions(
306    left_inputs: Vec<InputRef>,
307    right_inputs: Vec<InputRef>,
308) -> Result<Vec<ExprImpl>> {
309    left_inputs
310        .into_iter()
311        .zip_eq_fast(right_inputs.into_iter())
312        .map(|(left, right)| {
313            Ok(FunctionCall::new(ExprType::Equal, vec![left.into(), right.into()])?.into())
314        })
315        .collect()
316}
317
/// Wraps `child` (the data-file scan) in a `LeftAnti` join against a scan of
/// the equality delete files, filtering out rows matched by a delete record.
///
/// Join condition: every equality-delete column is equal AND the data row's
/// sequence number is strictly less than the delete record's — a delete only
/// removes rows written before it.
pub fn build_equality_delete_hashjoin_scan(
    scan: &LogicalIcebergIntermediateScan,
    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
    child: PlanRef,
    equality_delete_files: Vec<FileScanTask>,
    equality_delete_columns: Vec<String>,
) -> Result<PlanRef> {
    // The delete-side scan reads the equality columns plus the sequence number.
    let column_names = equality_delete_columns
        .iter()
        .map(|s| s.as_str())
        .chain(std::iter::once(ICEBERG_SEQUENCE_NUM_COLUMN_NAME));
    let column_catalogs = build_column_catalogs(column_names, column_catalog_map)?;
    let source = scan.core.clone_with_column_catalog(column_catalogs);

    let equality_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
        source,
        IcebergFileScanTask::EqualityDelete(equality_delete_files),
    )
    .into();

    let data_columns_len = child.schema().len();
    // Build join condition: equality delete columns are equal AND sequence number is less than.
    // Join type is LeftAnti to exclude rows that match delete records.
    //
    // `offset` shifts the right side's indices past the left side's columns,
    // because join conditions index into the concatenated (left ++ right) schema:
    // the left side is built with offset 0, the right with `data_columns_len`.
    let build_inputs = |scan: &PlanRef, offset: usize| -> Result<(Vec<InputRef>, InputRef)> {
        // Map column name -> (index, data type) within this side's schema.
        let delete_column_index_map = scan
            .schema()
            .fields()
            .iter()
            .enumerate()
            .map(|(index, data_column)| (&data_column.name, (index, &data_column.data_type)))
            .collect::<std::collections::HashMap<_, _>>();
        let delete_column_inputs = equality_delete_columns
            .iter()
            .map(|name| {
                let (index, data_type) = delete_column_index_map
                    .get(name)
                    .with_context(|| format!("Delete column {} not found in scan schema", name))?;
                Ok(InputRef {
                    index: offset + index,
                    data_type: (*data_type).clone(),
                })
            })
            .collect::<Result<Vec<InputRef>>>()?;
        // Locate the hidden sequence-number column (always Int64).
        let seq_num_inputs = InputRef {
            index: scan
                .schema()
                .fields()
                .iter()
                .position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
                .context("Sequence number column not found in scan schema")?
                + offset,
            data_type: risingwave_common::types::DataType::Int64,
        };
        Ok((delete_column_inputs, seq_num_inputs))
    };
    let (left_delete_column_inputs, left_seq_num_input) = build_inputs(&child, 0)?;
    let (right_delete_column_inputs, right_seq_num_input) =
        build_inputs(&equality_delete_iceberg_scan, data_columns_len)?;

    let mut eq_join_expr =
        build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
    // data.seq_num < delete.seq_num: the delete applies only to older rows.
    eq_join_expr.push(
        FunctionCall::new(
            ExprType::LessThan,
            vec![left_seq_num_input.into(), right_seq_num_input.into()],
        )?
        .into(),
    );
    let on = Condition {
        conjunctions: eq_join_expr,
    };
    let join = LogicalJoin::new(
        child,
        equality_delete_iceberg_scan,
        risingwave_pb::plan_common::JoinType::LeftAnti,
        on,
    );
    Ok(join.into())
}
397
398pub fn build_position_delete_hashjoin_scan(
399    scan: &LogicalIcebergIntermediateScan,
400    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
401    child: PlanRef,
402    position_delete_files: Vec<FileScanTask>,
403) -> Result<PlanRef> {
404    // Position delete files use file path and position to identify deleted rows.
405    let delete_column_names = [ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME];
406    let column_catalogs = build_column_catalogs(delete_column_names.iter(), column_catalog_map)?;
407    let position_delete_source = scan.core.clone_with_column_catalog(column_catalogs);
408
409    let position_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
410        position_delete_source,
411        IcebergFileScanTask::PositionDelete(position_delete_files),
412    )
413    .into();
414    let data_columns_len = child.schema().len();
415
416    let build_inputs = |scan: &PlanRef, offset: usize| {
417        scan.schema()
418            .fields()
419            .iter()
420            .enumerate()
421            .filter_map(|(index, data_column)| {
422                if data_column.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
423                    || data_column.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
424                {
425                    Some(InputRef {
426                        index: offset + index,
427                        data_type: data_column.data_type(),
428                    })
429                } else {
430                    None
431                }
432            })
433            .collect::<Vec<InputRef>>()
434    };
435    let left_delete_column_inputs = build_inputs(&child, 0);
436    let right_delete_column_inputs = build_inputs(&position_delete_iceberg_scan, data_columns_len);
437    let eq_join_expr =
438        build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
439    let on = Condition {
440        conjunctions: eq_join_expr,
441    };
442    let join = LogicalJoin::new(
443        child,
444        position_delete_iceberg_scan,
445        risingwave_pb::plan_common::JoinType::LeftAnti,
446        on,
447    );
448    Ok(join.into())
449}