risingwave_frontend/optimizer/plan_node/
logical_iceberg_intermediate_scan.rs1use educe::Educe;
16use iceberg::expr::Predicate;
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
19
20use super::generic::GenericPlanRef;
21use super::utils::{Distill, childless_record};
22use super::{
23 ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, PredicatePushdown,
24 ToBatch, ToStream, generic,
25};
26use crate::catalog::source_catalog::SourceCatalog;
27use crate::error::Result;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::utils::column_names_pretty;
30use crate::optimizer::plan_node::{
31 ColumnPruningContext, LogicalFilter, LogicalProject, LogicalSource, PredicatePushdownContext,
32 RewriteStreamContext, ToStreamContext,
33};
34use crate::utils::{ColIndexMapping, Condition, to_iceberg_predicate};
35
/// Logical plan node representing a scan over an Iceberg source that has not yet been
/// lowered to its final scan form.
///
/// The node accumulates a pushed-down Iceberg [`Predicate`] and the list of column names
/// to read. Its `ToBatch` implementation refuses direct conversion, so it has to be
/// rewritten into another node before batch planning.
#[derive(Debug, Clone, PartialEq, Educe)]
#[educe(Hash)]
pub struct LogicalIcebergIntermediateScan {
    /// Common logical plan-node state derived from `core`.
    pub base: PlanBase<Logical>,
    /// Generic source description (catalog, column catalog, row id index).
    pub core: generic::Source,
    /// Iceberg predicate pushed into the scan so far.
    ///
    /// Excluded from the generated `Hash` impl but still compared by the derived `PartialEq`.
    #[educe(Hash(ignore))]
    pub predicate: Predicate,
    /// Names of the columns this scan outputs, index-aligned with `core.column_catalog`.
    pub output_columns: Vec<String>,
    /// Time-travel information carried alongside the scan.
    /// NOTE(review): presumably selects the snapshot to read — confirm against the connector.
    pub time_travel_info: IcebergTimeTravelInfo,
}

// `Eq` is asserted by hand on top of the derived `PartialEq`.
// NOTE(review): presumably because a field type does not implement `Eq` itself — confirm.
impl Eq for LogicalIcebergIntermediateScan {}
60
61impl LogicalIcebergIntermediateScan {
62 pub fn new(logical_source: &LogicalSource, time_travel_info: IcebergTimeTravelInfo) -> Self {
63 assert!(logical_source.core.is_iceberg_connector());
64
65 let core = logical_source.core.clone();
66 let base = PlanBase::new_logical_with_core(&core);
67 let output_column = core
68 .column_catalog
69 .iter()
70 .map(|c| c.column_desc.name.clone())
71 .collect();
72
73 assert!(logical_source.output_exprs.is_none());
74
75 LogicalIcebergIntermediateScan {
76 base,
77 core,
78 predicate: Predicate::AlwaysTrue,
79 output_columns: output_column,
80 time_travel_info,
81 }
82 }
83
84 pub fn source_catalog(&self) -> Option<&SourceCatalog> {
85 self.core.catalog.as_deref()
86 }
87
88 pub fn clone_with_predicate(&self, predicate: Predicate) -> Self {
89 let new_predicate = self.predicate.clone().and(predicate);
90 LogicalIcebergIntermediateScan {
91 predicate: new_predicate,
92 ..self.clone()
93 }
94 }
95
96 pub fn clone_with_required_cols(&self, required_cols: &[usize]) -> Self {
97 assert!(!required_cols.is_empty());
98
99 let mut core = self.core.clone();
100 let mut has_row_id = false;
101 core.column_catalog = required_cols
102 .iter()
103 .map(|idx| {
104 if Some(*idx) == core.row_id_index {
105 has_row_id = true;
106 }
107 core.column_catalog[*idx].clone()
108 })
109 .collect();
110 if !has_row_id {
111 core.row_id_index = None;
112 }
113 let base = PlanBase::new_logical_with_core(&core);
114
115 let new_output_column = required_cols
116 .iter()
117 .map(|&i| self.output_columns[i].clone())
118 .collect();
119
120 LogicalIcebergIntermediateScan {
121 base,
122 core,
123 predicate: self.predicate.clone(),
124 output_columns: new_output_column,
125 time_travel_info: self.time_travel_info.clone(),
126 }
127 }
128}
129
// Macro defined elsewhere in the crate.
// NOTE(review): judging by its name, it generates the plan-tree-node impls for a node
// without inputs — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { Logical, LogicalIcebergIntermediateScan }
131
132impl Distill for LogicalIcebergIntermediateScan {
133 fn distill<'a>(&self) -> XmlNode<'a> {
134 let verbose = self.base.ctx().is_explain_verbose();
135 let mut fields = Vec::with_capacity(if verbose { 4 } else { 2 });
136
137 if let Some(catalog) = self.source_catalog() {
138 fields.push(("source", Pretty::from(catalog.name.clone())));
139 } else {
140 fields.push(("source", Pretty::from("unknown")));
141 }
142 fields.push(("columns", column_names_pretty(self.schema())));
143
144 if verbose {
145 fields.push(("predicate", Pretty::debug(&self.predicate)));
146 fields.push(("output_columns", Pretty::debug(&self.output_columns)));
147 fields.push(("time_travel_info", Pretty::debug(&self.time_travel_info)));
148 }
149
150 childless_record("LogicalIcebergIntermediateScan", fields)
151 }
152}
153
154impl ColPrunable for LogicalIcebergIntermediateScan {
155 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
156 if required_cols.is_empty() {
157 LogicalProject::new(self.clone_with_required_cols(&[0]).into(), vec![]).into()
159 } else {
160 self.clone_with_required_cols(required_cols).into()
161 }
162 }
163}
164
// Empty impl: every `ExprRewritable` method keeps the trait's default behaviour.
impl ExprRewritable<Logical> for LogicalIcebergIntermediateScan {}
166
// Empty impl: every `ExprVisitable` method keeps the trait's default behaviour.
impl ExprVisitable for LogicalIcebergIntermediateScan {}
168
169impl PredicatePushdown for LogicalIcebergIntermediateScan {
170 fn predicate_pushdown(
171 &self,
172 predicate: Condition,
173 _ctx: &mut PredicatePushdownContext,
174 ) -> PlanRef {
175 let (iceberg_predicate, upper_conditions) =
176 to_iceberg_predicate(predicate, self.schema().fields());
177 let plan = self.clone_with_predicate(iceberg_predicate).into();
178 if upper_conditions.always_true() {
179 plan
180 } else {
181 LogicalFilter::create(plan, upper_conditions)
182 }
183 }
184}
185
186impl ToBatch for LogicalIcebergIntermediateScan {
187 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
188 Err(crate::error::ErrorCode::InternalError(
191 "LogicalIcebergIntermediateScan should be converted to LogicalIcebergScan before to_batch".to_owned()
192 )
193 .into())
194 }
195}
196
// This node is only meant for batch queries, so both streaming entry points are treated
// as impossible to reach and panic if they are ever called.
impl ToStream for LogicalIcebergIntermediateScan {
    /// Never expected to be called; panics via `unreachable!`.
    fn to_stream(
        &self,
        _ctx: &mut ToStreamContext,
    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
        unreachable!("LogicalIcebergIntermediateScan is only for batch queries")
    }

    /// Never expected to be called; panics via `unreachable!`.
    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        unreachable!("LogicalIcebergIntermediateScan is only for batch queries")
    }
}