// risingwave_frontend/optimizer/plan_node/logical_iceberg_scan.rs
use std::hash::{Hash, Hasher};
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_connector::source::iceberg::IcebergFileScanTask;
19use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
20
21use super::generic::GenericPlanRef;
22use super::utils::{Distill, childless_record};
23use super::{
24 ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, PredicatePushdown,
25 ToBatch, ToStream, generic,
26};
27use crate::catalog::source_catalog::SourceCatalog;
28use crate::error::Result;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::column_names_pretty;
31use crate::optimizer::plan_node::{
32 BatchIcebergScan, ColumnPruningContext, LogicalFilter, PredicatePushdownContext,
33 RewriteStreamContext, ToStreamContext,
34};
35use crate::utils::{ColIndexMapping, Condition};
36
/// Logical plan node for an Iceberg scan.
///
/// Pairs a generic source description (`core`) with an Iceberg file scan task
/// (`task`). The variant of `task` decides which `IcebergScanType` the node
/// reports through `iceberg_scan_type`.
#[derive(Debug, Clone, PartialEq)]
pub struct LogicalIcebergScan {
    /// Logical plan base; `LogicalIcebergScan::new` derives it from `core`.
    pub base: PlanBase<Logical>,
    /// Generic source definition; also carries the optional source catalog.
    pub core: generic::Source,
    /// File scan task: a data, equality-delete or position-delete scan.
    pub task: IcebergFileScanTask,
}
51
// `PartialEq` is derived on the struct; `Eq` is added here as a hand-written marker impl.
impl Eq for LogicalIcebergScan {}
53
54impl Hash for LogicalIcebergScan {
55 fn hash<H: Hasher>(&self, state: &mut H) {
56 self.base.hash(state);
57 self.core.hash(state);
58 self.iceberg_scan_type().hash(state);
59 }
60}
61
62impl LogicalIcebergScan {
63 pub fn new(core: generic::Source, task: IcebergFileScanTask) -> Self {
64 let base = PlanBase::new_logical_with_core(&core);
65 LogicalIcebergScan { base, core, task }
66 }
67
68 pub fn source_catalog(&self) -> Option<&SourceCatalog> {
69 self.core.catalog.as_deref()
70 }
71
72 pub fn iceberg_scan_type(&self) -> IcebergScanType {
73 match &self.task {
74 IcebergFileScanTask::Data(_) => IcebergScanType::DataScan,
75 IcebergFileScanTask::EqualityDelete(_) => IcebergScanType::EqualityDeleteScan,
76 IcebergFileScanTask::PositionDelete(_) => IcebergScanType::PositionDeleteScan,
77 }
78 }
79}
80
// Project macro, invoked with the `Logical` convention and this node type.
// NOTE(review): presumably generates the plan-tree-node impls for a node without
// inputs — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { Logical, LogicalIcebergScan}
82impl Distill for LogicalIcebergScan {
83 fn distill<'a>(&self) -> XmlNode<'a> {
84 let fields = if let Some(catalog) = self.source_catalog() {
85 let src = Pretty::from(catalog.name.clone());
86 vec![
87 ("source", src),
88 ("columns", column_names_pretty(self.schema())),
89 (
90 "iceberg_scan_type",
91 Pretty::debug(&self.iceberg_scan_type()),
92 ),
93 ]
94 } else {
95 vec![]
96 };
97 childless_record("LogicalIcebergScan", fields)
98 }
99}
100
101impl ColPrunable for LogicalIcebergScan {
102 fn prune_col(&self, _: &[usize], _: &mut ColumnPruningContext) -> PlanRef {
103 unreachable!()
105 }
106}
107
// Empty impl: nothing is overridden, so the trait's provided methods are used as-is.
impl ExprRewritable<Logical> for LogicalIcebergScan {}
109
// Empty impl: nothing is overridden, so the trait's provided methods are used as-is.
impl ExprVisitable for LogicalIcebergScan {}
111
112impl PredicatePushdown for LogicalIcebergScan {
113 fn predicate_pushdown(
114 &self,
115 predicate: Condition,
116 _ctx: &mut PredicatePushdownContext,
117 ) -> PlanRef {
118 LogicalFilter::create(self.clone().into(), predicate)
120 }
121}
122
123impl ToBatch for LogicalIcebergScan {
124 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
125 let plan = BatchIcebergScan::new(self.core.clone(), self.task.clone()).into();
126 Ok(plan)
127 }
128}
129
130impl ToStream for LogicalIcebergScan {
131 fn to_stream(
132 &self,
133 _ctx: &mut ToStreamContext,
134 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
135 unreachable!()
136 }
137
138 fn logical_rewrite_for_stream(
139 &self,
140 _ctx: &mut RewriteStreamContext,
141 ) -> Result<(PlanRef, ColIndexMapping)> {
142 unreachable!()
143 }
144}