risingwave_frontend/optimizer/plan_node/
batch_iceberg_scan.rs1use std::hash::{Hash, Hasher};
16use std::rc::Rc;
17
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_connector::source::iceberg::IcebergFileScanTask;
20use risingwave_pb::batch_plan::IcebergScanNode;
21use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
22use risingwave_pb::batch_plan::plan_node::NodeBody;
23use risingwave_sqlparser::ast::AsOf;
24
25use super::batch::prelude::*;
26use super::utils::{Distill, childless_record, column_names_pretty};
27use super::{
28 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
29 generic,
30};
31use crate::catalog::source_catalog::SourceCatalog;
32use crate::error::Result;
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::property::{Distribution, Order};
35
36#[derive(Debug, Clone, PartialEq)]
37pub struct BatchIcebergScan {
38 pub base: PlanBase<Batch>,
39 pub core: generic::Source,
40 pub task: IcebergFileScanTask,
41 limit: Option<u64>,
42}
43
44impl Eq for BatchIcebergScan {}
45
46impl Hash for BatchIcebergScan {
47 fn hash<H: Hasher>(&self, state: &mut H) {
48 self.base.hash(state);
49 self.core.hash(state);
50 self.iceberg_scan_type().hash(state);
51 self.limit.hash(state);
52 }
53}
54
55impl BatchIcebergScan {
56 pub fn new(core: generic::Source, task: IcebergFileScanTask) -> Self {
57 let base = PlanBase::new_batch_with_core(
58 &core,
59 Distribution::Single,
61 Order::any(),
62 );
63
64 Self {
65 base,
66 core,
67 task,
68 limit: None,
69 }
70 }
71
72 pub fn clone_with_limit(&self, limit: Option<u64>) -> Self {
73 Self {
74 base: self.base.clone(),
75 core: self.core.clone(),
76 task: self.task.clone(),
77 limit,
78 }
79 }
80
81 pub fn limit(&self) -> Option<u64> {
82 self.limit
83 }
84
85 pub fn iceberg_scan_type(&self) -> IcebergScanType {
86 match &self.task {
87 IcebergFileScanTask::Data(_) => IcebergScanType::DataScan,
88 IcebergFileScanTask::EqualityDelete(_) => IcebergScanType::EqualityDeleteScan,
89 IcebergFileScanTask::PositionDelete(_) => IcebergScanType::PositionDeleteScan,
90 }
91 }
92
93 pub fn predicate(&self) -> Option<String> {
94 let predicate = self.task.predicate()?;
95 Some(predicate.to_string())
96 }
97
98 pub fn column_names(&self) -> Vec<&str> {
99 self.schema().names_str()
100 }
101
102 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
103 self.core.catalog.clone()
104 }
105
106 pub fn clone_with_dist(&self) -> Self {
107 let base = self
108 .base
109 .clone_with_new_distribution(Distribution::SomeShard);
110 Self {
111 base,
112 core: self.core.clone(),
113 task: self.task.clone(),
114 limit: self.limit,
115 }
116 }
117
118 pub fn as_of(&self) -> Option<AsOf> {
119 self.core.as_of.clone()
120 }
121}
122
123impl_plan_tree_node_for_leaf! { Batch, BatchIcebergScan }
124
125impl Distill for BatchIcebergScan {
126 fn distill<'a>(&self) -> XmlNode<'a> {
127 let src = Pretty::from(self.source_catalog().unwrap().name.clone());
128 let mut fields = vec![
129 ("source", src),
130 ("columns", column_names_pretty(self.schema())),
131 (
132 "iceberg_scan_type",
133 Pretty::from(format!("{:?}", self.iceberg_scan_type())),
134 ),
135 ];
136 if let Some(predicate) = self.predicate() {
137 fields.push(("predicate", Pretty::from(predicate)));
138 }
139 if let Some(limit) = self.limit {
140 fields.push(("limit", Pretty::debug(&limit)));
141 }
142 childless_record("BatchIcebergScan", fields)
143 }
144}
145
146impl ToLocalBatch for BatchIcebergScan {
147 fn to_local(&self) -> Result<PlanRef> {
148 Ok(self.clone_with_dist().into())
149 }
150}
151
152impl ToDistributedBatch for BatchIcebergScan {
153 fn to_distributed(&self) -> Result<PlanRef> {
154 Ok(self.clone_with_dist().into())
155 }
156}
157
158impl ToBatchPb for BatchIcebergScan {
159 fn to_batch_prost_body(&self) -> NodeBody {
160 let source_catalog = self.source_catalog().unwrap();
161 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
162 NodeBody::IcebergScan(IcebergScanNode {
163 columns: self
164 .core
165 .column_catalog
166 .iter()
167 .map(|c| c.to_protobuf())
168 .collect(),
169 with_properties,
170 split: vec![],
171 secret_refs,
172 iceberg_scan_type: self.iceberg_scan_type() as i32,
173 })
174 }
175}
176
177impl ExprRewritable<Batch> for BatchIcebergScan {}
178
179impl ExprVisitable for BatchIcebergScan {}