risingwave_frontend/optimizer/plan_node/logical_iceberg_intermediate_scan.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use educe::Educe;
16use iceberg::expr::Predicate;
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
19
20use super::generic::GenericPlanRef;
21use super::utils::{Distill, childless_record};
22use super::{
23    ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, PredicatePushdown,
24    ToBatch, ToStream, generic,
25};
26use crate::catalog::source_catalog::SourceCatalog;
27use crate::error::Result;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::utils::column_names_pretty;
30use crate::optimizer::plan_node::{
31    ColumnPruningContext, LogicalFilter, LogicalProject, LogicalSource, PredicatePushdownContext,
32    RewriteStreamContext, ToStreamContext,
33};
34use crate::utils::{ColIndexMapping, Condition, to_iceberg_predicate};
35
/// `LogicalIcebergIntermediateScan` is an intermediate plan node used during optimization
/// of Iceberg scans. It accumulates predicates and column pruning information before
/// being converted to the final `LogicalIcebergScan` with delete file anti-joins.
///
/// This node is introduced to reduce the number of Iceberg metadata reads. Instead of
/// reading metadata when creating `LogicalIcebergScan`, we defer the metadata read
/// until all optimizations (predicate pushdown, column pruning) are applied.
///
/// The optimization flow is:
/// 1. `LogicalSource` (iceberg) -> `LogicalIcebergIntermediateScan`
/// 2. Predicate pushdown and column pruning are applied to `LogicalIcebergIntermediateScan`
/// 3. `LogicalIcebergIntermediateScan` -> `LogicalIcebergScan` (with anti-joins for delete files)
#[derive(Debug, Clone, PartialEq, Educe)]
#[educe(Hash)]
pub struct LogicalIcebergIntermediateScan {
    /// Shared logical plan-node metadata (ctx, schema, …) derived from `core`.
    pub base: PlanBase<Logical>,
    /// The underlying source description this scan reads from.
    pub core: generic::Source,
    /// Accumulated Iceberg-level filter. Starts as `Predicate::AlwaysTrue` and is
    /// AND-combined with each predicate pushed down (see `clone_with_predicate`).
    /// Excluded from the derived `Hash` (presumably `Predicate` does not implement
    /// `Hash` — TODO confirm), hence the `educe` field attribute.
    #[educe(Hash(ignore))]
    pub predicate: Predicate,
    /// Names of the columns this scan outputs, kept in sync with `core.column_catalog`
    /// by `clone_with_required_cols`.
    pub output_columns: Vec<String>,
    /// Snapshot/time-travel selection for the Iceberg table read.
    pub time_travel_info: IcebergTimeTravelInfo,
}

