risingwave_frontend/optimizer/plan_node/generic/
cdc_scan.rs1use std::rc::Rc;
16use std::str::FromStr;
17
18use anyhow::anyhow;
19use educe::Educe;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{CdcTableDesc, ColumnDesc, Field, Schema};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_connector::source::cdc::{
25 CDC_BACKFILL_ENABLE_KEY, CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY,
26 CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY,
27};
28use risingwave_pb::stream_plan::StreamCdcScanOptions;
29
30use super::GenericPlanNode;
31use crate::WithOptions;
32use crate::catalog::ColumnId;
33use crate::error::Result;
34use crate::expr::{ExprRewriter, ExprVisitor};
35use crate::optimizer::optimizer_context::OptimizerContextRef;
36use crate::optimizer::property::{FunctionalDependencySet, MonotonicityMap, WatermarkColumns};
37
38#[derive(Debug, Clone, Educe)]
40#[educe(PartialEq, Eq, Hash)]
41pub struct CdcScan {
42 pub table_name: String,
43 pub output_col_idx: Vec<usize>,
45 pub cdc_table_desc: Rc<CdcTableDesc>,
47 #[educe(PartialEq(ignore))]
48 #[educe(Hash(ignore))]
49 pub ctx: OptimizerContextRef,
50
51 pub options: CdcScanOptions,
52}
53
/// Tunable options of a CDC scan.
///
/// The defaults (see the [`Default`] impl) are: backfill enabled, a snapshot barrier
/// interval of `1`, and a snapshot batch size of `1000`.
///
/// All fields are plain `bool` / `u32` values, so equality is total and the type
/// implements `Eq` in addition to `PartialEq`, which allows it to be used as a key in
/// hash-based collections.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CdcScanOptions {
    /// `true` when backfill is turned off. Note that this is the negation of the
    /// user-facing "backfill enable" option.
    pub disable_backfill: bool,
    /// Value of the snapshot-interval option (defaults to `1`).
    // NOTE(review): presumably measured in barriers, judging by the name — confirm.
    pub snapshot_barrier_interval: u32,
    /// Value of the snapshot-batch-size option (defaults to `1000`).
    // NOTE(review): presumably a row count per snapshot read — confirm.
    pub snapshot_batch_size: u32,
}

impl Default for CdcScanOptions {
    /// Returns the options used when nothing is overridden: backfill enabled,
    /// `snapshot_barrier_interval = 1`, `snapshot_batch_size = 1000`.
    fn default() -> Self {
        Self {
            disable_backfill: false,
            snapshot_barrier_interval: 1,
            snapshot_batch_size: 1000,
        }
    }
}
70
71impl CdcScanOptions {
72 pub fn from_with_options(with_options: &WithOptions) -> Result<Self> {
73 let mut scan_options = Self::default();
75
76 if let Some(snapshot) = with_options.get(CDC_BACKFILL_ENABLE_KEY) {
78 scan_options.disable_backfill = !(bool::from_str(snapshot)
79 .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?);
80 };
81
82 if let Some(snapshot_interval) = with_options.get(CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY) {
83 scan_options.snapshot_barrier_interval = u32::from_str(snapshot_interval)
84 .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY))?;
85 };
86
87 if let Some(snapshot_batch_size) = with_options.get(CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY) {
88 scan_options.snapshot_batch_size =
89 u32::from_str(snapshot_batch_size).map_err(|_| {
90 anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY)
91 })?;
92 };
93
94 Ok(scan_options)
95 }
96
97 pub fn to_proto(&self) -> StreamCdcScanOptions {
98 StreamCdcScanOptions {
99 disable_backfill: self.disable_backfill,
100 snapshot_barrier_interval: self.snapshot_barrier_interval,
101 snapshot_batch_size: self.snapshot_batch_size,
102 }
103 }
104}
105
106impl CdcScan {
107 pub fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) {}
108
109 pub fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
110
111 pub fn output_column_ids(&self) -> Vec<ColumnId> {
113 self.output_col_idx
114 .iter()
115 .map(|i| self.get_table_columns()[*i].column_id)
116 .collect()
117 }
118
119 pub fn primary_key(&self) -> &[ColumnOrder] {
120 &self.cdc_table_desc.pk
121 }
122
123 pub fn watermark_columns(&self) -> WatermarkColumns {
124 WatermarkColumns::new()
125 }
126
127 pub fn columns_monotonicity(&self) -> MonotonicityMap {
128 MonotonicityMap::new()
129 }
130
131 pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
132 self.output_col_idx
133 .iter()
134 .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
135 .collect()
136 }
137
138 pub(crate) fn column_names(&self) -> Vec<String> {
139 self.output_col_idx
140 .iter()
141 .map(|&i| self.get_table_columns()[i].name.clone())
142 .collect()
143 }
144
145 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
147 ColIndexMapping::with_remaining_columns(
148 &self.output_col_idx,
149 self.get_table_columns().len(),
150 )
151 }
152
153 pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
155 let mut ids = self.output_column_ids();
156 for column_order in self.primary_key() {
157 let id = self.get_table_columns()[column_order.column_index].column_id;
158 if !ids.contains(&id) {
159 ids.push(id);
160 }
161 }
162 ids
163 }
164
165 pub(crate) fn new(
167 table_name: String,
168 output_col_idx: Vec<usize>, cdc_table_desc: Rc<CdcTableDesc>,
170 ctx: OptimizerContextRef,
171 options: CdcScanOptions,
172 ) -> Self {
173 Self {
174 table_name,
175 output_col_idx,
176 cdc_table_desc,
177 ctx,
178 options,
179 }
180 }
181
182 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
183 Pretty::Array(
184 match verbose {
185 true => self.column_names_with_table_prefix(),
186 false => self.column_names(),
187 }
188 .into_iter()
189 .map(Pretty::from)
190 .collect(),
191 )
192 }
193}
194
195impl GenericPlanNode for CdcScan {
197 fn schema(&self) -> Schema {
198 let fields = self
199 .output_col_idx
200 .iter()
201 .map(|tb_idx| {
202 let col = &self.get_table_columns()[*tb_idx];
203 Field::from_with_table_name_prefix(col, &self.table_name)
204 })
205 .collect();
206 Schema { fields }
207 }
208
209 fn stream_key(&self) -> Option<Vec<usize>> {
210 Some(self.cdc_table_desc.stream_key.clone())
211 }
212
213 fn ctx(&self) -> OptimizerContextRef {
214 self.ctx.clone()
215 }
216
217 fn functional_dependency(&self) -> FunctionalDependencySet {
218 let pk_indices = self.stream_key();
219 let col_num = self.output_col_idx.len();
220 match &pk_indices {
221 Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
222 None => FunctionalDependencySet::new(col_num),
223 }
224 }
225}
226
227impl CdcScan {
228 pub fn get_table_columns(&self) -> &[ColumnDesc] {
229 &self.cdc_table_desc.columns
230 }
231
232 pub fn append_only(&self) -> bool {
233 false
234 }
235
236 pub fn column_descs(&self) -> Vec<ColumnDesc> {
238 self.output_col_idx
239 .iter()
240 .map(|&i| self.get_table_columns()[i].clone())
241 .collect()
242 }
243}