risingwave_frontend/optimizer/plan_node/generic/
cdc_scan.rs1use std::rc::Rc;
16use std::str::FromStr;
17
18use anyhow::anyhow;
19use educe::Educe;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{CdcTableDesc, ColumnDesc, Field, Schema};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_connector::source::cdc::external::ExternalCdcTableType;
25use risingwave_connector::source::cdc::{
26 CDC_BACKFILL_AS_EVEN_SPLITS, CDC_BACKFILL_ENABLE_KEY, CDC_BACKFILL_MAX_PARALLELISM,
27 CDC_BACKFILL_NUM_ROWS_PER_SPLIT, CDC_BACKFILL_PARALLELISM,
28 CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY, CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY,
29 CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX, CdcScanOptions,
30};
31
32use super::GenericPlanNode;
33use crate::WithOptions;
34use crate::catalog::ColumnId;
35use crate::error::Result;
36use crate::expr::{ExprRewriter, ExprVisitor};
37use crate::optimizer::optimizer_context::OptimizerContextRef;
38use crate::optimizer::property::{FunctionalDependencySet, MonotonicityMap, WatermarkColumns};
39
40#[derive(Debug, Clone, Educe)]
42#[educe(PartialEq, Eq, Hash)]
43pub struct CdcScan {
44 pub table_name: String,
45 pub output_col_idx: Vec<usize>,
47 pub cdc_table_desc: Rc<CdcTableDesc>,
49 #[educe(PartialEq(ignore))]
50 #[educe(Hash(ignore))]
51 pub ctx: OptimizerContextRef,
52
53 pub options: CdcScanOptions,
54}
55
56pub fn build_cdc_scan_options_with_options(
57 with_options: &WithOptions,
58 cdc_table_type: &ExternalCdcTableType,
59) -> Result<CdcScanOptions> {
60 let support_backfill_v2 = matches!(
62 cdc_table_type,
63 ExternalCdcTableType::Postgres | ExternalCdcTableType::Mock
64 );
65
66 let mut scan_options = CdcScanOptions::default();
68
69 if let Some(snapshot) = with_options.get(CDC_BACKFILL_ENABLE_KEY) {
71 scan_options.disable_backfill = !(bool::from_str(snapshot)
72 .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?);
73 };
74
75 if let Some(snapshot_interval) = with_options.get(CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY) {
76 scan_options.snapshot_barrier_interval = u32::from_str(snapshot_interval)
77 .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY))?;
78 };
79
80 if let Some(snapshot_batch_size) = with_options.get(CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY) {
81 scan_options.snapshot_batch_size = u32::from_str(snapshot_batch_size)
82 .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY))?;
83 };
84
85 if support_backfill_v2 {
86 if let Some(backfill_parallelism) = with_options.get(CDC_BACKFILL_PARALLELISM) {
87 scan_options.backfill_parallelism = u32::from_str(backfill_parallelism)
88 .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_PARALLELISM))?;
89 if scan_options.backfill_parallelism > CDC_BACKFILL_MAX_PARALLELISM {
90 return Err(anyhow!(
91 "Invalid value for {}, should be in range [0,{}] ",
92 CDC_BACKFILL_PARALLELISM,
93 CDC_BACKFILL_MAX_PARALLELISM
94 )
95 .into());
96 }
97 }
98
99 if let Some(backfill_num_rows_per_split) = with_options.get(CDC_BACKFILL_NUM_ROWS_PER_SPLIT)
100 {
101 scan_options.backfill_num_rows_per_split = u64::from_str(backfill_num_rows_per_split)
102 .map_err(|_| {
103 anyhow!("Invalid value for {}", CDC_BACKFILL_NUM_ROWS_PER_SPLIT)
104 })?;
105 }
106
107 if let Some(backfill_as_even_splits) = with_options.get(CDC_BACKFILL_AS_EVEN_SPLITS) {
108 scan_options.backfill_as_even_splits = bool::from_str(backfill_as_even_splits)
109 .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_AS_EVEN_SPLITS))?;
110 }
111
112 if let Some(backfill_split_pk_column_index) =
113 with_options.get(CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX)
114 {
115 scan_options.backfill_split_pk_column_index =
116 u32::from_str(backfill_split_pk_column_index).map_err(|_| {
117 anyhow!("Invalid value for {}", CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX)
118 })?;
119 }
120 }
121
122 Ok(scan_options)
123}
124
125impl CdcScan {
126 pub fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) {}
127
128 pub fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
129
130 pub fn output_column_ids(&self) -> Vec<ColumnId> {
132 self.output_col_idx
133 .iter()
134 .map(|i| self.get_table_columns()[*i].column_id)
135 .collect()
136 }
137
138 pub fn primary_key(&self) -> &[ColumnOrder] {
139 &self.cdc_table_desc.pk
140 }
141
142 pub fn watermark_columns(&self) -> WatermarkColumns {
143 WatermarkColumns::new()
144 }
145
146 pub fn columns_monotonicity(&self) -> MonotonicityMap {
147 MonotonicityMap::new()
148 }
149
150 pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
151 self.output_col_idx
152 .iter()
153 .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
154 .collect()
155 }
156
157 pub(crate) fn column_names(&self) -> Vec<String> {
158 self.output_col_idx
159 .iter()
160 .map(|&i| self.get_table_columns()[i].name.clone())
161 .collect()
162 }
163
164 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
166 ColIndexMapping::with_remaining_columns(
167 &self.output_col_idx,
168 self.get_table_columns().len(),
169 )
170 }
171
172 pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
174 let mut ids = self.output_column_ids();
175 for column_order in self.primary_key() {
176 let id = self.get_table_columns()[column_order.column_index].column_id;
177 if !ids.contains(&id) {
178 ids.push(id);
179 }
180 }
181 ids
182 }
183
184 pub(crate) fn new(
186 table_name: String,
187 output_col_idx: Vec<usize>, cdc_table_desc: Rc<CdcTableDesc>,
189 ctx: OptimizerContextRef,
190 options: CdcScanOptions,
191 ) -> Self {
192 Self {
193 table_name,
194 output_col_idx,
195 cdc_table_desc,
196 ctx,
197 options,
198 }
199 }
200
201 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
202 Pretty::Array(
203 match verbose {
204 true => self.column_names_with_table_prefix(),
205 false => self.column_names(),
206 }
207 .into_iter()
208 .map(Pretty::from)
209 .collect(),
210 )
211 }
212}
213
214impl GenericPlanNode for CdcScan {
216 fn schema(&self) -> Schema {
217 let fields = self
218 .output_col_idx
219 .iter()
220 .map(|tb_idx| {
221 let col = &self.get_table_columns()[*tb_idx];
222 Field::from_with_table_name_prefix(col, &self.table_name)
223 })
224 .collect();
225 Schema { fields }
226 }
227
228 fn stream_key(&self) -> Option<Vec<usize>> {
229 Some(self.cdc_table_desc.stream_key.clone())
230 }
231
232 fn ctx(&self) -> OptimizerContextRef {
233 self.ctx.clone()
234 }
235
236 fn functional_dependency(&self) -> FunctionalDependencySet {
237 let pk_indices = self.stream_key();
238 let col_num = self.output_col_idx.len();
239 match &pk_indices {
240 Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
241 None => FunctionalDependencySet::new(col_num),
242 }
243 }
244}
245
246impl CdcScan {
247 pub fn get_table_columns(&self) -> &[ColumnDesc] {
248 &self.cdc_table_desc.columns
249 }
250
251 pub fn column_descs(&self) -> Vec<ColumnDesc> {
253 self.output_col_idx
254 .iter()
255 .map(|&i| self.get_table_columns()[i].clone())
256 .collect()
257 }
258}