risingwave_frontend/optimizer/plan_node/
batch_iceberg_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::{Hash, Hasher};
16use std::rc::Rc;
17
18use iceberg::expr::Predicate as IcebergPredicate;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
21use risingwave_common::types::DataType;
22use risingwave_pb::batch_plan::IcebergScanNode;
23use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25use risingwave_sqlparser::ast::AsOf;
26
27use super::batch::prelude::*;
28use super::utils::{Distill, childless_record, column_names_pretty};
29use super::{
30    ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, generic,
31};
32use crate::catalog::source_catalog::SourceCatalog;
33use crate::error::Result;
34use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
35use crate::optimizer::property::{Distribution, Order};
36
/// Batch plan node describing a scan over an Iceberg source.
///
/// `PartialEq`, `Eq` and `Hash` are implemented by hand for this type rather
/// than derived.
#[derive(Debug, Clone)]
pub struct BatchIcebergScan {
    /// Batch plan base; the constructors build it from `core`.
    pub base: PlanBase<Batch>,
    /// Generic source description. This node reads its column catalog, its
    /// optional source catalog and its optional `AS OF` clause from here.
    pub core: generic::Source,
    /// Kind of Iceberg scan to perform; serialized (as `i32`) into the
    /// `IcebergScanNode` protobuf body.
    iceberg_scan_type: IcebergScanType,
    /// Iceberg predicate attached to the scan. Constructors initialise it to
    /// `AlwaysTrue`; `clone_with_predicate` swaps in another one. It is not
    /// part of the protobuf produced by `to_batch_prost_body`.
    pub predicate: IcebergPredicate,
}
44
45impl PartialEq for BatchIcebergScan {
46    fn eq(&self, other: &Self) -> bool {
47        if self.predicate == IcebergPredicate::AlwaysTrue
48            && other.predicate == IcebergPredicate::AlwaysTrue
49        {
50            self.base == other.base && self.core == other.core
51        } else {
52            panic!("BatchIcebergScan::eq: comparing non-AlwaysTrue predicates is not supported")
53        }
54    }
55}
56
// Marker impl: declares that the hand-written `PartialEq` for this type is a
// full equivalence relation, which `Hash`-based collections require of keys.
impl Eq for BatchIcebergScan {}
58
59impl Hash for BatchIcebergScan {
60    fn hash<H: Hasher>(&self, state: &mut H) {
61        if self.predicate != IcebergPredicate::AlwaysTrue {
62            panic!("BatchIcebergScan::hash: hashing non-AlwaysTrue predicates is not supported")
63        } else {
64            self.base.hash(state);
65            self.core.hash(state);
66        }
67    }
68}
69
70impl BatchIcebergScan {
71    pub fn new(core: generic::Source, iceberg_scan_type: IcebergScanType) -> Self {
72        let base = PlanBase::new_batch_with_core(
73            &core,
74            // Use `Single` by default, will be updated later with `clone_with_dist`.
75            Distribution::Single,
76            Order::any(),
77        );
78
79        Self {
80            base,
81            core,
82            iceberg_scan_type,
83            predicate: IcebergPredicate::AlwaysTrue,
84        }
85    }
86
87    pub fn new_count_star_with_batch_iceberg_scan(batch_iceberg_scan: &BatchIcebergScan) -> Self {
88        let mut core = batch_iceberg_scan.core.clone();
89        core.column_catalog = vec![ColumnCatalog::visible(ColumnDesc::named(
90            "count",
91            ColumnId::first_user_column(),
92            DataType::Int64,
93        ))];
94        let base = PlanBase::new_batch_with_core(
95            &core,
96            batch_iceberg_scan.base.distribution().clone(),
97            batch_iceberg_scan.base.order().clone(),
98        );
99        Self {
100            base,
101            core,
102            iceberg_scan_type: IcebergScanType::CountStar,
103            predicate: IcebergPredicate::AlwaysTrue,
104        }
105    }
106
107    pub fn iceberg_scan_type(&self) -> IcebergScanType {
108        self.iceberg_scan_type
109    }
110
111    pub fn column_names(&self) -> Vec<&str> {
112        self.schema().names_str()
113    }
114
115    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
116        self.core.catalog.clone()
117    }
118
119    pub fn clone_with_dist(&self) -> Self {
120        let base = self
121            .base
122            .clone_with_new_distribution(Distribution::SomeShard);
123        Self {
124            base,
125            core: self.core.clone(),
126            iceberg_scan_type: self.iceberg_scan_type,
127            predicate: self.predicate.clone(),
128        }
129    }
130
131    pub fn clone_with_predicate(&self, predicate: IcebergPredicate) -> Self {
132        Self {
133            base: self.base.clone(),
134            core: self.core.clone(),
135            iceberg_scan_type: self.iceberg_scan_type,
136            predicate,
137        }
138    }
139
140    pub fn as_of(&self) -> Option<AsOf> {
141        self.core.as_of.clone()
142    }
143}
144
// NOTE(review): presumably generates the plan-tree-node boilerplate for a node
// without inputs (a leaf) — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { BatchIcebergScan }
146
147impl Distill for BatchIcebergScan {
148    fn distill<'a>(&self) -> XmlNode<'a> {
149        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
150        let fields = vec![
151            ("source", src),
152            ("columns", column_names_pretty(self.schema())),
153            ("iceberg_scan_type", Pretty::debug(&self.iceberg_scan_type)),
154            ("predicate", Pretty::from(self.predicate.to_string())),
155        ];
156        childless_record("BatchIcebergScan", fields)
157    }
158}
159
160impl ToLocalBatch for BatchIcebergScan {
161    fn to_local(&self) -> Result<PlanRef> {
162        Ok(self.clone_with_dist().into())
163    }
164}
165
166impl ToDistributedBatch for BatchIcebergScan {
167    fn to_distributed(&self) -> Result<PlanRef> {
168        Ok(self.clone_with_dist().into())
169    }
170}
171
172impl ToBatchPb for BatchIcebergScan {
173    fn to_batch_prost_body(&self) -> NodeBody {
174        let source_catalog = self.source_catalog().unwrap();
175        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
176        NodeBody::IcebergScan(IcebergScanNode {
177            columns: self
178                .core
179                .column_catalog
180                .iter()
181                .map(|c| c.to_protobuf())
182                .collect(),
183            with_properties,
184            split: vec![],
185            secret_refs,
186            iceberg_scan_type: self.iceberg_scan_type as i32,
187        })
188    }
189}
190
// No methods are overridden, so the trait's default behaviour applies.
// NOTE(review): presumably because this node holds no expressions to rewrite —
// confirm against the trait definition.
impl ExprRewritable for BatchIcebergScan {}
192
// No methods are overridden, so the trait's default behaviour applies.
// NOTE(review): presumably because this node holds no expressions to visit —
// confirm against the trait definition.
impl ExprVisitable for BatchIcebergScan {}