risingwave_frontend/optimizer/plan_node/
batch_iceberg_scan.rs1use std::hash::{Hash, Hasher};
16use std::rc::Rc;
17
18use iceberg::expr::Predicate as IcebergPredicate;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
21use risingwave_common::types::DataType;
22use risingwave_pb::batch_plan::IcebergScanNode;
23use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25use risingwave_sqlparser::ast::AsOf;
26
27use super::batch::prelude::*;
28use super::utils::{Distill, childless_record, column_names_pretty};
29use super::{
30 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
31 generic,
32};
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::error::Result;
35use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
36use crate::optimizer::property::{Distribution, Order};
37
38#[derive(Debug, Clone)]
39pub struct BatchIcebergScan {
40 pub base: PlanBase<Batch>,
41 pub core: generic::Source,
42 iceberg_scan_type: IcebergScanType,
43 pub predicate: IcebergPredicate,
44}
45
46impl PartialEq for BatchIcebergScan {
47 fn eq(&self, other: &Self) -> bool {
48 if self.predicate == IcebergPredicate::AlwaysTrue
49 && other.predicate == IcebergPredicate::AlwaysTrue
50 {
51 self.base == other.base && self.core == other.core
52 } else {
53 panic!("BatchIcebergScan::eq: comparing non-AlwaysTrue predicates is not supported")
54 }
55 }
56}
57
58impl Eq for BatchIcebergScan {}
59
60impl Hash for BatchIcebergScan {
61 fn hash<H: Hasher>(&self, state: &mut H) {
62 if self.predicate != IcebergPredicate::AlwaysTrue {
63 panic!("BatchIcebergScan::hash: hashing non-AlwaysTrue predicates is not supported")
64 } else {
65 self.base.hash(state);
66 self.core.hash(state);
67 }
68 }
69}
70
71impl BatchIcebergScan {
72 pub fn new(core: generic::Source, iceberg_scan_type: IcebergScanType) -> Self {
73 let base = PlanBase::new_batch_with_core(
74 &core,
75 Distribution::Single,
77 Order::any(),
78 );
79
80 Self {
81 base,
82 core,
83 iceberg_scan_type,
84 predicate: IcebergPredicate::AlwaysTrue,
85 }
86 }
87
88 pub fn new_count_star_with_batch_iceberg_scan(batch_iceberg_scan: &BatchIcebergScan) -> Self {
89 let mut core = batch_iceberg_scan.core.clone();
90 core.column_catalog = vec![ColumnCatalog::visible(ColumnDesc::named(
91 "count",
92 ColumnId::first_user_column(),
93 DataType::Int64,
94 ))];
95 let base = PlanBase::new_batch_with_core(
96 &core,
97 batch_iceberg_scan.base.distribution().clone(),
98 batch_iceberg_scan.base.order().clone(),
99 );
100 Self {
101 base,
102 core,
103 iceberg_scan_type: IcebergScanType::CountStar,
104 predicate: IcebergPredicate::AlwaysTrue,
105 }
106 }
107
108 pub fn iceberg_scan_type(&self) -> IcebergScanType {
109 self.iceberg_scan_type
110 }
111
112 pub fn column_names(&self) -> Vec<&str> {
113 self.schema().names_str()
114 }
115
116 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
117 self.core.catalog.clone()
118 }
119
120 pub fn clone_with_dist(&self) -> Self {
121 let base = self
122 .base
123 .clone_with_new_distribution(Distribution::SomeShard);
124 Self {
125 base,
126 core: self.core.clone(),
127 iceberg_scan_type: self.iceberg_scan_type,
128 predicate: self.predicate.clone(),
129 }
130 }
131
132 pub fn clone_with_predicate(&self, predicate: IcebergPredicate) -> Self {
133 Self {
134 base: self.base.clone(),
135 core: self.core.clone(),
136 iceberg_scan_type: self.iceberg_scan_type,
137 predicate,
138 }
139 }
140
141 pub fn as_of(&self) -> Option<AsOf> {
142 self.core.as_of.clone()
143 }
144}
145
146impl_plan_tree_node_for_leaf! { Batch, BatchIcebergScan }
147
148impl Distill for BatchIcebergScan {
149 fn distill<'a>(&self) -> XmlNode<'a> {
150 let src = Pretty::from(self.source_catalog().unwrap().name.clone());
151 let fields = vec![
152 ("source", src),
153 ("columns", column_names_pretty(self.schema())),
154 ("iceberg_scan_type", Pretty::debug(&self.iceberg_scan_type)),
155 ("predicate", Pretty::from(self.predicate.to_string())),
156 ];
157 childless_record("BatchIcebergScan", fields)
158 }
159}
160
161impl ToLocalBatch for BatchIcebergScan {
162 fn to_local(&self) -> Result<PlanRef> {
163 Ok(self.clone_with_dist().into())
164 }
165}
166
167impl ToDistributedBatch for BatchIcebergScan {
168 fn to_distributed(&self) -> Result<PlanRef> {
169 Ok(self.clone_with_dist().into())
170 }
171}
172
173impl ToBatchPb for BatchIcebergScan {
174 fn to_batch_prost_body(&self) -> NodeBody {
175 let source_catalog = self.source_catalog().unwrap();
176 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
177 NodeBody::IcebergScan(IcebergScanNode {
178 columns: self
179 .core
180 .column_catalog
181 .iter()
182 .map(|c| c.to_protobuf())
183 .collect(),
184 with_properties,
185 split: vec![],
186 secret_refs,
187 iceberg_scan_type: self.iceberg_scan_type as i32,
188 })
189 }
190}
191
192impl ExprRewritable<Batch> for BatchIcebergScan {}
193
194impl ExprVisitable for BatchIcebergScan {}