risingwave_frontend/optimizer/rule/source_to_iceberg_intermediate_scan_rule.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This rule converts a `LogicalSource` with an Iceberg connector to a
16//! `LogicalIcebergIntermediateScan`. The intermediate scan node is used to
17//! accumulate predicates and column pruning information before being
18//! materialized to the final `LogicalIcebergScan` with delete file anti-joins.
19//!
20//! This is the first step in the Iceberg scan optimization pipeline:
21//! 1. `LogicalSource` -> `LogicalIcebergIntermediateScan` (this rule)
22//! 2. Predicate pushdown and column pruning on `LogicalIcebergIntermediateScan`
23//! 3. `LogicalIcebergIntermediateScan` -> `LogicalIcebergScan` (materialization rule)
24
25use std::collections::HashMap;
26
27#[cfg(not(madsim))]
28use risingwave_connector::source::ConnectorProperties;
29#[cfg(not(madsim))]
30use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
31
32use super::prelude::{PlanRef, *};
33use crate::error::Result;
34#[cfg(not(madsim))]
35use crate::optimizer::plan_node::LogicalSource;
36#[cfg(not(madsim))]
37use crate::optimizer::plan_node::LogicalValues;
38use crate::optimizer::plan_node::utils::to_iceberg_time_travel_as_of;
39use crate::optimizer::plan_node::{Logical, LogicalIcebergIntermediateScan};
40use crate::optimizer::rule::{ApplyResult, FallibleRule};
41
/// Optimizer rule that rewrites a `LogicalSource` using the Iceberg connector into a
/// `LogicalIcebergIntermediateScan` (step 1 of the pipeline described in the module docs).
///
/// If the table has no snapshot to read, the source is instead replaced by an empty
/// `LogicalValues` with the same schema.
pub struct SourceToIcebergIntermediateScanRule;
43
44impl FallibleRule<Logical> for SourceToIcebergIntermediateScanRule {
45    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
46        let Some(source) = plan.as_logical_source() else {
47            return ApplyResult::NotApplicable;
48        };
49
50        if !source.core.is_iceberg_connector() {
51            return ApplyResult::NotApplicable;
52        }
53
54        #[cfg(madsim)]
55        return ApplyResult::Err(
56            crate::error::ErrorCode::BindError(
57                "iceberg_scan can't be used in the madsim mode".to_string(),
58            )
59            .into(),
60        );
61
62        #[cfg(not(madsim))]
63        {
64            // If time travel is not specified, we use current timestamp to get the latest snapshot
65            let timezone = plan.ctx().get_session_timezone();
66            let mut time_travel_info = to_iceberg_time_travel_as_of(&source.core.as_of, &timezone)?;
67            if time_travel_info.is_none() {
68                time_travel_info =
69                    fetch_current_snapshot_id(&mut plan.ctx().iceberg_snapshot_id_map(), source)?
70                        .map(IcebergTimeTravelInfo::Version);
71            }
72            let Some(time_travel_info) = time_travel_info else {
73                return ApplyResult::Ok(
74                    LogicalValues::new(vec![], plan.schema().clone(), plan.ctx()).into(),
75                );
76            };
77            let intermediate_scan = LogicalIcebergIntermediateScan::new(source, time_travel_info);
78            ApplyResult::Ok(intermediate_scan.into())
79        }
80    }
81}
82
83#[cfg(not(madsim))]
84fn fetch_current_snapshot_id(
85    map: &mut HashMap<String, Option<i64>>,
86    source: &LogicalSource,
87) -> Result<Option<i64>> {
88    let catalog = source.source_catalog().ok_or_else(|| {
89        crate::error::ErrorCode::InternalError(
90            "Iceberg source must have a valid source catalog".to_owned(),
91        )
92    })?;
93    let name = catalog.name.as_str();
94    if let Some(&snapshot_id) = map.get(name) {
95        return Ok(snapshot_id);
96    }
97
98    let ConnectorProperties::Iceberg(prop) =
99        ConnectorProperties::extract(catalog.with_properties.clone(), false)?
100    else {
101        return Err(crate::error::ErrorCode::InternalError(
102            "Iceberg source must have Iceberg connector properties".to_owned(),
103        )
104        .into());
105    };
106
107    let snapshot_id = tokio::task::block_in_place(|| {
108        crate::utils::FRONTEND_RUNTIME.block_on(async {
109            prop.load_table()
110                .await
111                .map(|table| table.metadata().current_snapshot_id())
112        })
113    })?;
114    map.insert(name.to_owned(), snapshot_id);
115    Ok(snapshot_id)
116}
117
118impl SourceToIcebergIntermediateScanRule {
119    pub fn create() -> BoxedRule {
120        Box::new(SourceToIcebergIntermediateScanRule)
121    }
122}