// risingwave_frontend/optimizer/plan_node/logical_iceberg_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
19
20use super::generic::GenericPlanRef;
21use super::utils::{Distill, childless_record};
22use super::{
23    ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject, PlanBase,
24    PredicatePushdown, ToBatch, ToStream, generic,
25};
26use crate::catalog::source_catalog::SourceCatalog;
27use crate::error::Result;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::utils::column_names_pretty;
30use crate::optimizer::plan_node::{
31    BatchIcebergScan, ColumnPruningContext, LogicalFilter, LogicalSource, PredicatePushdownContext,
32    RewriteStreamContext, ToStreamContext,
33};
34use crate::utils::{ColIndexMapping, Condition};
35
/// `LogicalIcebergScan` is only used by batch queries. At the beginning of the batch
/// query optimization, a `LogicalSource` with an iceberg property would be converted
/// into a `LogicalIcebergScan`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalIcebergScan {
    pub base: PlanBase<Logical>,
    /// Shared source core, cloned from the originating `LogicalSource`.
    pub core: generic::Source,
    /// Which kind of iceberg scan this node performs (see `IcebergScanType`).
    iceberg_scan_type: IcebergScanType,
    /// Iceberg snapshot to scan; `None` presumably means "current snapshot" — TODO confirm.
    snapshot_id: Option<i64>,
}
44
45impl LogicalIcebergScan {
46    pub fn new(
47        logical_source: &LogicalSource,
48        iceberg_scan_type: IcebergScanType,
49        snapshot_id: Option<i64>,
50    ) -> Self {
51        assert!(logical_source.core.is_iceberg_connector());
52
53        let core = logical_source.core.clone();
54        let base = PlanBase::new_logical_with_core(&core);
55
56        assert!(logical_source.output_exprs.is_none());
57
58        LogicalIcebergScan {
59            base,
60            core,
61            iceberg_scan_type,
62            snapshot_id,
63        }
64    }
65
66    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
67        self.core.catalog.clone()
68    }
69
70    pub fn clone_with_required_cols(&self, required_cols: &[usize]) -> Self {
71        assert!(!required_cols.is_empty());
72        let mut core = self.core.clone();
73        let mut has_row_id = false;
74        core.column_catalog = required_cols
75            .iter()
76            .map(|idx| {
77                if Some(*idx) == core.row_id_index {
78                    has_row_id = true;
79                }
80                core.column_catalog[*idx].clone()
81            })
82            .collect();
83        if !has_row_id {
84            core.row_id_index = None;
85        }
86        let base = PlanBase::new_logical_with_core(&core);
87
88        LogicalIcebergScan {
89            base,
90            core,
91            iceberg_scan_type: self.iceberg_scan_type,
92            snapshot_id: self.snapshot_id,
93        }
94    }
95}
96
// `LogicalIcebergScan` has no inputs, so it is a leaf of the plan tree.
impl_plan_tree_node_for_leaf! { Logical, LogicalIcebergScan}
98impl Distill for LogicalIcebergScan {
99    fn distill<'a>(&self) -> XmlNode<'a> {
100        let fields = if let Some(catalog) = self.source_catalog() {
101            let src = Pretty::from(catalog.name.clone());
102            vec![
103                ("source", src),
104                ("columns", column_names_pretty(self.schema())),
105                ("iceberg_scan_type", Pretty::debug(&self.iceberg_scan_type)),
106            ]
107        } else {
108            vec![]
109        };
110        childless_record("LogicalIcebergScan", fields)
111    }
112}
113
114impl ColPrunable for LogicalIcebergScan {
115    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
116        if required_cols.is_empty() {
117            let mapping =
118                ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
119            // If reuqiured_cols is empty, we use the first column of iceberg to avoid the empty schema.
120            LogicalProject::with_mapping(self.clone_with_required_cols(&[0]).into(), mapping).into()
121        } else {
122            self.clone_with_required_cols(required_cols).into()
123        }
124    }
125}
126
// This node holds no expressions, so the default no-op impls suffice.
impl ExprRewritable<Logical> for LogicalIcebergScan {}

impl ExprVisitable for LogicalIcebergScan {}
130
131impl PredicatePushdown for LogicalIcebergScan {
132    fn predicate_pushdown(
133        &self,
134        predicate: Condition,
135        _ctx: &mut PredicatePushdownContext,
136    ) -> PlanRef {
137        // No pushdown.
138        LogicalFilter::create(self.clone().into(), predicate)
139    }
140}
141
142impl ToBatch for LogicalIcebergScan {
143    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
144        let plan =
145            BatchIcebergScan::new(self.core.clone(), self.iceberg_scan_type, self.snapshot_id)
146                .into();
147        Ok(plan)
148    }
149}
150
impl ToStream for LogicalIcebergScan {
    fn to_stream(
        &self,
        _ctx: &mut ToStreamContext,
    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
        // `LogicalIcebergScan` is only created during batch query optimization,
        // so stream conversion must never be requested on it.
        unreachable!()
    }

    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        // Same as `to_stream`: this node never appears in a streaming plan.
        unreachable!()
    }
}
165}