risingwave_frontend/optimizer/plan_node/
logical_iceberg_scan.rs1use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
19
20use super::generic::GenericPlanRef;
21use super::utils::{Distill, childless_record};
22use super::{
23 ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef, PredicatePushdown,
24 ToBatch, ToStream, generic,
25};
26use crate::catalog::source_catalog::SourceCatalog;
27use crate::error::Result;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::utils::column_names_pretty;
30use crate::optimizer::plan_node::{
31 BatchIcebergScan, ColumnPruningContext, LogicalFilter, LogicalSource, PredicatePushdownContext,
32 RewriteStreamContext, ToStreamContext,
33};
34use crate::utils::{ColIndexMapping, Condition};
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
38pub struct LogicalIcebergScan {
39 pub base: PlanBase<Logical>,
40 pub core: generic::Source,
41 iceberg_scan_type: IcebergScanType,
42}
43
44impl LogicalIcebergScan {
45 pub fn new(logical_source: &LogicalSource, iceberg_scan_type: IcebergScanType) -> Self {
46 assert!(logical_source.core.is_iceberg_connector());
47
48 let core = logical_source.core.clone();
49 let base = PlanBase::new_logical_with_core(&core);
50
51 assert!(logical_source.output_exprs.is_none());
52
53 LogicalIcebergScan {
54 base,
55 core,
56 iceberg_scan_type,
57 }
58 }
59
60 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
61 self.core.catalog.clone()
62 }
63
64 pub fn clone_with_required_cols(&self, required_cols: &[usize]) -> Self {
65 assert!(!required_cols.is_empty());
66 let mut core = self.core.clone();
67 let mut has_row_id = false;
68 core.column_catalog = required_cols
69 .iter()
70 .map(|idx| {
71 if Some(*idx) == core.row_id_index {
72 has_row_id = true;
73 }
74 core.column_catalog[*idx].clone()
75 })
76 .collect();
77 if !has_row_id {
78 core.row_id_index = None;
79 }
80 let base = PlanBase::new_logical_with_core(&core);
81
82 LogicalIcebergScan {
83 base,
84 core,
85 iceberg_scan_type: self.iceberg_scan_type,
86 }
87 }
88}
89
90impl_plan_tree_node_for_leaf! {LogicalIcebergScan}
91impl Distill for LogicalIcebergScan {
92 fn distill<'a>(&self) -> XmlNode<'a> {
93 let fields = if let Some(catalog) = self.source_catalog() {
94 let src = Pretty::from(catalog.name.clone());
95 vec![
96 ("source", src),
97 ("columns", column_names_pretty(self.schema())),
98 ("iceberg_scan_type", Pretty::debug(&self.iceberg_scan_type)),
99 ]
100 } else {
101 vec![]
102 };
103 childless_record("LogicalIcebergScan", fields)
104 }
105}
106
107impl ColPrunable for LogicalIcebergScan {
108 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
109 if required_cols.is_empty() {
110 let mapping =
111 ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
112 LogicalProject::with_mapping(self.clone_with_required_cols(&[0]).into(), mapping).into()
114 } else {
115 self.clone_with_required_cols(required_cols).into()
116 }
117 }
118}
119
120impl ExprRewritable for LogicalIcebergScan {}
121
122impl ExprVisitable for LogicalIcebergScan {}
123
124impl PredicatePushdown for LogicalIcebergScan {
125 fn predicate_pushdown(
126 &self,
127 predicate: Condition,
128 _ctx: &mut PredicatePushdownContext,
129 ) -> PlanRef {
130 LogicalFilter::create(self.clone().into(), predicate)
132 }
133}
134
135impl ToBatch for LogicalIcebergScan {
136 fn to_batch(&self) -> Result<PlanRef> {
137 let plan: PlanRef = BatchIcebergScan::new(self.core.clone(), self.iceberg_scan_type).into();
138 Ok(plan)
139 }
140}
141
142impl ToStream for LogicalIcebergScan {
143 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
144 unreachable!()
145 }
146
147 fn logical_rewrite_for_stream(
148 &self,
149 _ctx: &mut RewriteStreamContext,
150 ) -> Result<(PlanRef, ColIndexMapping)> {
151 unreachable!()
152 }
153}