1use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::catalog::{ColumnCatalog, Field};
18use risingwave_common::types::DataType;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::source::cdc::normalize_simple_postgres_quoted_table_name;
21use risingwave_pb::stream_plan::PbStreamNode;
22use risingwave_pb::stream_plan::stream_node::{PbNodeBody, PbStreamKind};
23
24use super::stream::prelude::*;
25use super::utils::{Distill, childless_record};
26use super::{ExprRewritable, PlanBase, StreamNode, StreamPlanRef as PlanRef, generic};
27use crate::catalog::ColumnId;
28use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef};
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
31use crate::optimizer::property::{Distribution, DistributionDisplay};
32use crate::scheduler::SchedulerResult;
33use crate::stream_fragmenter::BuildFragmentGraphState;
34use crate::{Explain, TableCatalog};
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub struct StreamCdcTableScan {
40 pub base: PlanBase<Stream>,
41 core: generic::CdcScan,
42}
43
44impl StreamCdcTableScan {
45 pub fn new(core: generic::CdcScan) -> Self {
46 let distribution = Distribution::SomeShard;
47 let base = PlanBase::new_stream_with_core(
48 &core,
49 distribution,
50 StreamKind::Retract,
51 false,
52 core.watermark_columns(),
53 core.columns_monotonicity(),
54 );
55 Self { base, core }
56 }
57
58 pub fn table_name(&self) -> &str {
59 &self.core.table_name
60 }
61
62 pub fn core(&self) -> &generic::CdcScan {
63 &self.core
64 }
65
66 pub fn build_backfill_state_catalog(
75 &self,
76 state: &mut BuildFragmentGraphState,
77 is_parallelized_backfill: bool,
78 ) -> TableCatalog {
79 if is_parallelized_backfill {
80 let mut catalog_builder = TableCatalogBuilder::default();
81 catalog_builder.add_column(&Field::with_name(DataType::Int64, "split_id"));
83 catalog_builder.add_order_column(0, OrderType::ascending());
84 catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
85 catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
87 catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset_low"));
88 catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset_high"));
89 catalog_builder
90 .build(vec![], 1)
91 .with_id(state.gen_table_id_wrapped())
92 } else {
93 let mut catalog_builder = TableCatalogBuilder::default();
94 let upstream_schema = &self.core.get_table_columns();
95
96 catalog_builder.add_column(&Field::with_name(DataType::Varchar, "split_id"));
99 catalog_builder.add_order_column(0, OrderType::ascending());
100
101 for col_order in self.core.primary_key() {
103 let col = &upstream_schema[col_order.column_index];
104 catalog_builder.add_column(&Field::from(col));
105 }
106
107 catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
108
109 catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
111
112 catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset"));
114
115 catalog_builder
117 .build(vec![], 1)
118 .with_id(state.gen_table_id_wrapped())
119 }
120 }
121}
122
123impl_plan_tree_node_for_leaf! { Stream, StreamCdcTableScan }
124
125impl Distill for StreamCdcTableScan {
126 fn distill<'a>(&self) -> XmlNode<'a> {
127 let verbose = self.base.ctx().is_explain_verbose();
128 let mut vec = Vec::with_capacity(4);
129 vec.push(("table", Pretty::from(self.core.table_name.clone())));
130 vec.push(("columns", self.core.columns_pretty(verbose)));
131
132 if verbose {
133 let pk = IndicesDisplay {
134 indices: self.stream_key().unwrap_or_default(),
135 schema: self.base.schema(),
136 };
137 vec.push(("pk", pk.distill()));
138 let dist = Pretty::display(&DistributionDisplay {
139 distribution: self.distribution(),
140 input_schema: self.base.schema(),
141 });
142 vec.push(("dist", dist));
143 }
144
145 childless_record("StreamCdcTableScan", vec)
146 }
147}
148
149impl StreamNode for StreamCdcTableScan {
150 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
151 unreachable!(
152 "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
153 )
154 }
155}
156
157impl StreamCdcTableScan {
158 pub fn adhoc_to_stream_prost(
160 &self,
161 state: &mut BuildFragmentGraphState,
162 ) -> SchedulerResult<PbStreamNode> {
163 use risingwave_pb::stream_plan::*;
164
165 let stream_key = self
166 .stream_key()
167 .unwrap_or_else(|| {
168 panic!(
169 "should always have a stream key in the stream plan but not, sub plan: {}",
170 PlanRef::from(self.clone()).explain_to_string()
171 )
172 })
173 .iter()
174 .map(|x| *x as u32)
175 .collect_vec();
176
177 let cdc_source_schema = ColumnCatalog::debezium_cdc_source_cols()
179 .into_iter()
180 .map(|c| Field::from(c.column_desc).to_prost())
181 .collect_vec();
182
183 let catalog = self
184 .build_backfill_state_catalog(state, self.core.options.is_parallelized_backfill())
185 .to_internal_table_prost();
186
187 let upstream_source_id = self.core.cdc_table_desc.source_id;
189
190 let filter_expr =
192 Self::build_cdc_filter_expr(self.core.cdc_table_desc.external_table_name.as_str());
193
194 let filter_operator_id = self.core.ctx.next_plan_node_id();
195 let filter_stream_node = StreamNode {
197 operator_id: filter_operator_id.to_stream_node_operator_id(),
198 input: vec![
199 PbStreamNode {
201 node_body: Some(PbNodeBody::Merge(Default::default())),
202 identity: "Upstream".into(),
203 fields: cdc_source_schema.clone(),
204 stream_key: vec![], ..Default::default()
206 },
207 ],
208 stream_key: vec![], stream_kind: PbStreamKind::AppendOnly as _,
210 identity: "StreamCdcFilter".to_owned(),
211 fields: cdc_source_schema.clone(),
212 node_body: Some(PbNodeBody::CdcFilter(Box::new(CdcFilterNode {
213 search_condition: Some(filter_expr.to_expr_proto()),
214 upstream_source_id,
215 }))),
216 };
217
218 let exchange_operator_id = self.core.ctx.next_plan_node_id();
219 let strategy = if self.core.options.is_parallelized_backfill() {
220 DispatchStrategy {
221 r#type: DispatcherType::Broadcast as _,
222 dist_key_indices: vec![],
223 output_mapping: PbDispatchOutputMapping::identical(cdc_source_schema.len()).into(),
224 }
225 } else {
226 DispatchStrategy {
227 r#type: DispatcherType::Simple as _,
228 dist_key_indices: vec![], output_mapping: PbDispatchOutputMapping::identical(cdc_source_schema.len()).into(),
230 }
231 };
232 let exchange_stream_node = StreamNode {
234 operator_id: exchange_operator_id.to_stream_node_operator_id(),
235 input: vec![filter_stream_node],
236 stream_key: vec![], stream_kind: PbStreamKind::AppendOnly as _,
238 identity: "Exchange".to_owned(),
239 fields: cdc_source_schema,
240 node_body: Some(PbNodeBody::Exchange(Box::new(ExchangeNode {
241 strategy: Some(strategy),
242 }))),
243 };
244
245 let upstream_column_ids = self
247 .core
248 .output_and_pk_column_ids()
249 .iter()
250 .map(ColumnId::get_id)
251 .collect_vec();
252
253 let output_indices = self
254 .core
255 .output_column_ids()
256 .iter()
257 .map(|i| {
258 upstream_column_ids
259 .iter()
260 .position(|&x| x == i.get_id())
261 .unwrap() as u32
262 })
263 .collect_vec();
264
265 tracing::debug!(
266 output_column_ids=?self.core.output_column_ids(),
267 ?upstream_column_ids,
268 ?output_indices,
269 "stream cdc table scan output indices"
270 );
271
272 let options = self.core.options.to_proto();
273 let stream_scan_body = PbNodeBody::StreamCdcScan(Box::new(StreamCdcScanNode {
274 table_id: upstream_source_id.as_cdc_table_id(),
275 upstream_column_ids,
276 output_indices,
277 state_table: Some(catalog),
279 cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
280 rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
281 disable_backfill: options.disable_backfill,
282 options: Some(options),
283 }));
284
285 Ok(PbStreamNode {
287 fields: self.schema().to_prost(),
288 input: vec![exchange_stream_node],
289 node_body: Some(stream_scan_body),
290 stream_key,
291 operator_id: self.base.id().to_stream_node_operator_id(),
292 identity: self.distill_to_string(),
293 stream_kind: self.stream_kind().to_protobuf() as i32,
294 })
295 }
296
297 pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl {
299 let table_name_ref: ExprImpl = InputRef::new(2, DataType::Varchar).into();
300 let mut filter_expr = build_cdc_table_name_eq_expr(table_name_ref.clone(), cdc_table_name);
301
302 if let Some(compat_table_name) = normalize_simple_postgres_quoted_table_name(cdc_table_name)
303 {
304 filter_expr = FunctionCall::new(
311 ExprType::Or,
312 vec![
313 filter_expr,
314 build_cdc_table_name_eq_expr(table_name_ref, &compat_table_name),
315 ],
316 )
317 .unwrap()
318 .into();
319 }
320
321 filter_expr
322 }
323}
324
325fn build_cdc_table_name_eq_expr(table_name_ref: ExprImpl, cdc_table_name: &str) -> ExprImpl {
326 FunctionCall::new(
327 ExprType::Equal,
328 vec![
329 table_name_ref,
330 ExprImpl::literal_varchar(cdc_table_name.into()),
331 ],
332 )
333 .unwrap()
334 .into()
335}
336
337impl ExprRewritable<Stream> for StreamCdcTableScan {
338 fn has_rewritable_expr(&self) -> bool {
339 true
340 }
341
342 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
343 let core = self.core.clone();
344 core.rewrite_exprs(r);
345 Self::new(core).into()
346 }
347}
348
349impl ExprVisitable for StreamCdcTableScan {
350 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
351 self.core.visit_exprs(v);
352 }
353}
354
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{JsonbVal, ScalarImpl};

    use super::*;

    /// Builds a three-column row `(payload, offset, table name)` as consumed by the filter.
    fn cdc_row(payload: Option<JsonbVal>, offset: Option<&str>, table_name: &str) -> OwnedRow {
        OwnedRow::new(vec![
            payload.map(ScalarImpl::from),
            offset.map(ScalarImpl::from),
            Some(table_name.into()),
        ])
    }

    /// The filter looks only at the table-name column: rows tagged `public.t2` pass
    /// whatever their payload is, and rows tagged with another name are rejected.
    #[tokio::test]
    async fn test_cdc_filter_expr() {
        let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
        let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
        // A transaction `BEGIN` event rather than a row change.
        let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap();

        let row1 = cdc_row(
            Some(t1_json),
            Some(r#"{"file": "1.binlog", "pos": 100}"#),
            "public.t2",
        );
        let row2 = cdc_row(
            Some(t2_json),
            Some(r#"{"file": "2.binlog", "pos": 100}"#),
            "abs.t2",
        );
        let row3 = cdc_row(
            Some(trx_json),
            Some(r#"{"file": "3.binlog", "pos": 100}"#),
            "public.t2",
        );

        let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("public.t2");
        let accepted = Some(ScalarImpl::Bool(true));
        let rejected = Some(ScalarImpl::Bool(false));

        assert_eq!(filter_expr.eval_row(&row1).await.unwrap(), accepted);
        assert_eq!(filter_expr.eval_row(&row2).await.unwrap(), rejected);
        assert_eq!(filter_expr.eval_row(&row3).await.unwrap(), accepted);
    }

    /// A filter built from a quoted Postgres table name also accepts the unquoted spelling.
    #[tokio::test]
    async fn test_cdc_filter_expr_postgres_quoted_table_name_compat() {
        let row_with_unquoted_schema_table = cdc_row(None, None, "public.QuotedNoteE2e");
        let row_with_other_table = cdc_row(None, None, "public.other");

        let filter_expr = StreamCdcTableScan::build_cdc_filter_expr(r#"public."QuotedNoteE2e""#);

        let unquoted_result = filter_expr
            .eval_row(&row_with_unquoted_schema_table)
            .await
            .unwrap();
        assert_eq!(unquoted_result, Some(ScalarImpl::Bool(true)));

        let other_result = filter_expr.eval_row(&row_with_other_table).await.unwrap();
        assert_eq!(other_result, Some(ScalarImpl::Bool(false)));
    }
}