risingwave_frontend/optimizer/plan_node/stream_cdc_table_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::PbStreamNode;
use risingwave_pb::stream_plan::stream_node::{PbNodeBody, PbStreamKind};

use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanBase, StreamNode, StreamPlanRef as PlanRef, generic};
use crate::catalog::ColumnId;
use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::{Explain, TableCatalog};

/// `StreamCdcTableScan` is a virtual plan node representing a stream cdc table scan.
/// It is converted into a cdc backfill node plus a merge node (for the upstream source).
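/// After conversion, the fragment is roughly:
/// `merge (upstream source) -> cdc filter -> exchange -> cdc backfill`
/// (a sketch; see `adhoc_to_stream_prost` below. The exchange is broadcast
/// for parallelized backfill and simple otherwise.)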
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamCdcTableScan {
    pub base: PlanBase<Stream>,
    core: generic::CdcScan,
}

impl StreamCdcTableScan {
    pub fn new(core: generic::CdcScan) -> Self {
        let distribution = Distribution::SomeShard;
        let base = PlanBase::new_stream_with_core(
            &core,
            distribution,
            StreamKind::Retract,
            false,
            core.watermark_columns(),
            core.columns_monotonicity(),
        );
        Self { base, core }
    }

    pub fn table_name(&self) -> &str {
        &self.core.table_name
    }

    pub fn core(&self) -> &generic::CdcScan {
        &self.core
    }

    /// Build the catalog for the cdc backfill state table.
    ///
    /// For non-parallelized cdc backfill, we currently only persist whether the
    /// backfill is finished and the corresponding cdc offset.
    /// schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
    ///
    /// For parallelized cdc backfill, the state is keyed by `split_id` alone:
    /// schema: | `split_id` | `backfill_finished` | `row_count` | `cdc_offset_low` | `cdc_offset_high` |
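    ///
    /// For illustration only (hypothetical values, assuming a single-column pk
    /// and the binlog-style offset format used in the tests below), a finished
    /// non-parallelized backfill state row might look like:
    /// `| '0' | 1 | true | 1000 | {"file": "mysql-bin.000003", "pos": 774} |`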
    pub fn build_backfill_state_catalog(
        &self,
        state: &mut BuildFragmentGraphState,
        is_parallelized_backfill: bool,
    ) -> TableCatalog {
        if is_parallelized_backfill {
            let mut catalog_builder = TableCatalogBuilder::default();
            // Use `split_id` as the primary key in the state table.
            catalog_builder.add_column(&Field::with_name(DataType::Int64, "split_id"));
            catalog_builder.add_order_column(0, OrderType::ascending());
            catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
            // `row_count` column, the number of rows read from the snapshot.
            catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
            catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset_low"));
            catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset_high"));
            catalog_builder
                .build(vec![], 1)
                .with_id(state.gen_table_id_wrapped())
        } else {
            let mut catalog_builder = TableCatalogBuilder::default();
            let upstream_schema = &self.core.get_table_columns();

            // Use `split_id` as the primary key in the state table.
            // Currently we only support a single split for cdc backfill.
            catalog_builder.add_column(&Field::with_name(DataType::Varchar, "split_id"));
            catalog_builder.add_order_column(0, OrderType::ascending());

            // pk columns
            for col_order in self.core.primary_key() {
                let col = &upstream_schema[col_order.column_index];
                catalog_builder.add_column(&Field::from(col));
            }

            catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));

            // `row_count` column, the number of rows read from the snapshot.
            catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));

            // The offset is currently only for observability, not for recovery.
            catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset"));

            // Leave the dist key empty, since the cdc backfill executor is a singleton.
            catalog_builder
                .build(vec![], 1)
                .with_id(state.gen_table_id_wrapped())
        }
    }
}

impl_plan_tree_node_for_leaf! { Stream, StreamCdcTableScan }

impl Distill for StreamCdcTableScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let verbose = self.base.ctx().is_explain_verbose();
        let mut vec = Vec::with_capacity(4);
        vec.push(("table", Pretty::from(self.core.table_name.clone())));
        vec.push(("columns", self.core.columns_pretty(verbose)));

        if verbose {
            let pk = IndicesDisplay {
                indices: self.stream_key().unwrap_or_default(),
                schema: self.base.schema(),
            };
            vec.push(("pk", pk.distill()));
            let dist = Pretty::display(&DistributionDisplay {
                distribution: self.distribution(),
                input_schema: self.base.schema(),
            });
            vec.push(("dist", dist));
        }

        childless_record("StreamCdcTableScan", vec)
    }
}

impl StreamNode for StreamCdcTableScan {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        unreachable!(
            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
        )
    }
}

impl StreamCdcTableScan {
    /// plan: merge -> cdc filter -> exchange(simple or broadcast) -> `stream_scan`
    pub fn adhoc_to_stream_prost(
        &self,
        state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<PbStreamNode> {
        use risingwave_pb::stream_plan::*;

        let stream_key = self
            .stream_key()
            .unwrap_or_else(|| {
                panic!(
                    "stream plan should always have a stream key, but it's missing; sub plan: {}",
                    PlanRef::from(self.clone()).explain_to_string()
                )
            })
            .iter()
            .map(|x| *x as u32)
            .collect_vec();

        // The schema of the shared cdc source upstream differs from the table snapshot.
        let cdc_source_schema = ColumnCatalog::debezium_cdc_source_cols()
            .into_iter()
            .map(|c| Field::from(c.column_desc).to_prost())
            .collect_vec();
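        // `debezium_cdc_source_cols()` is expected to yield the
        // `(payload, _rw_offset, _rw_table_name)` columns that the filter node
        // below consumes.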

        let catalog = self
            .build_backfill_state_catalog(state, self.core.options.is_parallelized_backfill())
            .to_internal_table_prost();

        // Pass the id of the upstream source job here.
        let upstream_source_id = self.core.cdc_table_desc.source_id.table_id;

        // Filter upstream source chunks by the value of the `_rw_table_name` column.
        let filter_expr =
            Self::build_cdc_filter_expr(self.core.cdc_table_desc.external_table_name.as_str());

        let filter_operator_id = self.core.ctx.next_plan_node_id();
        // The filter node receives chunks in the `(payload, _rw_offset, _rw_table_name)` schema.
        let filter_stream_node = StreamNode {
            operator_id: filter_operator_id.0 as _,
            input: vec![
                // The merge node body will be filled by the `ActorBuilder` on the meta service.
                PbStreamNode {
                    node_body: Some(PbNodeBody::Merge(Default::default())),
                    identity: "Upstream".into(),
                    fields: cdc_source_schema.clone(),
                    stream_key: vec![], // not used
                    ..Default::default()
                },
            ],
            stream_key: vec![], // not used
            stream_kind: PbStreamKind::AppendOnly as _,
            identity: "StreamCdcFilter".to_owned(),
            fields: cdc_source_schema.clone(),
            node_body: Some(PbNodeBody::CdcFilter(Box::new(CdcFilterNode {
                search_condition: Some(filter_expr.to_expr_proto()),
                upstream_source_id,
            }))),
        };

        let exchange_operator_id = self.core.ctx.next_plan_node_id();
        let strategy = if self.core.options.is_parallelized_backfill() {
            DispatchStrategy {
                r#type: DispatcherType::Broadcast as _,
                dist_key_indices: vec![],
                output_mapping: PbDispatchOutputMapping::identical(cdc_source_schema.len()).into(),
            }
        } else {
            DispatchStrategy {
                r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![], // simple exchange doesn't need dist key
                output_mapping: PbDispatchOutputMapping::identical(cdc_source_schema.len()).into(),
            }
        };
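        // Presumably, broadcast lets every parallel backfill actor observe the
        // full upstream change stream, while the simple exchange funnels all
        // changes into the single backfill actor.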
        // Add an exchange node between the filter and the stream scan.
        let exchange_stream_node = StreamNode {
            operator_id: exchange_operator_id.0 as _,
            input: vec![filter_stream_node],
            stream_key: vec![], // not used
            stream_kind: PbStreamKind::AppendOnly as _,
            identity: "Exchange".to_owned(),
            fields: cdc_source_schema.clone(),
            node_body: Some(PbNodeBody::Exchange(Box::new(ExchangeNode {
                strategy: Some(strategy),
            }))),
        };

        // The required columns from the external table.
        let upstream_column_ids = self
            .core
            .output_and_pk_column_ids()
            .iter()
            .map(ColumnId::get_id)
            .collect_vec();

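        // `output_indices` maps each output column id to its position in
        // `upstream_column_ids`. For example (hypothetical ids), if
        // `upstream_column_ids = [2, 1, 3]` and the output column ids are
        // `[1, 3]`, the result is `output_indices = [1, 2]`.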
        let output_indices = self
            .core
            .output_column_ids()
            .iter()
            .map(|i| {
                upstream_column_ids
                    .iter()
                    .position(|&x| x == i.get_id())
                    .unwrap() as u32
            })
            .collect_vec();

        tracing::debug!(
            output_column_ids=?self.core.output_column_ids(),
            ?upstream_column_ids,
            ?output_indices,
            "stream cdc table scan output indices"
        );

        let options = self.core.options.to_proto();
        let stream_scan_body = PbNodeBody::StreamCdcScan(Box::new(StreamCdcScanNode {
            table_id: upstream_source_id,
            upstream_column_ids,
            output_indices,
            // The state table used by the backfill executor.
            state_table: Some(catalog),
            cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
            disable_backfill: options.disable_backfill,
            options: Some(options),
        }));

        // plan: merge -> cdc filter -> exchange(simple or broadcast) -> stream_scan
        Ok(PbStreamNode {
            fields: self.schema().to_prost(),
            input: vec![exchange_stream_node],
            node_body: Some(stream_scan_body),
            stream_key,
            operator_id: self.base.id().0 as u64,
            identity: self.distill_to_string(),
            stream_kind: self.stream_kind().to_protobuf() as i32,
        })
    }

    // The filter node receives input chunks in the `(payload, _rw_offset, _rw_table_name)` schema.
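    // For example, `build_cdc_filter_expr("public.t2")` builds an expression
    // semantically equivalent to `_rw_table_name = 'public.t2'`, where
    // `_rw_table_name` is the input column at index 2.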
    pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl {
        // Filter by the `_rw_table_name` column.
        FunctionCall::new(
            ExprType::Equal,
            vec![
                InputRef::new(2, DataType::Varchar).into(),
                ExprImpl::literal_varchar(cdc_table_name.into()),
            ],
        )
        .unwrap()
        .into()
    }
}

impl ExprRewritable<Stream> for StreamCdcTableScan {
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        // `rewrite_exprs` mutates the core in place, so the clone must be mutable.
        let mut core = self.core.clone();
        core.rewrite_exprs(r);
        Self::new(core).into()
    }
}

impl ExprVisitable for StreamCdcTableScan {
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{JsonbVal, ScalarImpl};

    use super::*;

    #[tokio::test]
    async fn test_cdc_filter_expr() {
        let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
        let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();

        // NOTE: transaction metadata events are expected to be filtered out before reaching the cdc filter.
        let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap();
        let row1 = OwnedRow::new(vec![
            Some(t1_json.into()),
            Some(r#"{"file": "1.binlog", "pos": 100}"#.into()),
            Some("public.t2".into()),
        ]);
        let row2 = OwnedRow::new(vec![
            Some(t2_json.into()),
            Some(r#"{"file": "2.binlog", "pos": 100}"#.into()),
            Some("abs.t2".into()),
        ]);

        let row3 = OwnedRow::new(vec![
            Some(trx_json.into()),
            Some(r#"{"file": "3.binlog", "pos": 100}"#.into()),
            Some("public.t2".into()),
        ]);

        let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("public.t2");
        assert_eq!(
            filter_expr.eval_row(&row1).await.unwrap(),
            Some(ScalarImpl::Bool(true))
        );
        assert_eq!(
            filter_expr.eval_row(&row2).await.unwrap(),
            Some(ScalarImpl::Bool(false))
        );
        assert_eq!(
            filter_expr.eval_row(&row3).await.unwrap(),
            Some(ScalarImpl::Bool(true))
        );
    }
}