use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::PbStreamNode;
use risingwave_pb::stream_plan::stream_node::{PbNodeBody, PbStreamKind};

use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanBase, StreamNode, StreamPlanRef as PlanRef, generic};
use crate::catalog::ColumnId;
use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::{Explain, TableCatalog};

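/// `StreamCdcTableScan` is a leaf plan node for scanning a table from an external
/// CDC source. It has no protobuf body of its own; [`Self::adhoc_to_stream_prost`]
/// lowers it into a `merge -> cdc_filter -> exchange -> stream_cdc_scan` subtree.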
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamCdcTableScan {
    pub base: PlanBase<Stream>,
    core: generic::CdcScan,
}

impl StreamCdcTableScan {
    pub fn new(core: generic::CdcScan) -> Self {
        let distribution = Distribution::SomeShard;
        let base = PlanBase::new_stream_with_core(
            &core,
            distribution,
            StreamKind::Retract,
            false,
            core.watermark_columns(),
            core.columns_monotonicity(),
        );
        Self { base, core }
    }

    pub fn table_name(&self) -> &str {
        &self.core.table_name
    }

    pub fn core(&self) -> &generic::CdcScan {
        &self.core
    }

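    /// Builds the catalog for the backfill executor's state table.
    ///
    /// For parallelized backfill the schema is
    /// `split_id | backfill_finished | row_count | cdc_offset_low | cdc_offset_high`;
    /// otherwise it is `split_id | pk... | backfill_finished | row_count | cdc_offset`.
    /// In both cases `split_id` is the sole order column.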
    pub fn build_backfill_state_catalog(
        &self,
        state: &mut BuildFragmentGraphState,
        is_parallelized_backfill: bool,
    ) -> TableCatalog {
        if is_parallelized_backfill {
            let mut catalog_builder = TableCatalogBuilder::default();
            catalog_builder.add_column(&Field::with_name(DataType::Int64, "split_id"));
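            // `split_id` is the sole order column, i.e. the primary key of the state table.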
            catalog_builder.add_order_column(0, OrderType::ascending());
            catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
            catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
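            // Per-split CDC offset bounds, stored as JSONB.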
            catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset_low"));
            catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset_high"));
            catalog_builder
                .build(vec![], 1)
                .with_id(state.gen_table_id_wrapped())
        } else {
            let mut catalog_builder = TableCatalogBuilder::default();
            let upstream_schema = &self.core.get_table_columns();

            catalog_builder.add_column(&Field::with_name(DataType::Varchar, "split_id"));
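            // As in the parallelized case, `split_id` is the order column.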
            catalog_builder.add_order_column(0, OrderType::ascending());

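            // Also persist the upstream table's primary key columns.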
            for col_order in self.core.primary_key() {
                let col = &upstream_schema[col_order.column_index];
                catalog_builder.add_column(&Field::from(col));
            }

            catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));

            catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));

            catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset"));

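            // No distribution key; the read prefix covers only the `split_id` order column.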
            catalog_builder
                .build(vec![], 1)
                .with_id(state.gen_table_id_wrapped())
        }
    }
}

impl_plan_tree_node_for_leaf! { Stream, StreamCdcTableScan }

impl Distill for StreamCdcTableScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let verbose = self.base.ctx().is_explain_verbose();
        let mut vec = Vec::with_capacity(4);
        vec.push(("table", Pretty::from(self.core.table_name.clone())));
        vec.push(("columns", self.core.columns_pretty(verbose)));

        if verbose {
            let pk = IndicesDisplay {
                indices: self.stream_key().unwrap_or_default(),
                schema: self.base.schema(),
            };
            vec.push(("pk", pk.distill()));
            let dist = Pretty::display(&DistributionDisplay {
                distribution: self.distribution(),
                input_schema: self.base.schema(),
            });
            vec.push(("dist", dist));
        }

        childless_record("StreamCdcTableScan", vec)
    }
}

impl StreamNode for StreamCdcTableScan {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        unreachable!(
            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
        )
    }
}

impl StreamCdcTableScan {
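    /// Builds the protobuf subtree for this node:
    /// `merge -> cdc_filter -> exchange -> stream_cdc_scan`.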
    pub fn adhoc_to_stream_prost(
        &self,
        state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<PbStreamNode> {
        use risingwave_pb::stream_plan::*;

        let stream_key = self
            .stream_key()
            .unwrap_or_else(|| {
                panic!(
                    "stream plan should always have a stream key, but it's missing; sub plan: {}",
                    PlanRef::from(self.clone()).explain_to_string()
                )
            })
            .iter()
            .map(|x| *x as u32)
            .collect_vec();

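        // The shared CDC source upstream emits the fixed Debezium-style columns rather
        // than this table's schema; these fields describe the nodes below the scan.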
        let cdc_source_schema = ColumnCatalog::debezium_cdc_source_cols()
            .into_iter()
            .map(|c| Field::from(c.column_desc).to_prost())
            .collect_vec();

        let catalog = self
            .build_backfill_state_catalog(state, self.core.options.is_parallelized_backfill())
            .to_internal_table_prost();

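        // ID of the upstream shared CDC source job.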
        let upstream_source_id = self.core.cdc_table_desc.source_id.table_id;

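        // Keep only the change events that belong to this external table.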
        let filter_expr =
            Self::build_cdc_filter_expr(self.core.cdc_table_desc.external_table_name.as_str());

        let filter_operator_id = self.core.ctx.next_plan_node_id();
        let filter_stream_node = StreamNode {
            operator_id: filter_operator_id.0 as _,
            input: vec![
                PbStreamNode {
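                    // The merge node body is left empty here; its upstream inputs are
                    // filled in later when the fragment graph is built into actors.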
                    node_body: Some(PbNodeBody::Merge(Default::default())),
                    identity: "Upstream".into(),
                    fields: cdc_source_schema.clone(),
                    stream_key: vec![],
                    ..Default::default()
                },
            ],
            stream_key: vec![],
            stream_kind: PbStreamKind::AppendOnly as _,
            identity: "StreamCdcFilter".to_owned(),
            fields: cdc_source_schema.clone(),
            node_body: Some(PbNodeBody::CdcFilter(Box::new(CdcFilterNode {
                search_condition: Some(filter_expr.to_expr_proto()),
                upstream_source_id,
            }))),
        };

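        // Add an exchange between the filter and the scan: broadcast for parallelized
        // backfill, simple (single downstream) otherwise.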
        let exchange_operator_id = self.core.ctx.next_plan_node_id();
        let strategy = if self.core.options.is_parallelized_backfill() {
            DispatchStrategy {
                r#type: DispatcherType::Broadcast as _,
                dist_key_indices: vec![],
                output_mapping: PbDispatchOutputMapping::identical(cdc_source_schema.len()).into(),
            }
        } else {
            DispatchStrategy {
                r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![],
                output_mapping: PbDispatchOutputMapping::identical(cdc_source_schema.len()).into(),
            }
        };
        let exchange_stream_node = StreamNode {
            operator_id: exchange_operator_id.0 as _,
            input: vec![filter_stream_node],
            stream_key: vec![],
            stream_kind: PbStreamKind::AppendOnly as _,
            identity: "Exchange".to_owned(),
            fields: cdc_source_schema.clone(),
            node_body: Some(PbNodeBody::Exchange(Box::new(ExchangeNode {
                strategy: Some(strategy),
            }))),
        };

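        // The scan reads `upstream_column_ids` (output plus primary-key columns) and
        // projects them back to the node's output via `output_indices`.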
        let upstream_column_ids = self
            .core
            .output_and_pk_column_ids()
            .iter()
            .map(ColumnId::get_id)
            .collect_vec();

        let output_indices = self
            .core
            .output_column_ids()
            .iter()
            .map(|i| {
                upstream_column_ids
                    .iter()
                    .position(|&x| x == i.get_id())
                    .unwrap() as u32
            })
            .collect_vec();

        tracing::debug!(
            output_column_ids=?self.core.output_column_ids(),
            ?upstream_column_ids,
            ?output_indices,
            "stream cdc table scan output indices"
        );

        let options = self.core.options.to_proto();
        let stream_scan_body = PbNodeBody::StreamCdcScan(Box::new(StreamCdcScanNode {
            table_id: upstream_source_id,
            upstream_column_ids,
            output_indices,
            state_table: Some(catalog),
            cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
            disable_backfill: options.disable_backfill,
            options: Some(options),
        }));

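        // Assemble the final scan node, with the exchange subtree as its single input.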
        Ok(PbStreamNode {
            fields: self.schema().to_prost(),
            input: vec![exchange_stream_node],
            node_body: Some(stream_scan_body),
            stream_key,
            operator_id: self.base.id().0 as u64,
            identity: self.distill_to_string(),
            stream_kind: self.stream_kind().to_protobuf() as i32,
        })
    }

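    /// Builds the predicate for the `CdcFilter` node: the table-name column of the
    /// CDC source schema (input index 2) must equal `cdc_table_name`.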
    pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl {
        FunctionCall::new(
            ExprType::Equal,
            vec![
                InputRef::new(2, DataType::Varchar).into(),
                ExprImpl::literal_varchar(cdc_table_name.into()),
            ],
        )
        .unwrap()
        .into()
    }
}

impl ExprRewritable<Stream> for StreamCdcTableScan {
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        let mut core = self.core.clone();
        core.rewrite_exprs(r);
        Self::new(core).into()
    }
}

impl ExprVisitable for StreamCdcTableScan {
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{JsonbVal, ScalarImpl};

    use super::*;

    #[tokio::test]
    async fn test_cdc_filter_expr() {
        let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
        let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();

        let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap();
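        // Each row mimics the CDC source layout: (payload, offset, table name).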
        let row1 = OwnedRow::new(vec![
            Some(t1_json.into()),
            Some(r#"{"file": "1.binlog", "pos": 100}"#.into()),
            Some("public.t2".into()),
        ]);
        let row2 = OwnedRow::new(vec![
            Some(t2_json.into()),
            Some(r#"{"file": "2.binlog", "pos": 100}"#.into()),
            Some("abs.t2".into()),
        ]);

        let row3 = OwnedRow::new(vec![
            Some(trx_json.into()),
            Some(r#"{"file": "3.binlog", "pos": 100}"#.into()),
            Some("public.t2".into()),
        ]);

        let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("public.t2");
        assert_eq!(
            filter_expr.eval_row(&row1).await.unwrap(),
            Some(ScalarImpl::Bool(true))
        );
        assert_eq!(
            filter_expr.eval_row(&row2).await.unwrap(),
            Some(ScalarImpl::Bool(false))
        );
        assert_eq!(
            filter_expr.eval_row(&row3).await.unwrap(),
            Some(ScalarImpl::Bool(true))
        );
    }
}