// NOTE(review): `Eq` is implemented manually rather than derived — presumably because
// `Predicate` provides `PartialEq` but not `Eq`; confirm against the iceberg crate.
impl Eq for LogicalIcebergIntermediateScan {}
60
61impl LogicalIcebergIntermediateScan {
62    pub fn new(logical_source: &LogicalSource, time_travel_info: IcebergTimeTravelInfo) -> Self {
63        assert!(logical_source.core.is_iceberg_connector());
64
65        let core = logical_source.core.clone();
66        let base = PlanBase::new_logical_with_core(&core);
67        let output_column = core
68            .column_catalog
69            .iter()
70            .map(|c| c.column_desc.name.clone())
71            .collect();
72
73        assert!(logical_source.output_exprs.is_none());
74
75        LogicalIcebergIntermediateScan {
76            base,
77            core,
78            predicate: Predicate::AlwaysTrue,
79            output_columns: output_column,
80            time_travel_info,
81        }
82    }
83
84    pub fn source_catalog(&self) -> Option<&SourceCatalog> {
85        self.core.catalog.as_deref()
86    }
87
88    pub fn clone_with_predicate(&self, predicate: Predicate) -> Self {
89        let new_predicate = self.predicate.clone().and(predicate);
90        LogicalIcebergIntermediateScan {
91            predicate: new_predicate,
92            ..self.clone()
93        }
94    }
95
96    pub fn clone_with_required_cols(&self, required_cols: &[usize]) -> Self {
97        assert!(!required_cols.is_empty());
98
99        let mut core = self.core.clone();
100        let mut has_row_id = false;
101        core.column_catalog = required_cols
102            .iter()
103            .map(|idx| {
104                if Some(*idx) == core.row_id_index {
105                    has_row_id = true;
106                }
107                core.column_catalog[*idx].clone()
108            })
109            .collect();
110        if !has_row_id {
111            core.row_id_index = None;
112        }
113        let base = PlanBase::new_logical_with_core(&core);
114
115        let new_output_column = required_cols
116            .iter()
117            .map(|&i| self.output_columns[i].clone())
118            .collect();
119
120        LogicalIcebergIntermediateScan {
121            base,
122            core,
123            predicate: self.predicate.clone(),
124            output_columns: new_output_column,
125            time_travel_info: self.time_travel_info.clone(),
126        }
127    }
128}
129
// Leaf plan node (no inputs): this macro supplies the plan-tree-node boilerplate.
impl_plan_tree_node_for_leaf! { Logical, LogicalIcebergIntermediateScan }
131
132impl Distill for LogicalIcebergIntermediateScan {
133    fn distill<'a>(&self) -> XmlNode<'a> {
134        let verbose = self.base.ctx().is_explain_verbose();
135        let mut fields = Vec::with_capacity(if verbose { 4 } else { 2 });
136
137        if let Some(catalog) = self.source_catalog() {
138            fields.push(("source", Pretty::from(catalog.name.clone())));
139        } else {
140            fields.push(("source", Pretty::from("unknown")));
141        }
142        fields.push(("columns", column_names_pretty(self.schema())));
143
144        if verbose {
145            fields.push(("predicate", Pretty::debug(&self.predicate)));
146            fields.push(("output_columns", Pretty::debug(&self.output_columns)));
147            fields.push(("time_travel_info", Pretty::debug(&self.time_travel_info)));
148        }
149
150        childless_record("LogicalIcebergIntermediateScan", fields)
151    }
152}
153
154impl ColPrunable for LogicalIcebergIntermediateScan {
155    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
156        if required_cols.is_empty() {
157            // If required_cols is empty, we use the first column of iceberg to avoid the empty schema.
158            LogicalProject::new(self.clone_with_required_cols(&[0]).into(), vec![]).into()
159        } else {
160            self.clone_with_required_cols(required_cols).into()
161        }
162    }
163}
164
// Default (no-op) implementation: this node stores its filter as an Iceberg
// `Predicate`, not as RisingWave expressions, so there is nothing to rewrite.
impl ExprRewritable<Logical> for LogicalIcebergIntermediateScan {}
166
// Default (no-op) implementation: no RisingWave expressions are held by this node.
impl ExprVisitable for LogicalIcebergIntermediateScan {}
168
169impl PredicatePushdown for LogicalIcebergIntermediateScan {
170    fn predicate_pushdown(
171        &self,
172        predicate: Condition,
173        _ctx: &mut PredicatePushdownContext,
174    ) -> PlanRef {
175        let (iceberg_predicate, upper_conditions) =
176            to_iceberg_predicate(predicate, self.schema().fields());
177        let plan = self.clone_with_predicate(iceberg_predicate).into();
178        if upper_conditions.always_true() {
179            plan
180        } else {
181            LogicalFilter::create(plan, upper_conditions)
182        }
183    }
184}
185
186impl ToBatch for LogicalIcebergIntermediateScan {
187    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
188        // This should not be called directly. The intermediate scan should be
189        // converted to LogicalIcebergScan first via the materialization rule.
190        Err(crate::error::ErrorCode::InternalError(
191            "LogicalIcebergIntermediateScan should be converted to LogicalIcebergScan before to_batch".to_owned()
192        )
193        .into())
194    }
195}
196
impl ToStream for LogicalIcebergIntermediateScan {
    fn to_stream(
        &self,
        _ctx: &mut ToStreamContext,
    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
        // This node only exists during batch-query optimization; a streaming plan
        // reaching it indicates a planner bug.
        unreachable!("LogicalIcebergIntermediateScan is only for batch queries")
    }

    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        // Same invariant as `to_stream`: never part of a streaming plan.
        unreachable!("LogicalIcebergIntermediateScan is only for batch queries")
    }
}