risingwave_frontend/optimizer/rule/
iceberg_intermediate_scan_rule.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This rule materializes a `LogicalIcebergIntermediateScan` to the final
16//! `LogicalIcebergScan` with delete file anti-joins.
17//!
18//! This is the final step in the Iceberg scan optimization pipeline:
19//! 1. `LogicalSource` -> `LogicalIcebergIntermediateScan`
20//! 2. Predicate pushdown and column pruning on `LogicalIcebergIntermediateScan`
21//! 3. `LogicalIcebergIntermediateScan` -> `LogicalIcebergScan` (this rule)
22//!
23//! At this point, the intermediate scan has accumulated:
24//! - The predicate to be pushed down to Iceberg
25//! - The output column indices for projection
26//!
27//! This rule:
28//! 1. Reads file scan tasks from Iceberg (data files and delete files)
29//! 2. Creates the `LogicalIcebergScan` for data files with pre-computed splits
30//! 3. Creates anti-joins for equality delete and position delete files
31//! 4. Adds a project if output columns differ from scan columns
32
33use std::collections::HashMap;
34
35use anyhow::Context;
36use iceberg::scan::FileScanTask;
37use risingwave_common::catalog::{
38    ColumnCatalog, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME,
39    ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
40};
41use risingwave_common::util::iter_util::ZipEqFast;
42use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergSplitEnumerator};
43use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
44
45use super::prelude::{PlanRef, *};
46use crate::error::Result;
47use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
48use crate::optimizer::plan_node::generic::GenericPlanRef;
49use crate::optimizer::plan_node::{
50    Logical, LogicalIcebergIntermediateScan, LogicalIcebergScan, LogicalJoin, LogicalProject,
51    LogicalValues,
52};
53use crate::optimizer::rule::{ApplyResult, FallibleRule};
54use crate::utils::{ColIndexMapping, Condition, FRONTEND_RUNTIME};
55
/// Optimizer rule that materializes a `LogicalIcebergIntermediateScan` into
/// the final `LogicalIcebergScan` plan fragment, including anti-joins for
/// Iceberg delete files and a trailing projection when needed.
pub struct IcebergIntermediateScanRule;
57
impl FallibleRule<Logical> for IcebergIntermediateScanRule {
    /// Applies the rule: if `plan` is a `LogicalIcebergIntermediateScan`,
    /// lists Iceberg file scan tasks and builds the final fragment
    /// (data-file scan + delete-file anti-joins + optional projection).
    ///
    /// Returns `NotApplicable` when `plan` is a different node type, when the
    /// scan has no source catalog, or when the source properties are not for
    /// an Iceberg connector; returns `Err` on failures while listing scan
    /// tasks or resolving columns.
    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
        // Only fire on the intermediate Iceberg scan node.
        let scan: &LogicalIcebergIntermediateScan =
            match plan.as_logical_iceberg_intermediate_scan() {
                Some(s) => s,
                None => return ApplyResult::NotApplicable,
            };

        let Some(catalog) = scan.source_catalog() else {
            return ApplyResult::NotApplicable;
        };

        // Create the IcebergSplitEnumerator used to list file scan tasks.
        // Anything other than an Iceberg source falls through untouched.
        let enumerator = if let ConnectorProperties::Iceberg(prop) =
            ConnectorProperties::extract(catalog.with_properties.clone(), false)?
        {
            IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
        } else {
            return ApplyResult::NotApplicable;
        };

        // Listing scan tasks blocks on the frontend runtime, which is not
        // available under madsim deterministic simulation, so fail early.
        #[cfg(madsim)]
        return ApplyResult::Err(
            crate::error::ErrorCode::BindError(
                "iceberg_scan can't be used in the madsim mode".to_string(),
            )
            .into(),
        );

