risingwave_frontend/optimizer/plan_node/
batch_iceberg_scan.rs1use std::hash::{Hash, Hasher};
16use std::rc::Rc;
17
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_connector::source::iceberg::IcebergFileScanTask;
20use risingwave_pb::batch_plan::IcebergScanNode;
21use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
22use risingwave_pb::batch_plan::plan_node::NodeBody;
23use risingwave_sqlparser::ast::AsOf;
24
25use super::batch::prelude::*;
26use super::utils::{Distill, childless_record, column_names_pretty};
27use super::{
28 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
29 generic,
30};
31use crate::catalog::source_catalog::SourceCatalog;
32use crate::error::Result;
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::property::{Distribution, Order};
35
36#[derive(Debug, Clone, PartialEq)]
37pub struct BatchIcebergScan {
38 pub base: PlanBase<Batch>,
39 pub core: generic::Source,
40 pub task: IcebergFileScanTask,
41}
42
43impl Eq for BatchIcebergScan {}
44
45impl Hash for BatchIcebergScan {
46 fn hash<H: Hasher>(&self, state: &mut H) {
47 self.base.hash(state);
48 self.core.hash(state);
49 self.iceberg_scan_type().hash(state);
50 }
51}
52
53impl BatchIcebergScan {
54 pub fn new(core: generic::Source, task: IcebergFileScanTask) -> Self {
55 let base = PlanBase::new_batch_with_core(
56 &core,
57 Distribution::Single,
59 Order::any(),
60 );
61
62 Self { base, core, task }
63 }
64
65 pub fn iceberg_scan_type(&self) -> IcebergScanType {
66 match &self.task {
67 IcebergFileScanTask::Data(_) => IcebergScanType::DataScan,
68 IcebergFileScanTask::EqualityDelete(_) => IcebergScanType::EqualityDeleteScan,
69 IcebergFileScanTask::PositionDelete(_) => IcebergScanType::PositionDeleteScan,
70 }
71 }
72
73 pub fn predicate(&self) -> Option<String> {
74 let predicate = self.task.predicate()?;
75 Some(predicate.to_string())
76 }
77
78 pub fn column_names(&self) -> Vec<&str> {
79 self.schema().names_str()
80 }
81
82 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
83 self.core.catalog.clone()
84 }
85
86 pub fn clone_with_dist(&self) -> Self {
87 let base = self
88 .base
89 .clone_with_new_distribution(Distribution::SomeShard);
90 Self {
91 base,
92 core: self.core.clone(),
93 task: self.task.clone(),
94 }
95 }
96
97 pub fn as_of(&self) -> Option<AsOf> {
98 self.core.as_of.clone()
99 }
100}
101
102impl_plan_tree_node_for_leaf! { Batch, BatchIcebergScan }
103
104impl Distill for BatchIcebergScan {
105 fn distill<'a>(&self) -> XmlNode<'a> {
106 let src = Pretty::from(self.source_catalog().unwrap().name.clone());
107 let mut fields = vec![
108 ("source", src),
109 ("columns", column_names_pretty(self.schema())),
110 (
111 "iceberg_scan_type",
112 Pretty::from(format!("{:?}", self.iceberg_scan_type())),
113 ),
114 ];
115 if let Some(predicate) = self.predicate() {
116 fields.push(("predicate", Pretty::from(predicate)));
117 }
118 childless_record("BatchIcebergScan", fields)
119 }
120}
121
122impl ToLocalBatch for BatchIcebergScan {
123 fn to_local(&self) -> Result<PlanRef> {
124 Ok(self.clone_with_dist().into())
125 }
126}
127
128impl ToDistributedBatch for BatchIcebergScan {
129 fn to_distributed(&self) -> Result<PlanRef> {
130 Ok(self.clone_with_dist().into())
131 }
132}
133
134impl ToBatchPb for BatchIcebergScan {
135 fn to_batch_prost_body(&self) -> NodeBody {
136 let source_catalog = self.source_catalog().unwrap();
137 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
138 NodeBody::IcebergScan(IcebergScanNode {
139 columns: self
140 .core
141 .column_catalog
142 .iter()
143 .map(|c| c.to_protobuf())
144 .collect(),
145 with_properties,
146 split: vec![],
147 secret_refs,
148 iceberg_scan_type: self.iceberg_scan_type() as i32,
149 })
150 }
151}
152
153impl ExprRewritable<Batch> for BatchIcebergScan {}
154
155impl ExprVisitable for BatchIcebergScan {}