risingwave_frontend/optimizer/plan_node/
batch_iceberg_scan.rs1use std::hash::{Hash, Hasher};
16use std::rc::Rc;
17
18use iceberg::expr::Predicate as IcebergPredicate;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
21use risingwave_common::types::DataType;
22use risingwave_pb::batch_plan::IcebergScanNode;
23use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25use risingwave_sqlparser::ast::AsOf;
26
27use super::batch::prelude::*;
28use super::utils::{Distill, childless_record, column_names_pretty};
29use super::{
30 ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, generic,
31};
32use crate::catalog::source_catalog::SourceCatalog;
33use crate::error::Result;
34use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
35use crate::optimizer::property::{Distribution, Order};
36
37#[derive(Debug, Clone)]
38pub struct BatchIcebergScan {
39 pub base: PlanBase<Batch>,
40 pub core: generic::Source,
41 iceberg_scan_type: IcebergScanType,
42 pub predicate: IcebergPredicate,
43}
44
45impl PartialEq for BatchIcebergScan {
46 fn eq(&self, other: &Self) -> bool {
47 if self.predicate == IcebergPredicate::AlwaysTrue
48 && other.predicate == IcebergPredicate::AlwaysTrue
49 {
50 self.base == other.base && self.core == other.core
51 } else {
52 panic!("BatchIcebergScan::eq: comparing non-AlwaysTrue predicates is not supported")
53 }
54 }
55}
56
57impl Eq for BatchIcebergScan {}
58
59impl Hash for BatchIcebergScan {
60 fn hash<H: Hasher>(&self, state: &mut H) {
61 if self.predicate != IcebergPredicate::AlwaysTrue {
62 panic!("BatchIcebergScan::hash: hashing non-AlwaysTrue predicates is not supported")
63 } else {
64 self.base.hash(state);
65 self.core.hash(state);
66 }
67 }
68}
69
70impl BatchIcebergScan {
71 pub fn new(core: generic::Source, iceberg_scan_type: IcebergScanType) -> Self {
72 let base = PlanBase::new_batch_with_core(
73 &core,
74 Distribution::Single,
76 Order::any(),
77 );
78
79 Self {
80 base,
81 core,
82 iceberg_scan_type,
83 predicate: IcebergPredicate::AlwaysTrue,
84 }
85 }
86
87 pub fn new_count_star_with_batch_iceberg_scan(batch_iceberg_scan: &BatchIcebergScan) -> Self {
88 let mut core = batch_iceberg_scan.core.clone();
89 core.column_catalog = vec![ColumnCatalog::visible(ColumnDesc::named(
90 "count",
91 ColumnId::first_user_column(),
92 DataType::Int64,
93 ))];
94 let base = PlanBase::new_batch_with_core(
95 &core,
96 batch_iceberg_scan.base.distribution().clone(),
97 batch_iceberg_scan.base.order().clone(),
98 );
99 Self {
100 base,
101 core,
102 iceberg_scan_type: IcebergScanType::CountStar,
103 predicate: IcebergPredicate::AlwaysTrue,
104 }
105 }
106
107 pub fn iceberg_scan_type(&self) -> IcebergScanType {
108 self.iceberg_scan_type
109 }
110
111 pub fn column_names(&self) -> Vec<&str> {
112 self.schema().names_str()
113 }
114
115 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
116 self.core.catalog.clone()
117 }
118
119 pub fn clone_with_dist(&self) -> Self {
120 let base = self
121 .base
122 .clone_with_new_distribution(Distribution::SomeShard);
123 Self {
124 base,
125 core: self.core.clone(),
126 iceberg_scan_type: self.iceberg_scan_type,
127 predicate: self.predicate.clone(),
128 }
129 }
130
131 pub fn clone_with_predicate(&self, predicate: IcebergPredicate) -> Self {
132 Self {
133 base: self.base.clone(),
134 core: self.core.clone(),
135 iceberg_scan_type: self.iceberg_scan_type,
136 predicate,
137 }
138 }
139
140 pub fn as_of(&self) -> Option<AsOf> {
141 self.core.as_of.clone()
142 }
143}
144
145impl_plan_tree_node_for_leaf! { BatchIcebergScan }
146
147impl Distill for BatchIcebergScan {
148 fn distill<'a>(&self) -> XmlNode<'a> {
149 let src = Pretty::from(self.source_catalog().unwrap().name.clone());
150 let fields = vec![
151 ("source", src),
152 ("columns", column_names_pretty(self.schema())),
153 ("iceberg_scan_type", Pretty::debug(&self.iceberg_scan_type)),
154 ("predicate", Pretty::from(self.predicate.to_string())),
155 ];
156 childless_record("BatchIcebergScan", fields)
157 }
158}
159
160impl ToLocalBatch for BatchIcebergScan {
161 fn to_local(&self) -> Result<PlanRef> {
162 Ok(self.clone_with_dist().into())
163 }
164}
165
166impl ToDistributedBatch for BatchIcebergScan {
167 fn to_distributed(&self) -> Result<PlanRef> {
168 Ok(self.clone_with_dist().into())
169 }
170}
171
172impl ToBatchPb for BatchIcebergScan {
173 fn to_batch_prost_body(&self) -> NodeBody {
174 let source_catalog = self.source_catalog().unwrap();
175 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
176 NodeBody::IcebergScan(IcebergScanNode {
177 columns: self
178 .core
179 .column_catalog
180 .iter()
181 .map(|c| c.to_protobuf())
182 .collect(),
183 with_properties,
184 split: vec![],
185 secret_refs,
186 iceberg_scan_type: self.iceberg_scan_type as i32,
187 })
188 }
189}
190
191impl ExprRewritable for BatchIcebergScan {}
192
193impl ExprVisitable for BatchIcebergScan {}