// risingwave_frontend/optimizer/plan_node/logical_iceberg_intermediate_scan.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16
17use educe::Educe;
18use iceberg::expr::Predicate;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::types::DataType;
22use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
23
24use super::generic::GenericPlanRef;
25use super::utils::{Distill, childless_record};
26use super::{
27    ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, PredicatePushdown,
28    ToBatch, ToStream, generic,
29};
30use crate::catalog::source_catalog::SourceCatalog;
31use crate::error::Result;
32use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
33use crate::optimizer::plan_node::utils::column_names_pretty;
34use crate::optimizer::plan_node::{
35    ColumnPruningContext, LogicalFilter, LogicalProject, LogicalSource, PredicatePushdownContext,
36    RewriteStreamContext, ToStreamContext,
37};
38use crate::utils::{
39    ColIndexMapping, Condition, ExtractIcebergPredicateResult, extract_iceberg_predicate,
40};
41
/// Predicate and column mapping needed when rewriting an Iceberg scan to a
/// Hummock `LogicalScan`. Only consumed by `IcebergEngineStorageSelectionRule`;
/// the normal Iceberg path ignores these fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HummockRewriteInfo {
    /// The accumulated predicate over *table* column indices (not output columns),
    /// built from the extracted portion of pushed-down predicates.
    /// Starts as `Condition::true_cond()` and is AND-ed with each remapped
    /// condition in `add_predicate`.
    pub origin_condition: Condition,
    /// Maps current output column indices → original table column indices.
    /// Composed with the pruning selection on every `prune_columns` call, so it
    /// always points back into the original table's index space.
    pub output_column_mapping: ColIndexMapping,
}
53
54impl HummockRewriteInfo {
55    /// Create with an initial source→table column mapping.
56    /// For non-engine iceberg sources (no associated table), pass an identity mapping.
57    pub fn new(source_to_table_mapping: ColIndexMapping) -> Self {
58        Self {
59            origin_condition: Condition::true_cond(),
60            output_column_mapping: source_to_table_mapping,
61        }
62    }
63
64    /// Accumulate a new predicate, remapping it through the current column mapping.
65    pub fn add_predicate(&self, extracted_condition: Condition) -> Self {
66        let mut mapping = self.output_column_mapping.clone();
67        let remapped = extracted_condition.rewrite_expr(&mut mapping);
68        Self {
69            origin_condition: self.origin_condition.clone().and(remapped),
70            output_column_mapping: mapping,
71        }
72    }
73
74    /// Prune columns: compose the column mapping so that the new output indices
75    /// still map back to the original table column indices.
76    pub fn prune_columns(&self, required_cols: &[usize]) -> Self {
77        let map = required_cols
78            .iter()
79            .map(|&idx| Some(self.output_column_mapping.map(idx)))
80            .collect();
81        Self {
82            origin_condition: self.origin_condition.clone(),
83            output_column_mapping: ColIndexMapping::new(
84                map,
85                self.output_column_mapping.target_size(),
86            ),
87        }
88    }
89}
90
/// `LogicalIcebergIntermediateScan` is an intermediate plan node used during optimization
/// of Iceberg scans. It accumulates predicates and column pruning information before
/// being converted to the final `LogicalIcebergScan` with delete file anti-joins.
///
/// This node is introduced to reduce the number of Iceberg metadata reads. Instead of
/// reading metadata when creating `LogicalIcebergScan`, we defer the metadata read
/// until all optimizations (predicate pushdown, column pruning) are applied.
///
/// The optimization flow is:
/// 1. `LogicalSource` (iceberg) -> `LogicalIcebergIntermediateScan`
/// 2. Predicate pushdown and column pruning are applied to `LogicalIcebergIntermediateScan`
/// 3. `LogicalIcebergIntermediateScan` -> `LogicalIcebergScan` (with anti-joins for delete files)
#[derive(Debug, Clone, PartialEq, Educe)]
#[educe(Hash)]
pub struct LogicalIcebergIntermediateScan {
    pub base: PlanBase<Logical>,
    pub core: generic::Source,
    /// Iceberg-level predicate pushed into this scan. Starts as
    /// `Predicate::AlwaysTrue` and is AND-ed with each new predicate in
    /// `add_predicate`.
    #[educe(Hash(ignore))]
    pub iceberg_predicate: Predicate,
    pub time_travel_info: IcebergTimeTravelInfo,
    /// For Iceberg engine tables: maps source column name → target Hummock `DataType`.
    /// This remapping is applied to the output schema so that the intermediate scan's
    /// output types match the Hummock table types, avoiding unnecessary double casts
    /// when the storage selection rule rewrites to a Hummock `LogicalScan`.
    /// Empty for non-engine-table Iceberg sources.
    #[educe(Hash(ignore))]
    pub table_column_type_mapping: HashMap<String, DataType>,
    /// Info needed only when rewriting to a Hummock row-store scan.
    #[educe(Hash(ignore))]
    pub hummock_rewrite: HummockRewriteInfo,
}

