risingwave_frontend/optimizer/plan_node/
batch_iceberg_scan.rs1use std::hash::{Hash, Hasher};
16use std::rc::Rc;
17
18use iceberg::expr::Predicate as IcebergPredicate;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
21use risingwave_common::types::DataType;
22use risingwave_pb::batch_plan::IcebergScanNode;
23use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25use risingwave_sqlparser::ast::AsOf;
26
27use super::batch::prelude::*;
28use super::utils::{Distill, childless_record, column_names_pretty};
29use super::{
30 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
31 generic,
32};
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::error::Result;
35use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
36use crate::optimizer::property::{Distribution, Order};
37
/// Batch plan node that scans an Iceberg source.
///
/// It is registered as a leaf plan node, and `to_batch_prost_body` serializes it into an
/// `IcebergScanNode`.
#[derive(Debug, Clone)]
pub struct BatchIcebergScan {
    pub base: PlanBase<Batch>,
    /// Source description. `core.column_catalog` is what gets serialized as the `columns` of the
    /// `IcebergScanNode`; the count-star constructor replaces it with a single `count` column.
    pub core: generic::Source,
    /// Kind of Iceberg scan requested from the executor (e.g. `CountStar`).
    iceberg_scan_type: IcebergScanType,
    /// Iceberg filter predicate. The constructors set it to `AlwaysTrue`, and
    /// `clone_with_predicate` replaces it. `PartialEq`/`Hash` only support `AlwaysTrue` here.
    /// NOTE(review): not serialized by `to_batch_prost_body` in this file — confirm where it is consumed.
    pub predicate: IcebergPredicate,
    /// Optional Iceberg snapshot id carried with the scan.
    /// NOTE(review): presumably the snapshot pinned for the read (e.g. from `AS OF`); it is not
    /// serialized by `to_batch_prost_body` in this file — confirm where it is consumed.
    pub snapshot_id: Option<i64>,
}
46
47impl PartialEq for BatchIcebergScan {
48 fn eq(&self, other: &Self) -> bool {
49 if self.predicate == IcebergPredicate::AlwaysTrue
50 && other.predicate == IcebergPredicate::AlwaysTrue
51 {
52 self.base == other.base
53 && self.core == other.core
54 && self.snapshot_id == other.snapshot_id
55 } else {
56 panic!("BatchIcebergScan::eq: comparing non-AlwaysTrue predicates is not supported")
57 }
58 }
59}
60
61impl Eq for BatchIcebergScan {}
62
63impl Hash for BatchIcebergScan {
64 fn hash<H: Hasher>(&self, state: &mut H) {
65 if self.predicate != IcebergPredicate::AlwaysTrue {
66 panic!("BatchIcebergScan::hash: hashing non-AlwaysTrue predicates is not supported")
67 } else {
68 self.base.hash(state);
69 self.core.hash(state);
70 self.snapshot_id.hash(state);
71 }
72 }
73}
74
75impl BatchIcebergScan {
76 pub fn new(
77 core: generic::Source,
78 iceberg_scan_type: IcebergScanType,
79 snapshot_id: Option<i64>,
80 ) -> Self {
81 let base = PlanBase::new_batch_with_core(
82 &core,
83 Distribution::Single,
85 Order::any(),
86 );
87
88 Self {
89 base,
90 core,
91 iceberg_scan_type,
92 predicate: IcebergPredicate::AlwaysTrue,
93 snapshot_id,
94 }
95 }
96
97 pub fn new_count_star_with_batch_iceberg_scan(batch_iceberg_scan: &BatchIcebergScan) -> Self {
98 let mut core = batch_iceberg_scan.core.clone();
99 core.column_catalog = vec![ColumnCatalog::visible(ColumnDesc::named(
100 "count",
101 ColumnId::first_user_column(),
102 DataType::Int64,
103 ))];
104 let base = PlanBase::new_batch_with_core(
105 &core,
106 batch_iceberg_scan.base.distribution().clone(),
107 batch_iceberg_scan.base.order().clone(),
108 );
109 Self {
110 base,
111 core,
112 iceberg_scan_type: IcebergScanType::CountStar,
113 predicate: IcebergPredicate::AlwaysTrue,
114 snapshot_id: batch_iceberg_scan.snapshot_id,
115 }
116 }
117
118 pub fn iceberg_scan_type(&self) -> IcebergScanType {
119 self.iceberg_scan_type
120 }
121
122 pub fn column_names(&self) -> Vec<&str> {
123 self.schema().names_str()
124 }
125
126 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
127 self.core.catalog.clone()
128 }
129
130 pub fn clone_with_dist(&self) -> Self {
131 let base = self
132 .base
133 .clone_with_new_distribution(Distribution::SomeShard);
134 Self {
135 base,
136 core: self.core.clone(),
137 iceberg_scan_type: self.iceberg_scan_type,
138 predicate: self.predicate.clone(),
139 snapshot_id: self.snapshot_id,
140 }
141 }
142
143 pub fn clone_with_predicate(&self, predicate: IcebergPredicate) -> Self {
144 Self {
145 base: self.base.clone(),
146 core: self.core.clone(),
147 iceberg_scan_type: self.iceberg_scan_type,
148 predicate,
149 snapshot_id: self.snapshot_id,
150 }
151 }
152
153 pub fn as_of(&self) -> Option<AsOf> {
154 self.core.as_of.clone()
155 }
156
157 pub fn snapshot_id(&self) -> Option<i64> {
158 self.snapshot_id
159 }
160}
161
// Registers the plan-tree plumbing for this node as a leaf (no plan inputs) in the batch convention.
impl_plan_tree_node_for_leaf! { Batch, BatchIcebergScan }
163
164impl Distill for BatchIcebergScan {
165 fn distill<'a>(&self) -> XmlNode<'a> {
166 let src = Pretty::from(self.source_catalog().unwrap().name.clone());
167 let fields = vec![
168 ("source", src),
169 ("columns", column_names_pretty(self.schema())),
170 ("iceberg_scan_type", Pretty::debug(&self.iceberg_scan_type)),
171 ("predicate", Pretty::from(self.predicate.to_string())),
172 ];
173 childless_record("BatchIcebergScan", fields)
174 }
175}
176
177impl ToLocalBatch for BatchIcebergScan {
178 fn to_local(&self) -> Result<PlanRef> {
179 Ok(self.clone_with_dist().into())
180 }
181}
182
183impl ToDistributedBatch for BatchIcebergScan {
184 fn to_distributed(&self) -> Result<PlanRef> {
185 Ok(self.clone_with_dist().into())
186 }
187}
188
189impl ToBatchPb for BatchIcebergScan {
190 fn to_batch_prost_body(&self) -> NodeBody {
191 let source_catalog = self.source_catalog().unwrap();
192 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
193 NodeBody::IcebergScan(IcebergScanNode {
194 columns: self
195 .core
196 .column_catalog
197 .iter()
198 .map(|c| c.to_protobuf())
199 .collect(),
200 with_properties,
201 split: vec![],
202 secret_refs,
203 iceberg_scan_type: self.iceberg_scan_type as i32,
204 })
205 }
206}
207
// Both traits use their default methods. The node's filter is stored as an Iceberg
// `Predicate`, not as frontend expressions. NOTE(review): confirm the defaults are the intended
// behavior if this node ever gains expression fields.
impl ExprRewritable<Batch> for BatchIcebergScan {}

impl ExprVisitable for BatchIcebergScan {}