// risingwave_frontend/optimizer/plan_node/logical_iceberg_scan.rs
use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
19
20use super::generic::GenericPlanRef;
21use super::utils::{Distill, childless_record};
22use super::{
23 ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject, PlanBase,
24 PredicatePushdown, ToBatch, ToStream, generic,
25};
26use crate::catalog::source_catalog::SourceCatalog;
27use crate::error::Result;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::utils::column_names_pretty;
30use crate::optimizer::plan_node::{
31 BatchIcebergScan, ColumnPruningContext, LogicalFilter, LogicalSource, PredicatePushdownContext,
32 RewriteStreamContext, ToStreamContext,
33};
34use crate::utils::{ColIndexMapping, Condition};
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
38pub struct LogicalIcebergScan {
39 pub base: PlanBase<Logical>,
40 pub core: generic::Source,
41 iceberg_scan_type: IcebergScanType,
42 snapshot_id: Option<i64>,
43}
44
45impl LogicalIcebergScan {
46 pub fn new(
47 logical_source: &LogicalSource,
48 iceberg_scan_type: IcebergScanType,
49 snapshot_id: Option<i64>,
50 ) -> Self {
51 assert!(logical_source.core.is_iceberg_connector());
52
53 let core = logical_source.core.clone();
54 let base = PlanBase::new_logical_with_core(&core);
55
56 assert!(logical_source.output_exprs.is_none());
57
58 LogicalIcebergScan {
59 base,
60 core,
61 iceberg_scan_type,
62 snapshot_id,
63 }
64 }
65
66 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
67 self.core.catalog.clone()
68 }
69
70 pub fn clone_with_required_cols(&self, required_cols: &[usize]) -> Self {
71 assert!(!required_cols.is_empty());
72 let mut core = self.core.clone();
73 let mut has_row_id = false;
74 core.column_catalog = required_cols
75 .iter()
76 .map(|idx| {
77 if Some(*idx) == core.row_id_index {
78 has_row_id = true;
79 }
80 core.column_catalog[*idx].clone()
81 })
82 .collect();
83 if !has_row_id {
84 core.row_id_index = None;
85 }
86 let base = PlanBase::new_logical_with_core(&core);
87
88 LogicalIcebergScan {
89 base,
90 core,
91 iceberg_scan_type: self.iceberg_scan_type,
92 snapshot_id: self.snapshot_id,
93 }
94 }
95}
96
97impl_plan_tree_node_for_leaf! { Logical, LogicalIcebergScan}
98impl Distill for LogicalIcebergScan {
99 fn distill<'a>(&self) -> XmlNode<'a> {
100 let fields = if let Some(catalog) = self.source_catalog() {
101 let src = Pretty::from(catalog.name.clone());
102 vec![
103 ("source", src),
104 ("columns", column_names_pretty(self.schema())),
105 ("iceberg_scan_type", Pretty::debug(&self.iceberg_scan_type)),
106 ]
107 } else {
108 vec![]
109 };
110 childless_record("LogicalIcebergScan", fields)
111 }
112}
113
114impl ColPrunable for LogicalIcebergScan {
115 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
116 if required_cols.is_empty() {
117 let mapping =
118 ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
119 LogicalProject::with_mapping(self.clone_with_required_cols(&[0]).into(), mapping).into()
121 } else {
122 self.clone_with_required_cols(required_cols).into()
123 }
124 }
125}
126
127impl ExprRewritable<Logical> for LogicalIcebergScan {}
128
129impl ExprVisitable for LogicalIcebergScan {}
130
131impl PredicatePushdown for LogicalIcebergScan {
132 fn predicate_pushdown(
133 &self,
134 predicate: Condition,
135 _ctx: &mut PredicatePushdownContext,
136 ) -> PlanRef {
137 LogicalFilter::create(self.clone().into(), predicate)
139 }
140}
141
142impl ToBatch for LogicalIcebergScan {
143 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
144 let plan =
145 BatchIcebergScan::new(self.core.clone(), self.iceberg_scan_type, self.snapshot_id)
146 .into();
147 Ok(plan)
148 }
149}
150
151impl ToStream for LogicalIcebergScan {
152 fn to_stream(
153 &self,
154 _ctx: &mut ToStreamContext,
155 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
156 unreachable!()
157 }
158
159 fn logical_rewrite_for_stream(
160 &self,
161 _ctx: &mut RewriteStreamContext,
162 ) -> Result<(PlanRef, ColIndexMapping)> {
163 unreachable!()
164 }
165}