risingwave_frontend/optimizer/plan_node/
batch_iceberg_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::{Hash, Hasher};
16use std::rc::Rc;
17
18use iceberg::expr::Predicate as IcebergPredicate;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
21use risingwave_common::types::DataType;
22use risingwave_pb::batch_plan::IcebergScanNode;
23use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25use risingwave_sqlparser::ast::AsOf;
26
27use super::batch::prelude::*;
28use super::utils::{Distill, childless_record, column_names_pretty};
29use super::{
30    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
31    generic,
32};
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::error::Result;
35use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
36use crate::optimizer::property::{Distribution, Order};
37
38#[derive(Debug, Clone)]
39pub struct BatchIcebergScan {
40    pub base: PlanBase<Batch>,
41    pub core: generic::Source,
42    iceberg_scan_type: IcebergScanType,
43    pub predicate: IcebergPredicate,
44}
45
46impl PartialEq for BatchIcebergScan {
47    fn eq(&self, other: &Self) -> bool {
48        if self.predicate == IcebergPredicate::AlwaysTrue
49            && other.predicate == IcebergPredicate::AlwaysTrue
50        {
51            self.base == other.base && self.core == other.core
52        } else {
53            panic!("BatchIcebergScan::eq: comparing non-AlwaysTrue predicates is not supported")
54        }
55    }
56}
57
58impl Eq for BatchIcebergScan {}
59
60impl Hash for BatchIcebergScan {
61    fn hash<H: Hasher>(&self, state: &mut H) {
62        if self.predicate != IcebergPredicate::AlwaysTrue {
63            panic!("BatchIcebergScan::hash: hashing non-AlwaysTrue predicates is not supported")
64        } else {
65            self.base.hash(state);
66            self.core.hash(state);
67        }
68    }
69}
70
71impl BatchIcebergScan {
72    pub fn new(core: generic::Source, iceberg_scan_type: IcebergScanType) -> Self {
73        let base = PlanBase::new_batch_with_core(
74            &core,
75            // Use `Single` by default, will be updated later with `clone_with_dist`.
76            Distribution::Single,
77            Order::any(),
78        );
79
80        Self {
81            base,
82            core,
83            iceberg_scan_type,
84            predicate: IcebergPredicate::AlwaysTrue,
85        }
86    }
87
88    pub fn new_count_star_with_batch_iceberg_scan(batch_iceberg_scan: &BatchIcebergScan) -> Self {
89        let mut core = batch_iceberg_scan.core.clone();
90        core.column_catalog = vec![ColumnCatalog::visible(ColumnDesc::named(
91            "count",
92            ColumnId::first_user_column(),
93            DataType::Int64,
94        ))];
95        let base = PlanBase::new_batch_with_core(
96            &core,
97            batch_iceberg_scan.base.distribution().clone(),
98            batch_iceberg_scan.base.order().clone(),
99        );
100        Self {
101            base,
102            core,
103            iceberg_scan_type: IcebergScanType::CountStar,
104            predicate: IcebergPredicate::AlwaysTrue,
105        }
106    }
107
108    pub fn iceberg_scan_type(&self) -> IcebergScanType {
109        self.iceberg_scan_type
110    }
111
112    pub fn column_names(&self) -> Vec<&str> {
113        self.schema().names_str()
114    }
115
116    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
117        self.core.catalog.clone()
118    }
119
120    pub fn clone_with_dist(&self) -> Self {
121        let base = self
122            .base
123            .clone_with_new_distribution(Distribution::SomeShard);
124        Self {
125            base,
126            core: self.core.clone(),
127            iceberg_scan_type: self.iceberg_scan_type,
128            predicate: self.predicate.clone(),
129        }
130    }
131
132    pub fn clone_with_predicate(&self, predicate: IcebergPredicate) -> Self {
133        Self {
134            base: self.base.clone(),
135            core: self.core.clone(),
136            iceberg_scan_type: self.iceberg_scan_type,
137            predicate,
138        }
139    }
140
141    pub fn as_of(&self) -> Option<AsOf> {
142        self.core.as_of.clone()
143    }
144}
145
// Macro-generated plan-tree-node impl for `BatchIcebergScan` under the `Batch`
// convention. Going by the macro name, the node is treated as a leaf (no
// inputs); the macro body is defined elsewhere.
impl_plan_tree_node_for_leaf! { Batch, BatchIcebergScan }
147
148impl Distill for BatchIcebergScan {
149    fn distill<'a>(&self) -> XmlNode<'a> {
150        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
151        let fields = vec![
152            ("source", src),
153            ("columns", column_names_pretty(self.schema())),
154            ("iceberg_scan_type", Pretty::debug(&self.iceberg_scan_type)),
155            ("predicate", Pretty::from(self.predicate.to_string())),
156        ];
157        childless_record("BatchIcebergScan", fields)
158    }
159}
160
161impl ToLocalBatch for BatchIcebergScan {
162    fn to_local(&self) -> Result<PlanRef> {
163        Ok(self.clone_with_dist().into())
164    }
165}
166
167impl ToDistributedBatch for BatchIcebergScan {
168    fn to_distributed(&self) -> Result<PlanRef> {
169        Ok(self.clone_with_dist().into())
170    }
171}
172
173impl ToBatchPb for BatchIcebergScan {
174    fn to_batch_prost_body(&self) -> NodeBody {
175        let source_catalog = self.source_catalog().unwrap();
176        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
177        NodeBody::IcebergScan(IcebergScanNode {
178            columns: self
179                .core
180                .column_catalog
181                .iter()
182                .map(|c| c.to_protobuf())
183                .collect(),
184            with_properties,
185            split: vec![],
186            secret_refs,
187            iceberg_scan_type: self.iceberg_scan_type as i32,
188        })
189    }
190}
191
// No methods are overridden: the scan relies on whatever default behaviour
// `ExprRewritable` provides.
impl ExprRewritable<Batch> for BatchIcebergScan {}

// No methods are overridden: the scan relies on whatever default behaviour
// `ExprVisitable` provides.
impl ExprVisitable for BatchIcebergScan {}