        #[cfg(not(madsim))]
        {
            use risingwave_connector::source::iceberg::IcebergListResult;

            // Synchronously list data/delete file scan tasks from Iceberg,
            // honoring time travel and the pushed-down predicate.
            // `block_in_place` keeps the blocking `block_on` from starving
            // the current tokio worker thread.
            let list_result =
                tokio::task::block_in_place(|| {
                    FRONTEND_RUNTIME.block_on(enumerator.list_scan_tasks(
                        Some(scan.time_travel_info.clone()),
                        scan.predicate.clone(),
                    ))
                })?;
            // `None` means the table has no valid snapshot to read from.
            let Some(IcebergListResult {
                mut data_files,
                mut equality_delete_files,
                mut position_delete_files,
                equality_delete_columns,
            }) = list_result
            else {
                tracing::info!(
                    "There is no valid snapshot for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            };
            if data_files.is_empty() {
                tracing::info!(
                    "There is no data file for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            }

            // Build the data file scan with pre-computed splits. The scan must
            // produce the requested output columns plus any columns referenced
            // by equality deletes (they are needed as join keys below).
            let schema = data_files[0].schema.clone();
            let mut projection_columns: Vec<&str> = scan
                .output_columns
                .iter()
                .chain(&equality_delete_columns)
                .map(|col| col.as_str())
                .collect();
            // Sort by Iceberg field id so duplicate names become adjacent
            // (required by `dedup`) and the column order is deterministic.
            projection_columns.sort_unstable_by_key(|&s| schema.field_id_by_name(s));
            projection_columns.dedup();
            set_project_field_ids(&mut data_files, &schema, projection_columns.iter())?;
            // Delete files are applied via the anti-joins built below, so the
            // data-file tasks must not carry (and re-apply) them.
            for file in &mut data_files {
                file.deletes.clear();
            }

            let column_catalog_map: HashMap<&str, &ColumnCatalog> = catalog
                .columns
                .iter()
                .map(|c| (c.column_desc.name.as_str(), c))
                .collect();
            // Hidden columns required as join keys by the delete anti-joins.
            if !equality_delete_files.is_empty() {
                projection_columns.push(ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
            }
            if !position_delete_files.is_empty() {
                projection_columns.push(ICEBERG_FILE_PATH_COLUMN_NAME);
                projection_columns.push(ICEBERG_FILE_POS_COLUMN_NAME);
            }
            let column_catalogs =
                build_column_catalogs(projection_columns.iter(), &column_catalog_map)?;
            let core = scan.core.clone_with_column_catalog(column_catalogs);
            let mut plan: PlanRef =
                LogicalIcebergScan::new(core, IcebergFileScanTask::Data(data_files)).into();

            // Add anti-join for equality delete files
            if !equality_delete_files.is_empty() {
                set_project_field_ids(
                    &mut equality_delete_files,
                    &schema,
                    equality_delete_columns.iter(),
                )?;
                plan = build_equality_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    equality_delete_files,
                    equality_delete_columns,
                )?;
            }

            // Add anti-join for position delete files. Position delete tasks
            // project no data columns; the join keys are the hidden
            // file-path/position columns.
            if !position_delete_files.is_empty() {
                set_project_field_ids(
                    &mut position_delete_files,
                    &schema,
                    std::iter::empty::<&str>(),
                )?;
                plan = build_position_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    position_delete_files,
                )?;
            }

            // Add projection if output columns differ from scan columns
            // (extra join-key columns were scanned, or the order differs).
            let schema_len = plan.schema().len();
            let schema_names = plan.schema().fields.iter().map(|f| f.name.as_str());
            let output_names = scan.output_columns.iter().map(|s| s.as_str());
            if schema_len != scan.output_columns.len()
                || !itertools::equal(schema_names, output_names)
            {
                // Map each requested output column name to its index in the
                // current plan schema, then project in that order.
                let col_map: HashMap<&str, usize> = plan
                    .schema()
                    .fields
                    .iter()
                    .enumerate()
                    .map(|(idx, field)| (field.name.as_str(), idx))
                    .collect();
                let output_col_idx: Vec<_> = scan
                    .output_columns
                    .iter()
                    .map(|col| {
                        col_map.get(col.as_str()).copied().with_context(|| {
                            format!("Output column {} not found in scan schema", col)
                        })
                    })
                    .try_collect()?;
                let mapping = ColIndexMapping::with_remaining_columns(&output_col_idx, schema_len);
                plan = LogicalProject::with_mapping(plan, mapping).into();
            }

            ApplyResult::Ok(plan)
        }
    }
}
212
213impl IcebergIntermediateScanRule {
214    pub fn create() -> BoxedRule {
215        Box::new(IcebergIntermediateScanRule)
216    }
217}
218
219/// Returns an empty table plan with the same schema as the scan.
220fn empty_table_plan(plan: &PlanRef, scan: &LogicalIcebergIntermediateScan) -> PlanRef {
221    LogicalValues::new(vec![], scan.schema().clone(), plan.ctx()).into()
222}
223
224/// Builds a mapping of column names to their catalogs by looking them up from a catalog map.
225fn build_column_catalogs(
226    column_names: impl Iterator<Item = impl AsRef<str>>,
227    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
228) -> Result<Vec<ColumnCatalog>> {
229    let res = column_names
230        .map(|name| {
231            let name = name.as_ref();
232            column_catalog_map
233                .get(name)
234                .map(|&c| c.clone())
235                .with_context(|| format!("Column catalog not found for column {}", name))
236        })
237        .try_collect()?;
238    Ok(res)
239}
240
241/// Sets the project field IDs for a list of files based on column names.
242fn set_project_field_ids(
243    files: &mut [FileScanTask],
244    schema: &iceberg::spec::Schema,
245    column_names: impl Iterator<Item = impl AsRef<str>>,
246) -> Result<()> {
247    let project_field_ids: Vec<i32> = column_names
248        .map(|name| {
249            let name = name.as_ref();
250            schema
251                .field_id_by_name(name)
252                .with_context(|| format!("Column {} not found in data file schema", name))
253        })
254        .try_collect()?;
255    for file in files {
256        file.project_field_ids = project_field_ids.clone();
257    }
258    Ok(())
259}
260
261/// Builds equality conditions between two sets of input references.
262fn build_equal_conditions(
263    left_inputs: Vec<InputRef>,
264    right_inputs: Vec<InputRef>,
265) -> Result<Vec<ExprImpl>> {
266    left_inputs
267        .into_iter()
268        .zip_eq_fast(right_inputs.into_iter())
269        .map(|(left, right)| {
270            Ok(FunctionCall::new(ExprType::Equal, vec![left.into(), right.into()])?.into())
271        })
272        .collect()
273}
274
/// Wraps `child` in a `LeftAnti` join against a scan of Iceberg equality
/// delete files, removing data rows that have been deleted.
///
/// Join condition: all equality-delete columns are equal AND the data row's
/// sequence number is strictly less than the delete record's — i.e. a delete
/// only affects rows written before it.
///
/// Both sides must expose the equality-delete columns and the hidden
/// sequence-number column in their schemas (the caller arranges this).
pub fn build_equality_delete_hashjoin_scan(
    scan: &LogicalIcebergIntermediateScan,
    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
    child: PlanRef,
    equality_delete_files: Vec<FileScanTask>,
    equality_delete_columns: Vec<String>,
) -> Result<PlanRef> {
    // The delete side scans the equality columns plus the sequence number.
    let column_names = equality_delete_columns
        .iter()
        .map(|s| s.as_str())
        .chain(std::iter::once(ICEBERG_SEQUENCE_NUM_COLUMN_NAME));
    let column_catalogs = build_column_catalogs(column_names, column_catalog_map)?;
    let source = scan.core.clone_with_column_catalog(column_catalogs);

    let equality_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
        source,
        IcebergFileScanTask::EqualityDelete(equality_delete_files),
    )
    .into();

    let data_columns_len = child.schema().len();
    // Build join condition: equality delete columns are equal AND sequence number is less than.
    // Join type is LeftAnti to exclude rows that match delete records.
    //
    // For one join side, produce the `InputRef`s of the equality-delete
    // columns (in `equality_delete_columns` order) and of the sequence-number
    // column, shifting indices by `offset` (0 for the left side, the left
    // schema length for the right side, per join input numbering).
    let build_inputs = |scan: &PlanRef, offset: usize| -> Result<(Vec<InputRef>, InputRef)> {
        // Name -> (index, data type) lookup over this side's schema.
        let delete_column_index_map = scan
            .schema()
            .fields()
            .iter()
            .enumerate()
            .map(|(index, data_column)| (&data_column.name, (index, &data_column.data_type)))
            .collect::<std::collections::HashMap<_, _>>();
        let delete_column_inputs = equality_delete_columns
            .iter()
            .map(|name| {
                let (index, data_type) = delete_column_index_map
                    .get(name)
                    .with_context(|| format!("Delete column {} not found in scan schema", name))?;
                Ok(InputRef {
                    index: offset + index,
                    data_type: (*data_type).clone(),
                })
            })
            .collect::<Result<Vec<InputRef>>>()?;
        let seq_num_inputs = InputRef {
            index: scan
                .schema()
                .fields()
                .iter()
                .position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
                .context("Sequence number column not found in scan schema")?
                + offset,
            data_type: risingwave_common::types::DataType::Int64,
        };
        Ok((delete_column_inputs, seq_num_inputs))
    };
    let (left_delete_column_inputs, left_seq_num_input) = build_inputs(&child, 0)?;
    let (right_delete_column_inputs, right_seq_num_input) =
        build_inputs(&equality_delete_iceberg_scan, data_columns_len)?;

    // Equality keys, plus `left_seq < right_seq` so deletes only apply to
    // rows with a smaller (earlier) sequence number.
    let mut eq_join_expr =
        build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
    eq_join_expr.push(
        FunctionCall::new(
            ExprType::LessThan,
            vec![left_seq_num_input.into(), right_seq_num_input.into()],
        )?
        .into(),
    );
    let on = Condition {
        conjunctions: eq_join_expr,
    };
    // LeftAnti keeps only rows with no matching delete record.
    let join = LogicalJoin::new(
        child,
        equality_delete_iceberg_scan,
        risingwave_pb::plan_common::JoinType::LeftAnti,
        on,
    );
    Ok(join.into())
}
354
355pub fn build_position_delete_hashjoin_scan(
356    scan: &LogicalIcebergIntermediateScan,
357    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
358    child: PlanRef,
359    position_delete_files: Vec<FileScanTask>,
360) -> Result<PlanRef> {
361    // Position delete files use file path and position to identify deleted rows.
362    let delete_column_names = [ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME];
363    let column_catalogs = build_column_catalogs(delete_column_names.iter(), column_catalog_map)?;
364    let position_delete_source = scan.core.clone_with_column_catalog(column_catalogs);
365
366    let position_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
367        position_delete_source,
368        IcebergFileScanTask::PositionDelete(position_delete_files),
369    )
370    .into();
371    let data_columns_len = child.schema().len();
372
373    let build_inputs = |scan: &PlanRef, offset: usize| {
374        scan.schema()
375            .fields()
376            .iter()
377            .enumerate()
378            .filter_map(|(index, data_column)| {
379                if data_column.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
380                    || data_column.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
381                {
382                    Some(InputRef {
383                        index: offset + index,
384                        data_type: data_column.data_type(),
385                    })
386                } else {
387                    None
388                }
389            })
390            .collect::<Vec<InputRef>>()
391    };
392    let left_delete_column_inputs = build_inputs(&child, 0);
393    let right_delete_column_inputs = build_inputs(&position_delete_iceberg_scan, data_columns_len);
394    let eq_join_expr =
395        build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
396    let on = Condition {
397        conjunctions: eq_join_expr,
398    };
399    let join = LogicalJoin::new(
400        child,
401        position_delete_iceberg_scan,
402        risingwave_pb::plan_common::JoinType::LeftAnti,
403        on,
404    );
405    Ok(join.into())
406}