risingwave_frontend/optimizer/plan_node/logical_iceberg_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::rc::Rc;

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;

use super::generic::GenericPlanRef;
use super::utils::{Distill, childless_record};
use super::{
    ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef, PredicatePushdown,
    ToBatch, ToStream, generic,
};
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::Result;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::plan_node::{
    BatchIcebergScan, ColumnPruningContext, LogicalFilter, LogicalSource, PredicatePushdownContext,
    RewriteStreamContext, ToStreamContext,
};
use crate::utils::{ColIndexMapping, Condition};

/// `LogicalIcebergScan` is only used by batch queries. At the beginning of batch query
/// optimization, a `LogicalSource` with an Iceberg connector is converted into a
/// `LogicalIcebergScan`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalIcebergScan {
    pub base: PlanBase<Logical>,
    pub core: generic::Source,
    iceberg_scan_type: IcebergScanType,
}

impl LogicalIcebergScan {
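    /// Create a `LogicalIcebergScan` from a `LogicalSource` that uses the Iceberg connector
    /// and has no `output_exprs` attached.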
    pub fn new(logical_source: &LogicalSource, iceberg_scan_type: IcebergScanType) -> Self {
        assert!(logical_source.core.is_iceberg_connector());

        let core = logical_source.core.clone();
        let base = PlanBase::new_logical_with_core(&core);

        assert!(logical_source.output_exprs.is_none());

        LogicalIcebergScan {
            base,
            core,
            iceberg_scan_type,
        }
    }

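    /// The source catalog backing this scan, if any.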
    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
        self.core.catalog.clone()
    }

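    /// Clone this scan keeping only the columns in `required_cols` (must be non-empty);
    /// the row id index is cleared if the row id column is not kept.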
    pub fn clone_with_required_cols(&self, required_cols: &[usize]) -> Self {
        assert!(!required_cols.is_empty());
        let mut core = self.core.clone();
        let mut has_row_id = false;
        core.column_catalog = required_cols
            .iter()
            .map(|idx| {
                if Some(*idx) == core.row_id_index {
                    has_row_id = true;
                }
                core.column_catalog[*idx].clone()
            })
            .collect();
        // If the row id column was pruned away, drop its index as well.
        if !has_row_id {
            core.row_id_index = None;
        }
        let base = PlanBase::new_logical_with_core(&core);

        LogicalIcebergScan {
            base,
            core,
            iceberg_scan_type: self.iceberg_scan_type,
        }
    }
}

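// `LogicalIcebergScan` is a leaf in the plan tree: it has no input nodes.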
impl_plan_tree_node_for_leaf! {LogicalIcebergScan}
impl Distill for LogicalIcebergScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let fields = if let Some(catalog) = self.source_catalog() {
            let src = Pretty::from(catalog.name.clone());
            vec![
                ("source", src),
                ("columns", column_names_pretty(self.schema())),
                ("iceberg_scan_type", Pretty::debug(&self.iceberg_scan_type)),
            ]
        } else {
            vec![]
        };
        childless_record("LogicalIcebergScan", fields)
    }
}

impl ColPrunable for LogicalIcebergScan {
    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
        if required_cols.is_empty() {
            let mapping =
                ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
            // If `required_cols` is empty, scan the first Iceberg column to avoid an empty schema.
            LogicalProject::with_mapping(self.clone_with_required_cols(&[0]).into(), mapping).into()
        } else {
            self.clone_with_required_cols(required_cols).into()
        }
    }
}

impl ExprRewritable for LogicalIcebergScan {}

impl ExprVisitable for LogicalIcebergScan {}

impl PredicatePushdown for LogicalIcebergScan {
    fn predicate_pushdown(
        &self,
        predicate: Condition,
        _ctx: &mut PredicatePushdownContext,
    ) -> PlanRef {
        // No pushdown: keep the predicate in a `LogicalFilter` on top of the scan.
        LogicalFilter::create(self.clone().into(), predicate)
    }
}

impl ToBatch for LogicalIcebergScan {
    fn to_batch(&self) -> Result<PlanRef> {
        let plan: PlanRef = BatchIcebergScan::new(self.core.clone(), self.iceberg_scan_type).into();
        Ok(plan)
    }
}

impl ToStream for LogicalIcebergScan {
    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
        // `LogicalIcebergScan` is used by batch queries only, so it is never converted
        // to a stream plan.
        unreachable!()
    }

    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        unreachable!()
    }
}