risingwave_frontend/optimizer/plan_node/stream_cdc_table_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::PbStreamNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanBase, StreamNode, StreamPlanRef as PlanRef, generic};
use crate::catalog::ColumnId;
use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::{Explain, TableCatalog};

/// `StreamCdcTableScan` is a virtual plan node representing a stream CDC table scan.
/// It will be converted into a CDC backfill node plus a merge node (for the upstream source).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamCdcTableScan {
    pub base: PlanBase<Stream>,
    core: generic::CdcScan,
}

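// Illustrative sketch only (not part of the original source): after
// `adhoc_to_stream_prost` below, this virtual node expands into the following
// stream fragment shape, with the merge node body filled in by the meta service:
//
//   Merge (upstream shared CDC source)
//     -> CdcFilter (keep rows whose `_rw_table_name` matches this table)
//     -> Exchange (Simple, or Broadcast when backfill is parallelized)
//     -> StreamCdcScan (CDC backfill executor)
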
impl StreamCdcTableScan {
    pub fn new(core: generic::CdcScan) -> Self {
        let distribution = Distribution::SomeShard;
        let base = PlanBase::new_stream_with_core(
            &core,
            distribution,
            StreamKind::Retract,
            false,
            core.watermark_columns(),
            core.columns_monotonicity(),
        );
        Self { base, core }
    }

    pub fn table_name(&self) -> &str {
        &self.core.table_name
    }

    pub fn core(&self) -> &generic::CdcScan {
        &self.core
    }

    /// Build the catalog for the cdc backfill state table.
    /// Right now we only persist whether the backfill is finished and the corresponding cdc offset.
    /// Schema (single-split backfill): | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
    pub fn build_backfill_state_catalog(
        &self,
        state: &mut BuildFragmentGraphState,
        is_parallelized_backfill: bool,
    ) -> TableCatalog {
        if is_parallelized_backfill {
            let mut catalog_builder = TableCatalogBuilder::default();
            // Use `split_id` as primary key in state table.
            catalog_builder.add_column(&Field::with_name(DataType::Int64, "split_id"));
            catalog_builder.add_order_column(0, OrderType::ascending());
            catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
            // `row_count` column, the number of rows read from snapshot
            catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
            catalog_builder
                .build(vec![], 1)
                .with_id(state.gen_table_id_wrapped())
        } else {
            let mut catalog_builder = TableCatalogBuilder::default();
            let upstream_schema = &self.core.get_table_columns();

            // Use `split_id` as primary key in state table.
            // Currently we only support a single split for cdc backfill.
            catalog_builder.add_column(&Field::with_name(DataType::Varchar, "split_id"));
            catalog_builder.add_order_column(0, OrderType::ascending());

            // pk columns
            for col_order in self.core.primary_key() {
                let col = &upstream_schema[col_order.column_index];
                catalog_builder.add_column(&Field::from(col));
            }

            catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));

            // `row_count` column, the number of rows read from snapshot
            catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));

            // The offset is only for observability, not for recovery right now
            catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset"));

            // Leave the dist key empty, since the cdc backfill executor is a singleton.
            catalog_builder
                .build(vec![], 1)
                .with_id(state.gen_table_id_wrapped())
        }
    }
}
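
// Hedged illustration (added here, not in the original source) of the two
// state-table layouts built by `build_backfill_state_catalog` above; column
// order follows the `add_column` calls, and `split_id` is the sole
// order/primary-key column:
//
//   parallelized backfill : | split_id: Int64   | backfill_finished: Boolean | row_count: Int64 |
//   single-split backfill : | split_id: Varchar | pk... | backfill_finished: Boolean | row_count: Int64 | cdc_offset: Jsonb |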

impl_plan_tree_node_for_leaf! { Stream, StreamCdcTableScan }

impl Distill for StreamCdcTableScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let verbose = self.base.ctx().is_explain_verbose();
        let mut vec = Vec::with_capacity(4);
        vec.push(("table", Pretty::from(self.core.table_name.clone())));
        vec.push(("columns", self.core.columns_pretty(verbose)));

        if verbose {
            let pk = IndicesDisplay {
                indices: self.stream_key().unwrap_or_default(),
                schema: self.base.schema(),
            };
            vec.push(("pk", pk.distill()));
            let dist = Pretty::display(&DistributionDisplay {
                distribution: self.distribution(),
                input_schema: self.base.schema(),
            });
            vec.push(("dist", dist));
        }

        childless_record("StreamCdcTableScan", vec)
    }
}

impl StreamNode for StreamCdcTableScan {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        unreachable!(
            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
        )
    }
}

impl StreamCdcTableScan {
    /// plan: merge -> filter -> exchange(simple) -> `stream_scan`
    pub fn adhoc_to_stream_prost(
        &self,
        state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<PbStreamNode> {
        use risingwave_pb::stream_plan::*;

        let stream_key = self
            .stream_key()
            .unwrap_or_else(|| {
                panic!(
                    "should always have a stream key in the stream plan but not, sub plan: {}",
                    PlanRef::from(self.clone()).explain_to_string()
                )
            })
            .iter()
            .map(|x| *x as u32)
            .collect_vec();

        // The schema of the shared cdc source upstream is different from the snapshot schema.
        let cdc_source_schema = ColumnCatalog::debezium_cdc_source_cols()
            .into_iter()
            .map(|c| Field::from(c.column_desc).to_prost())
            .collect_vec();
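        // (As the comments below note, this upstream schema is
        // `(payload, _rw_offset, _rw_table_name)`; the filter expression built
        // by `build_cdc_filter_expr` reads the table name at column index 2.)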

        let catalog = self
            .build_backfill_state_catalog(state, self.core.options.is_parallelized_backfill())
            .to_internal_table_prost();

        // We need to pass the id of the upstream source job here.
        let upstream_source_id = self.core.cdc_table_desc.source_id.table_id;

        // Filter upstream source chunks by the value of the `_rw_table_name` column.
        let filter_expr =
            Self::build_cdc_filter_expr(self.core.cdc_table_desc.external_table_name.as_str());

        let filter_operator_id = self.core.ctx.next_plan_node_id();
        // The filter node receives chunks in the `(payload, _rw_offset, _rw_table_name)` schema.
        let filter_stream_node = StreamNode {
            operator_id: filter_operator_id.0 as _,
            input: vec![
                // The merge node body will be filled by the `ActorBuilder` on the meta service.
                PbStreamNode {
                    node_body: Some(PbNodeBody::Merge(Default::default())),
                    identity: "Upstream".into(),
                    fields: cdc_source_schema.clone(),
                    stream_key: vec![], // not used
                    ..Default::default()
                },
            ],
            stream_key: vec![], // not used
            append_only: true,
            identity: "StreamCdcFilter".to_owned(),
            fields: cdc_source_schema.clone(),
            node_body: Some(PbNodeBody::CdcFilter(Box::new(CdcFilterNode {
                search_condition: Some(filter_expr.to_expr_proto()),
                upstream_source_id,
            }))),
        };

        let exchange_operator_id = self.core.ctx.next_plan_node_id();
        let strategy = if self.core.options.is_parallelized_backfill() {
            DispatchStrategy {
                r#type: DispatcherType::Broadcast as _,
                dist_key_indices: vec![],
                output_mapping: PbDispatchOutputMapping::identical(cdc_source_schema.len()).into(),
            }
        } else {
            DispatchStrategy {
                r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![], // simple exchange doesn't need a dist key
                output_mapping: PbDispatchOutputMapping::identical(cdc_source_schema.len()).into(),
            }
        };
        // Add an exchange node between the filter and the stream scan: simple by
        // default, broadcast when the backfill is parallelized.
        let exchange_stream_node = StreamNode {
            operator_id: exchange_operator_id.0 as _,
            input: vec![filter_stream_node],
            stream_key: vec![], // not used
            append_only: true,
            identity: "Exchange".to_owned(),
            fields: cdc_source_schema.clone(),
            node_body: Some(PbNodeBody::Exchange(Box::new(ExchangeNode {
                strategy: Some(strategy),
            }))),
        };

        // The required columns from the external table
        let upstream_column_ids = self
            .core
            .output_and_pk_column_ids()
            .iter()
            .map(ColumnId::get_id)
            .collect_vec();

        let output_indices = self
            .core
            .output_column_ids()
            .iter()
            .map(|i| {
                upstream_column_ids
                    .iter()
                    .position(|&x| x == i.get_id())
                    .unwrap() as u32
            })
            .collect_vec();
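        // Worked example (added for illustration, not in the original source):
        // if `upstream_column_ids` (output + pk) is [2, 1, 0] and the output
        // column ids are [1, 2], then `output_indices` is [1, 0]; each output
        // column is addressed by its position within `upstream_column_ids`.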

        tracing::debug!(
            output_column_ids=?self.core.output_column_ids(),
            ?upstream_column_ids,
            ?output_indices,
            "stream cdc table scan output indices"
        );

        let options = self.core.options.to_proto();
        let stream_scan_body = PbNodeBody::StreamCdcScan(Box::new(StreamCdcScanNode {
            table_id: upstream_source_id,
            upstream_column_ids,
            output_indices,
            // The table desc used by backfill executor
            state_table: Some(catalog),
            cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
            disable_backfill: options.disable_backfill,
            options: Some(options),
        }));

        // plan: merge -> filter -> exchange(simple) -> stream_scan
        Ok(PbStreamNode {
            fields: self.schema().to_prost(),
            input: vec![exchange_stream_node],
            node_body: Some(stream_scan_body),
            stream_key,
            operator_id: self.base.id().0 as u64,
            identity: self.distill_to_string(),
            append_only: self.append_only(),
        })
    }

    // The filter node receives input chunks in the `(payload, _rw_offset, _rw_table_name)` schema.
    pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl {
        // filter by the `_rw_table_name` column
        FunctionCall::new(
            ExprType::Equal,
            vec![
                InputRef::new(2, DataType::Varchar).into(),
                ExprImpl::literal_varchar(cdc_table_name.into()),
            ],
        )
        .unwrap()
        .into()
    }
}

impl ExprRewritable<Stream> for StreamCdcTableScan {
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        // `rewrite_exprs` mutates the core in place, so the cloned core must be mutable.
        let mut core = self.core.clone();
        core.rewrite_exprs(r);
        Self::new(core).into()
    }
}

impl ExprVisitable for StreamCdcTableScan {
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{JsonbVal, ScalarImpl};

    use super::*;

    #[tokio::test]
    async fn test_cdc_filter_expr() {
        let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
        let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();

        // NOTE: chunks carrying transaction metadata are expected to be filtered out before reaching the cdc filter.
        let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap();
        let row1 = OwnedRow::new(vec![
            Some(t1_json.into()),
            Some(r#"{"file": "1.binlog", "pos": 100}"#.into()),
            Some("public.t2".into()),
        ]);
        let row2 = OwnedRow::new(vec![
            Some(t2_json.into()),
            Some(r#"{"file": "2.binlog", "pos": 100}"#.into()),
            Some("abs.t2".into()),
        ]);

        let row3 = OwnedRow::new(vec![
            Some(trx_json.into()),
            Some(r#"{"file": "3.binlog", "pos": 100}"#.into()),
            Some("public.t2".into()),
        ]);

        let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("public.t2");
        assert_eq!(
            filter_expr.eval_row(&row1).await.unwrap(),
            Some(ScalarImpl::Bool(true))
        );
        assert_eq!(
            filter_expr.eval_row(&row2).await.unwrap(),
            Some(ScalarImpl::Bool(false))
        );
        assert_eq!(
            filter_expr.eval_row(&row3).await.unwrap(),
            Some(ScalarImpl::Bool(true))
        )
    }
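
    // Additional hedged sketch (not in the original test suite): the filter
    // only inspects the `_rw_table_name` column (index 2), so a row for a
    // different table is rejected regardless of its payload. `orders_json` is
    // a made-up payload used purely for illustration.
    #[tokio::test]
    async fn test_cdc_filter_expr_rejects_other_tables() {
        let orders_json =
            JsonbVal::from_str(r#"{ "before": null, "after": { "id": 1 }, "op": "c" }"#).unwrap();
        let row = OwnedRow::new(vec![
            Some(orders_json.into()),
            Some(r#"{"file": "1.binlog", "pos": 42}"#.into()),
            // `_rw_table_name` does not match the table the filter is built for.
            Some("public.orders".into()),
        ]);

        let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("public.t2");
        assert_eq!(
            filter_expr.eval_row(&row).await.unwrap(),
            Some(ScalarImpl::Bool(false))
        );
    }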
}