// risingwave_frontend/optimizer/plan_node/generic/cdc_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16use std::str::FromStr;
17
18use anyhow::anyhow;
19use educe::Educe;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{CdcTableDesc, ColumnDesc, Field, Schema};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_connector::source::cdc::external::ExternalCdcTableType;
25use risingwave_connector::source::cdc::{
26    CDC_BACKFILL_AS_EVEN_SPLITS, CDC_BACKFILL_ENABLE_KEY, CDC_BACKFILL_MAX_PARALLELISM,
27    CDC_BACKFILL_NUM_ROWS_PER_SPLIT, CDC_BACKFILL_PARALLELISM,
28    CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY, CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY,
29    CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX, CdcScanOptions,
30};
31
32use super::GenericPlanNode;
33use crate::WithOptions;
34use crate::catalog::ColumnId;
35use crate::error::Result;
36use crate::expr::{ExprRewriter, ExprVisitor};
37use crate::optimizer::optimizer_context::OptimizerContextRef;
38use crate::optimizer::property::{FunctionalDependencySet, MonotonicityMap, WatermarkColumns};
39
40/// [`CdcScan`] reads rows of a table from an external upstream database
41#[derive(Debug, Clone, Educe)]
42#[educe(PartialEq, Eq, Hash)]
43pub struct CdcScan {
44    pub table_name: String,
45    /// Include `output_col_idx` and columns required in `predicate`
46    pub output_col_idx: Vec<usize>,
47    /// Descriptor of the external table for CDC
48    pub cdc_table_desc: Rc<CdcTableDesc>,
49    #[educe(PartialEq(ignore))]
50    #[educe(Hash(ignore))]
51    pub ctx: OptimizerContextRef,
52
53    pub options: CdcScanOptions,
54}
55
56pub fn build_cdc_scan_options_with_options(
57    with_options: &WithOptions,
58    cdc_table_type: &ExternalCdcTableType,
59) -> Result<CdcScanOptions> {
60    // Update this after more CDC table type is supported for backfill v2.
61    let support_backfill_v2 = matches!(
62        cdc_table_type,
63        ExternalCdcTableType::Postgres | ExternalCdcTableType::Mock
64    );
65
66    // unspecified option will use default values
67    let mut scan_options = CdcScanOptions::default();
68
69    // disable backfill if 'snapshot=false'
70    if let Some(snapshot) = with_options.get(CDC_BACKFILL_ENABLE_KEY) {
71        scan_options.disable_backfill = !(bool::from_str(snapshot)
72            .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?);
73    };
74
75    if let Some(snapshot_interval) = with_options.get(CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY) {
76        scan_options.snapshot_barrier_interval = u32::from_str(snapshot_interval)
77            .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY))?;
78    };
79
80    if let Some(snapshot_batch_size) = with_options.get(CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY) {
81        scan_options.snapshot_batch_size = u32::from_str(snapshot_batch_size)
82            .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY))?;
83    };
84
85    if support_backfill_v2 {
86        if let Some(backfill_parallelism) = with_options.get(CDC_BACKFILL_PARALLELISM) {
87            scan_options.backfill_parallelism = u32::from_str(backfill_parallelism)
88                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_PARALLELISM))?;
89            if scan_options.backfill_parallelism > CDC_BACKFILL_MAX_PARALLELISM {
90                return Err(anyhow!(
91                    "Invalid value for {}, should be in range [0,{}] ",
92                    CDC_BACKFILL_PARALLELISM,
93                    CDC_BACKFILL_MAX_PARALLELISM
94                )
95                .into());
96            }
97        }
98
99        if let Some(backfill_num_rows_per_split) = with_options.get(CDC_BACKFILL_NUM_ROWS_PER_SPLIT)
100        {
101            scan_options.backfill_num_rows_per_split = u64::from_str(backfill_num_rows_per_split)
102                .map_err(|_| {
103                anyhow!("Invalid value for {}", CDC_BACKFILL_NUM_ROWS_PER_SPLIT)
104            })?;
105        }
106
107        if let Some(backfill_as_even_splits) = with_options.get(CDC_BACKFILL_AS_EVEN_SPLITS) {
108            scan_options.backfill_as_even_splits = bool::from_str(backfill_as_even_splits)
109                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_AS_EVEN_SPLITS))?;
110        }
111
112        if let Some(backfill_split_pk_column_index) =
113            with_options.get(CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX)
114        {
115            scan_options.backfill_split_pk_column_index =
116                u32::from_str(backfill_split_pk_column_index).map_err(|_| {
117                    anyhow!("Invalid value for {}", CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX)
118                })?;
119        }
120    }
121
122    Ok(scan_options)
123}
124
125impl CdcScan {
126    pub fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) {}
127
128    pub fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
129
130    /// Get the ids of the output columns.
131    pub fn output_column_ids(&self) -> Vec<ColumnId> {
132        self.output_col_idx
133            .iter()
134            .map(|i| self.get_table_columns()[*i].column_id)
135            .collect()
136    }
137
138    pub fn primary_key(&self) -> &[ColumnOrder] {
139        &self.cdc_table_desc.pk
140    }
141
142    pub fn watermark_columns(&self) -> WatermarkColumns {
143        WatermarkColumns::new()
144    }
145
146    pub fn columns_monotonicity(&self) -> MonotonicityMap {
147        MonotonicityMap::new()
148    }
149
150    pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
151        self.output_col_idx
152            .iter()
153            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
154            .collect()
155    }
156
157    pub(crate) fn column_names(&self) -> Vec<String> {
158        self.output_col_idx
159            .iter()
160            .map(|&i| self.get_table_columns()[i].name.clone())
161            .collect()
162    }
163
164    /// get the Mapping of columnIndex from internal column index to output column index
165    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
166        ColIndexMapping::with_remaining_columns(
167            &self.output_col_idx,
168            self.get_table_columns().len(),
169        )
170    }
171
172    /// Get the ids of the output columns and primary key columns.
173    pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
174        let mut ids = self.output_column_ids();
175        for column_order in self.primary_key() {
176            let id = self.get_table_columns()[column_order.column_index].column_id;
177            if !ids.contains(&id) {
178                ids.push(id);
179            }
180        }
181        ids
182    }
183
184    /// Create a logical scan node for CDC backfill
185    pub(crate) fn new(
186        table_name: String,
187        output_col_idx: Vec<usize>, // the column index in the table
188        cdc_table_desc: Rc<CdcTableDesc>,
189        ctx: OptimizerContextRef,
190        options: CdcScanOptions,
191    ) -> Self {
192        Self {
193            table_name,
194            output_col_idx,
195            cdc_table_desc,
196            ctx,
197            options,
198        }
199    }
200
201    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
202        Pretty::Array(
203            match verbose {
204                true => self.column_names_with_table_prefix(),
205                false => self.column_names(),
206            }
207            .into_iter()
208            .map(Pretty::from)
209            .collect(),
210        )
211    }
212}
213
214// TODO: extend for cdc table
215impl GenericPlanNode for CdcScan {
216    fn schema(&self) -> Schema {
217        let fields = self
218            .output_col_idx
219            .iter()
220            .map(|tb_idx| {
221                let col = &self.get_table_columns()[*tb_idx];
222                Field::from_with_table_name_prefix(col, &self.table_name)
223            })
224            .collect();
225        Schema { fields }
226    }
227
228    fn stream_key(&self) -> Option<Vec<usize>> {
229        Some(self.cdc_table_desc.stream_key.clone())
230    }
231
232    fn ctx(&self) -> OptimizerContextRef {
233        self.ctx.clone()
234    }
235
236    fn functional_dependency(&self) -> FunctionalDependencySet {
237        let pk_indices = self.stream_key();
238        let col_num = self.output_col_idx.len();
239        match &pk_indices {
240            Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
241            None => FunctionalDependencySet::new(col_num),
242        }
243    }
244}
245
246impl CdcScan {
247    pub fn get_table_columns(&self) -> &[ColumnDesc] {
248        &self.cdc_table_desc.columns
249    }
250
251    /// Get the descs of the output columns.
252    pub fn column_descs(&self) -> Vec<ColumnDesc> {
253        self.output_col_idx
254            .iter()
255            .map(|&i| self.get_table_columns()[i].clone())
256            .collect()
257    }
258}