risingwave_frontend/optimizer/plan_node/
logical_cdc_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::{CdcTableDesc, ColumnDesc};
19
20use super::generic::GenericPlanRef;
21use super::utils::{Distill, childless_record};
22use super::{
23    ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream,
24    generic,
25};
26use crate::catalog::ColumnId;
27use crate::error::Result;
28use crate::expr::{ExprRewriter, ExprVisitor};
29use crate::optimizer::optimizer_context::OptimizerContextRef;
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::plan_node::generic::CdcScanOptions;
32use crate::optimizer::plan_node::{
33    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamCdcTableScan,
34    ToStreamContext,
35};
36use crate::optimizer::property::Order;
37use crate::utils::{ColIndexMapping, Condition};
38
/// `LogicalCdcScan` reads rows of a table from an external upstream database
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalCdcScan {
    /// Logical plan base; derived from `core` by `PlanBase::new_logical_with_core` when the
    /// node is built through `From<generic::CdcScan>`.
    pub base: PlanBase<Logical>,
    /// Generic CDC scan description: table name, CDC table descriptor, output column
    /// indices and scan options.
    core: generic::CdcScan,
}
45
46impl From<generic::CdcScan> for LogicalCdcScan {
47    fn from(core: generic::CdcScan) -> Self {
48        let base = PlanBase::new_logical_with_core(&core);
49        Self { base, core }
50    }
51}
52
53impl From<generic::CdcScan> for PlanRef {
54    fn from(core: generic::CdcScan) -> Self {
55        LogicalCdcScan::from(core).into()
56    }
57}
58
59impl LogicalCdcScan {
60    pub fn create(
61        table_name: String, // explain-only
62        cdc_table_desc: Rc<CdcTableDesc>,
63        ctx: OptimizerContextRef,
64        options: CdcScanOptions,
65    ) -> Self {
66        generic::CdcScan::new(
67            table_name,
68            (0..cdc_table_desc.columns.len()).collect(),
69            cdc_table_desc,
70            ctx,
71            options,
72        )
73        .into()
74    }
75
76    pub fn table_name(&self) -> &str {
77        &self.core.table_name
78    }
79
80    pub fn cdc_table_desc(&self) -> &CdcTableDesc {
81        self.core.cdc_table_desc.as_ref()
82    }
83
84    /// Get the descs of the output columns.
85    pub fn column_descs(&self) -> Vec<ColumnDesc> {
86        self.core.column_descs()
87    }
88
89    /// Get the ids of the output columns.
90    pub fn output_column_ids(&self) -> Vec<ColumnId> {
91        self.core.output_column_ids()
92    }
93
94    pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
95        generic::CdcScan::new(
96            self.table_name().to_owned(),
97            output_col_idx,
98            self.core.cdc_table_desc.clone(),
99            self.base.ctx().clone(),
100            self.core.options.clone(),
101        )
102        .into()
103    }
104
105    pub fn output_col_idx(&self) -> &Vec<usize> {
106        &self.core.output_col_idx
107    }
108}
109
// Invokes `impl_plan_tree_node_for_leaf!` for `LogicalCdcScan`. The macro is defined elsewhere;
// presumably it generates the plan-tree-node impl for a node without inputs — confirm.
impl_plan_tree_node_for_leaf! {LogicalCdcScan}
111
112impl Distill for LogicalCdcScan {
113    fn distill<'a>(&self) -> XmlNode<'a> {
114        let verbose = self.base.ctx().is_explain_verbose();
115        let mut vec = Vec::with_capacity(5);
116        vec.push(("table", Pretty::from(self.table_name().to_owned())));
117        let key_is_columns = true;
118        let key = if key_is_columns {
119            "columns"
120        } else {
121            "output_columns"
122        };
123        vec.push((key, self.core.columns_pretty(verbose)));
124        if !key_is_columns {
125            vec.push((
126                "required_columns",
127                Pretty::Array(
128                    self.output_col_idx()
129                        .iter()
130                        .map(|i| {
131                            let col_name = &self.cdc_table_desc().columns[*i].name;
132                            Pretty::from(if verbose {
133                                format!("{}.{}", self.table_name(), col_name)
134                            } else {
135                                col_name.to_string()
136                            })
137                        })
138                        .collect(),
139                ),
140            ));
141        }
142
143        childless_record("LogicalCdcScan", vec)
144    }
145}
146
147impl ColPrunable for LogicalCdcScan {
148    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
149        let output_col_idx: Vec<usize> = required_cols
150            .iter()
151            .map(|i| self.output_col_idx()[*i])
152            .collect();
153        assert!(
154            output_col_idx
155                .iter()
156                .all(|i| self.output_col_idx().contains(i))
157        );
158
159        self.clone_with_output_indices(output_col_idx).into()
160    }
161}
162
163impl ExprRewritable for LogicalCdcScan {
164    fn has_rewritable_expr(&self) -> bool {
165        true
166    }
167
168    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
169        let core = self.core.clone();
170        core.rewrite_exprs(r);
171        Self {
172            base: self.base.clone_with_new_plan_id(),
173            core,
174        }
175        .into()
176    }
177}
178
179impl ExprVisitable for LogicalCdcScan {
180    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
181        self.core.visit_exprs(v);
182    }
183}
184
185impl PredicatePushdown for LogicalCdcScan {
186    fn predicate_pushdown(
187        &self,
188        _predicate: Condition,
189        _ctx: &mut PredicatePushdownContext,
190    ) -> PlanRef {
191        self.clone().into()
192    }
193}
194
impl ToBatch for LogicalCdcScan {
    /// Not implemented for this node: panics via `unreachable!()` if it is ever called.
    // NOTE(review): presumably CDC scans only ever appear in streaming plans — confirm.
    fn to_batch(&self) -> Result<PlanRef> {
        unreachable!()
    }

    /// Not implemented for this node: panics via `unreachable!()` if it is ever called,
    /// regardless of the required order.
    fn to_batch_with_order_required(&self, _required_order: &Order) -> Result<PlanRef> {
        unreachable!()
    }
}
204
205impl ToStream for LogicalCdcScan {
206    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
207        Ok(StreamCdcTableScan::new(self.core.clone()).into())
208    }
209
210    fn logical_rewrite_for_stream(
211        &self,
212        _ctx: &mut RewriteStreamContext,
213    ) -> Result<(PlanRef, ColIndexMapping)> {
214        Ok((
215            self.clone().into(),
216            ColIndexMapping::identity(self.schema().len()),
217        ))
218    }
219}