risingwave_frontend/optimizer/plan_node/batch_iceberg_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::hash::{Hash, Hasher};
16use std::rc::Rc;
17
18use iceberg::expr::Predicate as IcebergPredicate;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
21use risingwave_common::types::DataType;
22use risingwave_pb::batch_plan::IcebergScanNode;
23use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25use risingwave_sqlparser::ast::AsOf;
26
27use super::batch::prelude::*;
28use super::utils::{Distill, childless_record, column_names_pretty};
29use super::{
30    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
31    generic,
32};
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::error::Result;
35use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
36use crate::optimizer::property::{Distribution, Order};
37
38#[derive(Debug, Clone)]
39pub struct BatchIcebergScan {
40    pub base: PlanBase<Batch>,
41    pub core: generic::Source,
42    iceberg_scan_type: IcebergScanType,
43    pub predicate: IcebergPredicate,
44    pub snapshot_id: Option<i64>,
45}
46
47impl PartialEq for BatchIcebergScan {
48    fn eq(&self, other: &Self) -> bool {
49        if self.predicate == IcebergPredicate::AlwaysTrue
50            && other.predicate == IcebergPredicate::AlwaysTrue
51        {
52            self.base == other.base
53                && self.core == other.core
54                && self.snapshot_id == other.snapshot_id
55        } else {
56            panic!("BatchIcebergScan::eq: comparing non-AlwaysTrue predicates is not supported")
57        }
58    }
59}
60
61impl Eq for BatchIcebergScan {}
62
63impl Hash for BatchIcebergScan {
64    fn hash<H: Hasher>(&self, state: &mut H) {
65        if self.predicate != IcebergPredicate::AlwaysTrue {
66            panic!("BatchIcebergScan::hash: hashing non-AlwaysTrue predicates is not supported")
67        } else {
68            self.base.hash(state);
69            self.core.hash(state);
70            self.snapshot_id.hash(state);
71        }
72    }
73}
74
75impl BatchIcebergScan {
76    pub fn new(
77        core: generic::Source,
78        iceberg_scan_type: IcebergScanType,
79        snapshot_id: Option<i64>,
80    ) -> Self {
81        let base = PlanBase::new_batch_with_core(
82            &core,
83            // Use `Single` by default, will be updated later with `clone_with_dist`.
84            Distribution::Single,
85            Order::any(),
86        );
87
88        Self {
89            base,
90            core,
91            iceberg_scan_type,
92            predicate: IcebergPredicate::AlwaysTrue,
93            snapshot_id,
94        }
95    }
96
97    pub fn new_count_star_with_batch_iceberg_scan(batch_iceberg_scan: &BatchIcebergScan) -> Self {
98        let mut core = batch_iceberg_scan.core.clone();
99        core.column_catalog = vec![ColumnCatalog::visible(ColumnDesc::named(
100            "count",
101            ColumnId::first_user_column(),
102            DataType::Int64,
103        ))];
104        let base = PlanBase::new_batch_with_core(
105            &core,
106            batch_iceberg_scan.base.distribution().clone(),
107            batch_iceberg_scan.base.order().clone(),
108        );
109        Self {
110            base,
111            core,
112            iceberg_scan_type: IcebergScanType::CountStar,
113            predicate: IcebergPredicate::AlwaysTrue,
114            snapshot_id: batch_iceberg_scan.snapshot_id,
115        }
116    }
117
118    pub fn iceberg_scan_type(&self) -> IcebergScanType {
119        self.iceberg_scan_type
120    }
121
122    pub fn column_names(&self) -> Vec<&str> {
123        self.schema().names_str()
124    }
125
126    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
127        self.core.catalog.clone()
128    }
129
130    pub fn clone_with_dist(&self) -> Self {
131        let base = self
132            .base
133            .clone_with_new_distribution(Distribution::SomeShard);
134        Self {
135            base,
136            core: self.core.clone(),
137            iceberg_scan_type: self.iceberg_scan_type,
138            predicate: self.predicate.clone(),
139            snapshot_id: self.snapshot_id,
140        }
141    }
142
143    pub fn clone_with_predicate(&self, predicate: IcebergPredicate) -> Self {
144        Self {
145            base: self.base.clone(),
146            core: self.core.clone(),
147            iceberg_scan_type: self.iceberg_scan_type,
148            predicate,
149            snapshot_id: self.snapshot_id,
150        }
151    }
152
153    pub fn as_of(&self) -> Option<AsOf> {
154        self.core.as_of.clone()
155    }
156
157    pub fn snapshot_id(&self) -> Option<i64> {
158        self.snapshot_id
159    }
160}
161
162impl_plan_tree_node_for_leaf! { Batch, BatchIcebergScan }
163
164impl Distill for BatchIcebergScan {
165    fn distill<'a>(&self) -> XmlNode<'a> {
166        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
167        let fields = vec![
168            ("source", src),
169            ("columns", column_names_pretty(self.schema())),
170            ("iceberg_scan_type", Pretty::debug(&self.iceberg_scan_type)),
171            ("predicate", Pretty::from(self.predicate.to_string())),
172        ];
173        childless_record("BatchIcebergScan", fields)
174    }
175}
176
177impl ToLocalBatch for BatchIcebergScan {
178    fn to_local(&self) -> Result<PlanRef> {
179        Ok(self.clone_with_dist().into())
180    }
181}
182
183impl ToDistributedBatch for BatchIcebergScan {
184    fn to_distributed(&self) -> Result<PlanRef> {
185        Ok(self.clone_with_dist().into())
186    }
187}
188
189impl ToBatchPb for BatchIcebergScan {
190    fn to_batch_prost_body(&self) -> NodeBody {
191        let source_catalog = self.source_catalog().unwrap();
192        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
193        NodeBody::IcebergScan(IcebergScanNode {
194            columns: self
195                .core
196                .column_catalog
197                .iter()
198                .map(|c| c.to_protobuf())
199                .collect(),
200            with_properties,
201            split: vec![],
202            secret_refs,
203            iceberg_scan_type: self.iceberg_scan_type as i32,
204        })
205    }
206}
207
208impl ExprRewritable<Batch> for BatchIcebergScan {}
209
210impl ExprVisitable for BatchIcebergScan {}