// risingwave_frontend/optimizer/plan_node/logical_iceberg_intermediate_scan.rs
use std::collections::HashMap;
16
17use educe::Educe;
18use iceberg::expr::Predicate;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::types::DataType;
22use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
23
24use super::generic::GenericPlanRef;
25use super::utils::{Distill, childless_record};
26use super::{
27 ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, PredicatePushdown,
28 ToBatch, ToStream, generic,
29};
30use crate::catalog::source_catalog::SourceCatalog;
31use crate::error::Result;
32use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
33use crate::optimizer::plan_node::utils::column_names_pretty;
34use crate::optimizer::plan_node::{
35 ColumnPruningContext, LogicalFilter, LogicalProject, LogicalSource, PredicatePushdownContext,
36 RewriteStreamContext, ToStreamContext,
37};
38use crate::utils::{
39 ColIndexMapping, Condition, ExtractIcebergPredicateResult, extract_iceberg_predicate,
40};
41
/// Bookkeeping for predicates pushed into an iceberg scan that must also be
/// re-expressed in terms of the table's column indices (presumably for a
/// hummock-side re-evaluation — TODO confirm against the consumer of this info).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HummockRewriteInfo {
    /// Conjunction of all predicates accepted so far, already rewritten
    /// through `output_column_mapping`. Starts as the `true` condition.
    pub origin_condition: Condition,
    /// Maps this scan's (source) output column indices to table column indices;
    /// initialized from `source_to_table_mapping` in `new`.
    pub output_column_mapping: ColIndexMapping,
}
53
54impl HummockRewriteInfo {
55 pub fn new(source_to_table_mapping: ColIndexMapping) -> Self {
58 Self {
59 origin_condition: Condition::true_cond(),
60 output_column_mapping: source_to_table_mapping,
61 }
62 }
63
64 pub fn add_predicate(&self, extracted_condition: Condition) -> Self {
66 let mut mapping = self.output_column_mapping.clone();
67 let remapped = extracted_condition.rewrite_expr(&mut mapping);
68 Self {
69 origin_condition: self.origin_condition.clone().and(remapped),
70 output_column_mapping: mapping,
71 }
72 }
73
74 pub fn prune_columns(&self, required_cols: &[usize]) -> Self {
77 let map = required_cols
78 .iter()
79 .map(|&idx| Some(self.output_column_mapping.map(idx)))
80 .collect();
81 Self {
82 origin_condition: self.origin_condition.clone(),
83 output_column_mapping: ColIndexMapping::new(
84 map,
85 self.output_column_mapping.target_size(),
86 ),
87 }
88 }
89}
90
/// A logical iceberg scan in an intermediate state: predicate pushdown and
/// column pruning still accumulate onto it. It must be converted to
/// `LogicalIcebergScan` before batch lowering (see `ToBatch`), and it never
/// appears in streaming plans (see `ToStream`).
#[derive(Debug, Clone, PartialEq, Educe)]
#[educe(Hash)]
pub struct LogicalIcebergIntermediateScan {
    pub base: PlanBase<Logical>,
    pub core: generic::Source,
    // Excluded from hashing — presumably `Predicate` does not implement
    // `Hash`; TODO confirm.
    #[educe(Hash(ignore))]
    pub iceberg_predicate: Predicate,
    pub time_travel_info: IcebergTimeTravelInfo,
    // Per-column data type overrides, keyed by column name; applied to the
    // column catalog in `new`. `HashMap` itself has no `Hash` impl.
    #[educe(Hash(ignore))]
    pub table_column_type_mapping: HashMap<String, DataType>,
    // See `HummockRewriteInfo` for what this tracks.
    #[educe(Hash(ignore))]
    pub hummock_rewrite: HummockRewriteInfo,
}
122
// `Eq` is implemented manually rather than derived — likely because some field
// type (e.g. `Predicate`) only provides `PartialEq`; TODO confirm.
impl Eq for LogicalIcebergIntermediateScan {}
124
125impl LogicalIcebergIntermediateScan {
126 pub fn new(
127 logical_source: &LogicalSource,
128 time_travel_info: IcebergTimeTravelInfo,
129 table_column_type_mapping: HashMap<String, DataType>,
130 source_to_table_mapping: ColIndexMapping,
135 ) -> Self {
136 assert!(logical_source.core.is_iceberg_connector());
137
138 let mut core = logical_source.core.clone();
139 for col in &mut core.column_catalog {
142 if let Some(target_type) = table_column_type_mapping.get(col.name()) {
143 col.column_desc.data_type = target_type.clone();
144 }
145 }
146 let hummock_rewrite = HummockRewriteInfo::new(source_to_table_mapping);
147 let base = PlanBase::new_logical_with_core(&core);
148 assert!(logical_source.output_exprs.is_none());
149 LogicalIcebergIntermediateScan {
150 base,
151 core,
152 iceberg_predicate: Predicate::AlwaysTrue,
153 time_travel_info,
154 table_column_type_mapping,
155 hummock_rewrite,
156 }
157 }
158
159 pub fn source_catalog(&self) -> Option<&SourceCatalog> {
160 self.core.catalog.as_deref()
161 }
162
163 pub fn output_columns(&self) -> impl ExactSizeIterator<Item = &str> {
164 self.core.column_catalog.iter().map(|c| c.name.as_str())
165 }
166
167 pub fn add_predicate(
168 &self,
169 iceberg_predicate: Predicate,
170 extracted_condition: Condition,
171 ) -> Self {
172 LogicalIcebergIntermediateScan {
173 iceberg_predicate: self.iceberg_predicate.clone().and(iceberg_predicate),
174 hummock_rewrite: self.hummock_rewrite.add_predicate(extracted_condition),
175 ..self.clone()
176 }
177 }
178
179 pub fn has_type_mapping(&self) -> bool {
181 !self.table_column_type_mapping.is_empty()
182 }
183
184 pub fn clone_with_required_cols(&self, required_cols: &[usize]) -> Self {
185 assert!(!required_cols.is_empty());
186
187 let mut core = self.core.clone();
188 core.column_catalog = required_cols
189 .iter()
190 .map(|idx| core.column_catalog[*idx].clone())
191 .collect();
192 core.row_id_index = required_cols
193 .iter()
194 .position(|idx| Some(*idx) == self.core.row_id_index);
195
196 let base = PlanBase::new_logical_with_core(&core);
197
198 LogicalIcebergIntermediateScan {
199 base,
200 core,
201 iceberg_predicate: self.iceberg_predicate.clone(),
202 time_travel_info: self.time_travel_info.clone(),
203 table_column_type_mapping: self.table_column_type_mapping.clone(),
204 hummock_rewrite: self.hummock_rewrite.prune_columns(required_cols),
205 }
206 }
207}
208
// A scan is a leaf of the plan tree; this macro presumably supplies the
// zero-input plan-tree-node implementation.
impl_plan_tree_node_for_leaf! { Logical, LogicalIcebergIntermediateScan }
210
211impl Distill for LogicalIcebergIntermediateScan {
212 fn distill<'a>(&self) -> XmlNode<'a> {
213 let verbose = self.base.ctx().is_explain_verbose();
214 let mut fields = Vec::with_capacity(if verbose { 4 } else { 2 });
215
216 if let Some(catalog) = self.source_catalog() {
217 fields.push(("source", Pretty::from(catalog.name.clone())));
218 } else {
219 fields.push(("source", Pretty::from("unknown")));
220 }
221 fields.push(("columns", column_names_pretty(self.schema())));
222
223 if verbose {
224 fields.push(("predicate", Pretty::debug(&self.iceberg_predicate)));
225 fields.push((
226 "output_column",
227 Pretty::debug(&self.output_columns().collect_vec()),
228 ));
229 fields.push(("time_travel_info", Pretty::debug(&self.time_travel_info)));
230 }
231
232 childless_record("LogicalIcebergIntermediateScan", fields)
233 }
234}
235
236impl ColPrunable for LogicalIcebergIntermediateScan {
237 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
238 if required_cols.is_empty() {
239 LogicalProject::new(self.clone_with_required_cols(&[0]).into(), vec![]).into()
241 } else {
242 self.clone_with_required_cols(required_cols).into()
243 }
244 }
245}
246
// Default (no-op) impls. NOTE(review): `iceberg_predicate` and the hummock
// condition are not rewritten/visited here — presumably intentional because
// this node is replaced before those passes matter; TODO confirm.
impl ExprRewritable<Logical> for LogicalIcebergIntermediateScan {}