// Manual `Eq`: `PartialEq` is derived above but `Eq` is not — presumably because
// some field type (e.g. the iceberg `Predicate`) implements `PartialEq` without
// `Eq`. NOTE(review): confirm against the field types.
impl Eq for LogicalIcebergIntermediateScan {}
124
125impl LogicalIcebergIntermediateScan {
126    pub fn new(
127        logical_source: &LogicalSource,
128        time_travel_info: IcebergTimeTravelInfo,
129        table_column_type_mapping: HashMap<String, DataType>,
130        // Maps source-column indices to Hummock table-column indices so that
131        // `HummockRewriteInfo` tracks predicates and projections in table
132        // index space. Pass `ColIndexMapping::identity(n)` when there is no
133        // associated Hummock table (e.g. standalone iceberg sources).
134        source_to_table_mapping: ColIndexMapping,
135    ) -> Self {
136        assert!(logical_source.core.is_iceberg_connector());
137
138        let mut core = logical_source.core.clone();
139        // Apply type remapping: change the source column types to Hummock table types
140        // so that the output schema has Hummock types.
141        for col in &mut core.column_catalog {
142            if let Some(target_type) = table_column_type_mapping.get(col.name()) {
143                col.column_desc.data_type = target_type.clone();
144            }
145        }
146        let hummock_rewrite = HummockRewriteInfo::new(source_to_table_mapping);
147        let base = PlanBase::new_logical_with_core(&core);
148        assert!(logical_source.output_exprs.is_none());
149        LogicalIcebergIntermediateScan {
150            base,
151            core,
152            iceberg_predicate: Predicate::AlwaysTrue,
153            time_travel_info,
154            table_column_type_mapping,
155            hummock_rewrite,
156        }
157    }
158
159    pub fn source_catalog(&self) -> Option<&SourceCatalog> {
160        self.core.catalog.as_deref()
161    }
162
163    pub fn output_columns(&self) -> impl ExactSizeIterator<Item = &str> {
164        self.core.column_catalog.iter().map(|c| c.name.as_str())
165    }
166
167    pub fn add_predicate(
168        &self,
169        iceberg_predicate: Predicate,
170        extracted_condition: Condition,
171    ) -> Self {
172        LogicalIcebergIntermediateScan {
173            iceberg_predicate: self.iceberg_predicate.clone().and(iceberg_predicate),
174            hummock_rewrite: self.hummock_rewrite.add_predicate(extracted_condition),
175            ..self.clone()
176        }
177    }
178
179    /// Returns true if this intermediate scan has type remapping for Iceberg engine tables.
180    pub fn has_type_mapping(&self) -> bool {
181        !self.table_column_type_mapping.is_empty()
182    }
183
184    pub fn clone_with_required_cols(&self, required_cols: &[usize]) -> Self {
185        assert!(!required_cols.is_empty());
186
187        let mut core = self.core.clone();
188        core.column_catalog = required_cols
189            .iter()
190            .map(|idx| core.column_catalog[*idx].clone())
191            .collect();
192        core.row_id_index = required_cols
193            .iter()
194            .position(|idx| Some(*idx) == self.core.row_id_index);
195
196        let base = PlanBase::new_logical_with_core(&core);
197
198        LogicalIcebergIntermediateScan {
199            base,
200            core,
201            iceberg_predicate: self.iceberg_predicate.clone(),
202            time_travel_info: self.time_travel_info.clone(),
203            table_column_type_mapping: self.table_column_type_mapping.clone(),
204            hummock_rewrite: self.hummock_rewrite.prune_columns(required_cols),
205        }
206    }
207}
208
209impl_plan_tree_node_for_leaf! { Logical, LogicalIcebergIntermediateScan }
210
211impl Distill for LogicalIcebergIntermediateScan {
212    fn distill<'a>(&self) -> XmlNode<'a> {
213        let verbose = self.base.ctx().is_explain_verbose();
214        let mut fields = Vec::with_capacity(if verbose { 4 } else { 2 });
215
216        if let Some(catalog) = self.source_catalog() {
217            fields.push(("source", Pretty::from(catalog.name.clone())));
218        } else {
219            fields.push(("source", Pretty::from("unknown")));
220        }
221        fields.push(("columns", column_names_pretty(self.schema())));
222
223        if verbose {
224            fields.push(("predicate", Pretty::debug(&self.iceberg_predicate)));
225            fields.push((
226                "output_column",
227                Pretty::debug(&self.output_columns().collect_vec()),
228            ));
229            fields.push(("time_travel_info", Pretty::debug(&self.time_travel_info)));
230        }
231
232        childless_record("LogicalIcebergIntermediateScan", fields)
233    }
234}
235
236impl ColPrunable for LogicalIcebergIntermediateScan {
237    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
238        if required_cols.is_empty() {
239            // If required_cols is empty, we use the first column of iceberg to avoid the empty schema.
240            LogicalProject::new(self.clone_with_required_cols(&[0]).into(), vec![]).into()
241        } else {
242            self.clone_with_required_cols(required_cols).into()
243        }
244    }
245}
246
// Default (no-op) impls. NOTE(review): `hummock_rewrite.origin_condition` does
// hold frontend expressions that these no-op impls never rewrite/visit — confirm
// that skipping them is intended for this intermediate node.
impl ExprRewritable<Logical> for LogicalIcebergIntermediateScan {}

