risingwave_frontend/optimizer/plan_node/
logical_cdc_scan.rs1use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::{CdcTableDesc, ColumnDesc};
19
20use super::generic::GenericPlanRef;
21use super::utils::{Distill, childless_record};
22use super::{
23 ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream,
24 generic,
25};
26use crate::catalog::ColumnId;
27use crate::error::Result;
28use crate::expr::{ExprRewriter, ExprVisitor};
29use crate::optimizer::optimizer_context::OptimizerContextRef;
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::plan_node::generic::CdcScanOptions;
32use crate::optimizer::plan_node::{
33 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamCdcTableScan,
34 ToStreamContext,
35};
36use crate::optimizer::property::Order;
37use crate::utils::{ColIndexMapping, Condition};
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct LogicalCdcScan {
42 pub base: PlanBase<Logical>,
43 core: generic::CdcScan,
44}
45
46impl From<generic::CdcScan> for LogicalCdcScan {
47 fn from(core: generic::CdcScan) -> Self {
48 let base = PlanBase::new_logical_with_core(&core);
49 Self { base, core }
50 }
51}
52
53impl From<generic::CdcScan> for PlanRef {
54 fn from(core: generic::CdcScan) -> Self {
55 LogicalCdcScan::from(core).into()
56 }
57}
58
59impl LogicalCdcScan {
60 pub fn create(
61 table_name: String, cdc_table_desc: Rc<CdcTableDesc>,
63 ctx: OptimizerContextRef,
64 options: CdcScanOptions,
65 ) -> Self {
66 generic::CdcScan::new(
67 table_name,
68 (0..cdc_table_desc.columns.len()).collect(),
69 cdc_table_desc,
70 ctx,
71 options,
72 )
73 .into()
74 }
75
76 pub fn table_name(&self) -> &str {
77 &self.core.table_name
78 }
79
80 pub fn cdc_table_desc(&self) -> &CdcTableDesc {
81 self.core.cdc_table_desc.as_ref()
82 }
83
84 pub fn column_descs(&self) -> Vec<ColumnDesc> {
86 self.core.column_descs()
87 }
88
89 pub fn output_column_ids(&self) -> Vec<ColumnId> {
91 self.core.output_column_ids()
92 }
93
94 pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
95 generic::CdcScan::new(
96 self.table_name().to_owned(),
97 output_col_idx,
98 self.core.cdc_table_desc.clone(),
99 self.base.ctx().clone(),
100 self.core.options.clone(),
101 )
102 .into()
103 }
104
105 pub fn output_col_idx(&self) -> &Vec<usize> {
106 &self.core.output_col_idx
107 }
108}
109
110impl_plan_tree_node_for_leaf! {LogicalCdcScan}
111
112impl Distill for LogicalCdcScan {
113 fn distill<'a>(&self) -> XmlNode<'a> {
114 let verbose = self.base.ctx().is_explain_verbose();
115 let mut vec = Vec::with_capacity(5);
116 vec.push(("table", Pretty::from(self.table_name().to_owned())));
117 let key_is_columns = true;
118 let key = if key_is_columns {
119 "columns"
120 } else {
121 "output_columns"
122 };
123 vec.push((key, self.core.columns_pretty(verbose)));
124 if !key_is_columns {
125 vec.push((
126 "required_columns",
127 Pretty::Array(
128 self.output_col_idx()
129 .iter()
130 .map(|i| {
131 let col_name = &self.cdc_table_desc().columns[*i].name;
132 Pretty::from(if verbose {
133 format!("{}.{}", self.table_name(), col_name)
134 } else {
135 col_name.to_string()
136 })
137 })
138 .collect(),
139 ),
140 ));
141 }
142
143 childless_record("LogicalCdcScan", vec)
144 }
145}
146
147impl ColPrunable for LogicalCdcScan {
148 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
149 let output_col_idx: Vec<usize> = required_cols
150 .iter()
151 .map(|i| self.output_col_idx()[*i])
152 .collect();
153 assert!(
154 output_col_idx
155 .iter()
156 .all(|i| self.output_col_idx().contains(i))
157 );
158
159 self.clone_with_output_indices(output_col_idx).into()
160 }
161}
162
163impl ExprRewritable for LogicalCdcScan {
164 fn has_rewritable_expr(&self) -> bool {
165 true
166 }
167
168 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
169 let core = self.core.clone();
170 core.rewrite_exprs(r);
171 Self {
172 base: self.base.clone_with_new_plan_id(),
173 core,
174 }
175 .into()
176 }
177}
178
179impl ExprVisitable for LogicalCdcScan {
180 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
181 self.core.visit_exprs(v);
182 }
183}
184
185impl PredicatePushdown for LogicalCdcScan {
186 fn predicate_pushdown(
187 &self,
188 _predicate: Condition,
189 _ctx: &mut PredicatePushdownContext,
190 ) -> PlanRef {
191 self.clone().into()
192 }
193}
194
195impl ToBatch for LogicalCdcScan {
196 fn to_batch(&self) -> Result<PlanRef> {
197 unreachable!()
198 }
199
200 fn to_batch_with_order_required(&self, _required_order: &Order) -> Result<PlanRef> {
201 unreachable!()
202 }
203}
204
205impl ToStream for LogicalCdcScan {
206 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
207 Ok(StreamCdcTableScan::new(self.core.clone()).into())
208 }
209
210 fn logical_rewrite_for_stream(
211 &self,
212 _ctx: &mut RewriteStreamContext,
213 ) -> Result<(PlanRef, ColIndexMapping)> {
214 Ok((
215 self.clone().into(),
216 ColIndexMapping::identity(self.schema().len()),
217 ))
218 }
219}