risingwave_frontend/optimizer/plan_node/generic/
cdc_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16use std::str::FromStr;
17
18use anyhow::anyhow;
19use educe::Educe;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{CdcTableDesc, ColumnDesc, Field, Schema};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_connector::source::cdc::{
25    CDC_BACKFILL_ENABLE_KEY, CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY,
26    CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY,
27};
28use risingwave_pb::stream_plan::StreamCdcScanOptions;
29
30use super::GenericPlanNode;
31use crate::WithOptions;
32use crate::catalog::ColumnId;
33use crate::error::Result;
34use crate::expr::{ExprRewriter, ExprVisitor};
35use crate::optimizer::optimizer_context::OptimizerContextRef;
36use crate::optimizer::property::{FunctionalDependencySet, MonotonicityMap, WatermarkColumns};
37
38/// [`CdcScan`] reads rows of a table from an external upstream database
39#[derive(Debug, Clone, Educe)]
40#[educe(PartialEq, Eq, Hash)]
41pub struct CdcScan {
42    pub table_name: String,
43    /// Include `output_col_idx` and columns required in `predicate`
44    pub output_col_idx: Vec<usize>,
45    /// Descriptor of the external table for CDC
46    pub cdc_table_desc: Rc<CdcTableDesc>,
47    #[educe(PartialEq(ignore))]
48    #[educe(Hash(ignore))]
49    pub ctx: OptimizerContextRef,
50
51    pub options: CdcScanOptions,
52}
53
/// Options of a [`CdcScan`]. They can be parsed from `WITH` options through
/// [`CdcScanOptions::from_with_options`]; unspecified options take the values of the
/// [`Default`] implementation.
///
/// Every field is a plain `bool`/`u32`, so equality is total and the type implements `Eq`
/// in addition to `PartialEq`. Together with `Hash` this allows using it as a map/set key.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CdcScanOptions {
    /// `true` when backfill is disabled, i.e. when the `CDC_BACKFILL_ENABLE_KEY` option
    /// was given and parsed to `false`.
    pub disable_backfill: bool,
    /// Value of the `CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY` option.
    pub snapshot_barrier_interval: u32,
    /// Value of the `CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY` option.
    pub snapshot_batch_size: u32,
}
60
61impl Default for CdcScanOptions {
62    fn default() -> Self {
63        Self {
64            disable_backfill: false,
65            snapshot_barrier_interval: 1,
66            snapshot_batch_size: 1000,
67        }
68    }
69}
70
71impl CdcScanOptions {
72    pub fn from_with_options(with_options: &WithOptions) -> Result<Self> {
73        // unspecified option will use default values
74        let mut scan_options = Self::default();
75
76        // disable backfill if 'snapshot=false'
77        if let Some(snapshot) = with_options.get(CDC_BACKFILL_ENABLE_KEY) {
78            scan_options.disable_backfill = !(bool::from_str(snapshot)
79                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?);
80        };
81
82        if let Some(snapshot_interval) = with_options.get(CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY) {
83            scan_options.snapshot_barrier_interval = u32::from_str(snapshot_interval)
84                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY))?;
85        };
86
87        if let Some(snapshot_batch_size) = with_options.get(CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY) {
88            scan_options.snapshot_batch_size =
89                u32::from_str(snapshot_batch_size).map_err(|_| {
90                    anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY)
91                })?;
92        };
93
94        Ok(scan_options)
95    }
96
97    pub fn to_proto(&self) -> StreamCdcScanOptions {
98        StreamCdcScanOptions {
99            disable_backfill: self.disable_backfill,
100            snapshot_barrier_interval: self.snapshot_barrier_interval,
101            snapshot_batch_size: self.snapshot_batch_size,
102        }
103    }
104}
105
106impl CdcScan {
107    pub fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) {}
108
109    pub fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
110
111    /// Get the ids of the output columns.
112    pub fn output_column_ids(&self) -> Vec<ColumnId> {
113        self.output_col_idx
114            .iter()
115            .map(|i| self.get_table_columns()[*i].column_id)
116            .collect()
117    }
118
119    pub fn primary_key(&self) -> &[ColumnOrder] {
120        &self.cdc_table_desc.pk
121    }
122
123    pub fn watermark_columns(&self) -> WatermarkColumns {
124        WatermarkColumns::new()
125    }
126
127    pub fn columns_monotonicity(&self) -> MonotonicityMap {
128        MonotonicityMap::new()
129    }
130
131    pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
132        self.output_col_idx
133            .iter()
134            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
135            .collect()
136    }
137
138    pub(crate) fn column_names(&self) -> Vec<String> {
139        self.output_col_idx
140            .iter()
141            .map(|&i| self.get_table_columns()[i].name.clone())
142            .collect()
143    }
144
145    /// get the Mapping of columnIndex from internal column index to output column index
146    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
147        ColIndexMapping::with_remaining_columns(
148            &self.output_col_idx,
149            self.get_table_columns().len(),
150        )
151    }
152
153    /// Get the ids of the output columns and primary key columns.
154    pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
155        let mut ids = self.output_column_ids();
156        for column_order in self.primary_key() {
157            let id = self.get_table_columns()[column_order.column_index].column_id;
158            if !ids.contains(&id) {
159                ids.push(id);
160            }
161        }
162        ids
163    }
164
165    /// Create a logical scan node for CDC backfill
166    pub(crate) fn new(
167        table_name: String,
168        output_col_idx: Vec<usize>, // the column index in the table
169        cdc_table_desc: Rc<CdcTableDesc>,
170        ctx: OptimizerContextRef,
171        options: CdcScanOptions,
172    ) -> Self {
173        Self {
174            table_name,
175            output_col_idx,
176            cdc_table_desc,
177            ctx,
178            options,
179        }
180    }
181
182    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
183        Pretty::Array(
184            match verbose {
185                true => self.column_names_with_table_prefix(),
186                false => self.column_names(),
187            }
188            .into_iter()
189            .map(Pretty::from)
190            .collect(),
191        )
192    }
193}
194
195// TODO: extend for cdc table
196impl GenericPlanNode for CdcScan {
197    fn schema(&self) -> Schema {
198        let fields = self
199            .output_col_idx
200            .iter()
201            .map(|tb_idx| {
202                let col = &self.get_table_columns()[*tb_idx];
203                Field::from_with_table_name_prefix(col, &self.table_name)
204            })
205            .collect();
206        Schema { fields }
207    }
208
209    fn stream_key(&self) -> Option<Vec<usize>> {
210        Some(self.cdc_table_desc.stream_key.clone())
211    }
212
213    fn ctx(&self) -> OptimizerContextRef {
214        self.ctx.clone()
215    }
216
217    fn functional_dependency(&self) -> FunctionalDependencySet {
218        let pk_indices = self.stream_key();
219        let col_num = self.output_col_idx.len();
220        match &pk_indices {
221            Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
222            None => FunctionalDependencySet::new(col_num),
223        }
224    }
225}
226
227impl CdcScan {
228    pub fn get_table_columns(&self) -> &[ColumnDesc] {
229        &self.cdc_table_desc.columns
230    }
231
232    pub fn append_only(&self) -> bool {
233        false
234    }
235
236    /// Get the descs of the output columns.
237    pub fn column_descs(&self) -> Vec<ColumnDesc> {
238        self.output_col_idx
239            .iter()
240            .map(|&i| self.get_table_columns()[i].clone())
241            .collect()
242    }
243}