impl ExprVisitable for LogicalIcebergIntermediateScan {}
250
251impl PredicatePushdown for LogicalIcebergIntermediateScan {
252 fn predicate_pushdown(
253 &self,
254 predicate: Condition,
255 _ctx: &mut PredicatePushdownContext,
256 ) -> PlanRef {
257 let ExtractIcebergPredicateResult {
258 iceberg_predicate,
259 extracted_condition,
260 remaining_condition,
261 } = extract_iceberg_predicate(predicate, self.schema().fields());
262 let plan = self
263 .add_predicate(iceberg_predicate, extracted_condition)
264 .into();
265 if remaining_condition.always_true() {
266 plan
267 } else {
268 LogicalFilter::create(plan, remaining_condition)
269 }
270 }
271}
272
273impl ToBatch for LogicalIcebergIntermediateScan {
274 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
275 Err(crate::error::ErrorCode::InternalError(
278 "LogicalIcebergIntermediateScan should be converted to LogicalIcebergScan before to_batch".to_owned()
279 )
280 .into())
281 }
282}
283
impl ToStream for LogicalIcebergIntermediateScan {
    // This node only ever appears in batch plans, so both streaming
    // conversion paths are unreachable by construction.
    fn to_stream(
        &self,
        _ctx: &mut ToStreamContext,
    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
        unreachable!("LogicalIcebergIntermediateScan is only for batch queries")
    }

    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        unreachable!("LogicalIcebergIntermediateScan is only for batch queries")
    }
}