impl ExprVisitable for LogicalIcebergIntermediateScan {}
250
251impl PredicatePushdown for LogicalIcebergIntermediateScan {
252    fn predicate_pushdown(
253        &self,
254        predicate: Condition,
255        _ctx: &mut PredicatePushdownContext,
256    ) -> PlanRef {
257        let ExtractIcebergPredicateResult {
258            iceberg_predicate,
259            extracted_condition,
260            remaining_condition,
261        } = extract_iceberg_predicate(predicate, self.schema().fields());
262        let plan = self
263            .add_predicate(iceberg_predicate, extracted_condition)
264            .into();
265        if remaining_condition.always_true() {
266            plan
267        } else {
268            LogicalFilter::create(plan, remaining_condition)
269        }
270    }
271}
272
273impl ToBatch for LogicalIcebergIntermediateScan {
274    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
275        // This should not be called directly. The intermediate scan should be
276        // converted to LogicalIcebergScan first via the materialization rule.
277        Err(crate::error::ErrorCode::InternalError(
278            "LogicalIcebergIntermediateScan should be converted to LogicalIcebergScan before to_batch".to_owned()
279        )
280        .into())
281    }
282}
283
impl ToStream for LogicalIcebergIntermediateScan {
    /// Never called: this node only exists during batch-query optimization of
    /// Iceberg scans, so reaching the stream path is a planner bug.
    fn to_stream(
        &self,
        _ctx: &mut ToStreamContext,
    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
        unreachable!("LogicalIcebergIntermediateScan is only for batch queries")
    }

    /// Never called: see `to_stream`.
    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        unreachable!("LogicalIcebergIntermediateScan is only for batch queries")
    }
}