risingwave_frontend/optimizer/plan_node/
stream_cdc_table_scan.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::catalog::{ColumnCatalog, Field};
18use risingwave_common::types::DataType;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::source::cdc::normalize_simple_postgres_quoted_table_name;
21use risingwave_pb::stream_plan::PbStreamNode;
22use risingwave_pb::stream_plan::stream_node::{PbNodeBody, PbStreamKind};
23
24use super::stream::prelude::*;
25use super::utils::{Distill, childless_record};
26use super::{ExprRewritable, PlanBase, StreamNode, StreamPlanRef as PlanRef, generic};
27use crate::catalog::ColumnId;
28use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef};
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
31use crate::optimizer::property::{Distribution, DistributionDisplay};
32use crate::scheduler::SchedulerResult;
33use crate::stream_fragmenter::BuildFragmentGraphState;
34use crate::{Explain, TableCatalog};
35
/// `StreamCdcTableScan` is a virtual plan node to represent a stream cdc table scan.
/// It will be converted to cdc backfill + merge node (for upstream source)
///
/// "Virtual" here means the node has no protobuf body of its own:
/// `StreamNode::to_stream_prost_body` is unreachable for it, and the whole
/// sub-plan is produced by `StreamCdcTableScan::adhoc_to_stream_prost` instead.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamCdcTableScan {
    /// Stream plan base, derived from `core` in `StreamCdcTableScan::new`.
    pub base: PlanBase<Stream>,
    /// The generic CDC scan wrapped by this stream node.
    core: generic::CdcScan,
}
43
44impl StreamCdcTableScan {
45    pub fn new(core: generic::CdcScan) -> Self {
46        let distribution = Distribution::SomeShard;
47        let base = PlanBase::new_stream_with_core(
48            &core,
49            distribution,
50            StreamKind::Retract,
51            false,
52            core.watermark_columns(),
53            core.columns_monotonicity(),
54        );
55        Self { base, core }
56    }
57
58    pub fn table_name(&self) -> &str {
59        &self.core.table_name
60    }
61
62    pub fn core(&self) -> &generic::CdcScan {
63        &self.core
64    }
65
66    /// Build catalog for cdc backfill state.
67    ///
68    /// For non-parallelized cdc backfill:
69    /// Right now we only persist whether the backfill is finished and the corresponding cdc offset
70    /// schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
71    ///
72    /// For parallelized cdc backfill:
73    /// schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset_low` | `cdc_offset_high` |
74    pub fn build_backfill_state_catalog(
75        &self,
76        state: &mut BuildFragmentGraphState,
77        is_parallelized_backfill: bool,
78    ) -> TableCatalog {
79        if is_parallelized_backfill {
80            let mut catalog_builder = TableCatalogBuilder::default();
81            // Use `split_id` as primary key in state table.
82            catalog_builder.add_column(&Field::with_name(DataType::Int64, "split_id"));
83            catalog_builder.add_order_column(0, OrderType::ascending());
84            catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
85            // `row_count` column, the number of rows read from snapshot
86            catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
87            catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset_low"));
88            catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset_high"));
89            catalog_builder
90                .build(vec![], 1)
91                .with_id(state.gen_table_id_wrapped())
92        } else {
93            let mut catalog_builder = TableCatalogBuilder::default();
94            let upstream_schema = &self.core.get_table_columns();
95
96            // Use `split_id` as primary key in state table.
97            // Currently we only support single split for cdc backfill.
98            catalog_builder.add_column(&Field::with_name(DataType::Varchar, "split_id"));
99            catalog_builder.add_order_column(0, OrderType::ascending());
100
101            // pk columns
102            for col_order in self.core.primary_key() {
103                let col = &upstream_schema[col_order.column_index];
104                catalog_builder.add_column(&Field::from(col));
105            }
106
107            catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
108
109            // `row_count` column, the number of rows read from snapshot
110            catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
111
112            // The offset is only for observability, not for recovery right now
113            catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset"));
114
115            // leave dist key empty, since the cdc backfill executor is singleton
116            catalog_builder
117                .build(vec![], 1)
118                .with_id(state.gen_table_id_wrapped())
119        }
120    }
121}
122
// Registers `StreamCdcTableScan` as a leaf of the `Stream` plan tree.
// NOTE(review): presumably this generates the plan-tree-node impls for a node
// without inputs -- confirm against the macro definition.
impl_plan_tree_node_for_leaf! { Stream, StreamCdcTableScan }
124
125impl Distill for StreamCdcTableScan {
126    fn distill<'a>(&self) -> XmlNode<'a> {
127        let verbose = self.base.ctx().is_explain_verbose();
128        let mut vec = Vec::with_capacity(4);
129        vec.push(("table", Pretty::from(self.core.table_name.clone())));
130        vec.push(("columns", self.core.columns_pretty(verbose)));
131
132        if verbose {
133            let pk = IndicesDisplay {
134                indices: self.stream_key().unwrap_or_default(),
135                schema: self.base.schema(),
136            };
137            vec.push(("pk", pk.distill()));
138            let dist = Pretty::display(&DistributionDisplay {
139                distribution: self.distribution(),
140                input_schema: self.base.schema(),
141            });
142            vec.push(("dist", dist));
143        }
144
145        childless_record("StreamCdcTableScan", vec)
146    }
147}
148
149impl StreamNode for StreamCdcTableScan {
150    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
151        unreachable!(
152            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
153        )
154    }
155}
156
157impl StreamCdcTableScan {
158    /// plan: merge -> filter -> exchange(simple) -> `stream_scan`
159    pub fn adhoc_to_stream_prost(
160        &self,
161        state: &mut BuildFragmentGraphState,
162    ) -> SchedulerResult<PbStreamNode> {
163        use risingwave_pb::stream_plan::*;
164
165        let stream_key = self
166            .stream_key()
167            .unwrap_or_else(|| {
168                panic!(
169                    "should always have a stream key in the stream plan but not, sub plan: {}",
170                    PlanRef::from(self.clone()).explain_to_string()
171                )
172            })
173            .iter()
174            .map(|x| *x as u32)
175            .collect_vec();
176
177        // The schema of the shared cdc source upstream is different from snapshot.
178        let cdc_source_schema = ColumnCatalog::debezium_cdc_source_cols()
179            .into_iter()
180            .map(|c| Field::from(c.column_desc).to_prost())
181            .collect_vec();
182
183        let catalog = self
184            .build_backfill_state_catalog(state, self.core.options.is_parallelized_backfill())
185            .to_internal_table_prost();
186
187        // We need to pass the id of upstream source job here
188        let upstream_source_id = self.core.cdc_table_desc.source_id;
189
190        // filter upstream source chunk by the value of `_rw_table_name` column
191        let filter_expr =
192            Self::build_cdc_filter_expr(self.core.cdc_table_desc.external_table_name.as_str());
193
194        let filter_operator_id = self.core.ctx.next_plan_node_id();
195        // The filter node receive chunks in `(payload, _rw_offset, _rw_table_name)` schema
196        let filter_stream_node = StreamNode {
197            operator_id: filter_operator_id.to_stream_node_operator_id(),
198            input: vec![
199                // The merge node body will be filled by the `ActorBuilder` on the meta service.
200                PbStreamNode {
201                    node_body: Some(PbNodeBody::Merge(Default::default())),
202                    identity: "Upstream".into(),
203                    fields: cdc_source_schema.clone(),
204                    stream_key: vec![], // not used
205                    ..Default::default()
206                },
207            ],
208            stream_key: vec![], // not used
209            stream_kind: PbStreamKind::AppendOnly as _,
210            identity: "StreamCdcFilter".to_owned(),
211            fields: cdc_source_schema.clone(),
212            node_body: Some(PbNodeBody::CdcFilter(Box::new(CdcFilterNode {
213                search_condition: Some(filter_expr.to_expr_proto()),
214                upstream_source_id,
215            }))),
216        };
217
218        let exchange_operator_id = self.core.ctx.next_plan_node_id();
219        let strategy = if self.core.options.is_parallelized_backfill() {
220            DispatchStrategy {
221                r#type: DispatcherType::Broadcast as _,
222                dist_key_indices: vec![],
223                output_mapping: PbDispatchOutputMapping::identical(cdc_source_schema.len()).into(),
224            }
225        } else {
226            DispatchStrategy {
227                r#type: DispatcherType::Simple as _,
228                dist_key_indices: vec![], // simple exchange doesn't need dist key
229                output_mapping: PbDispatchOutputMapping::identical(cdc_source_schema.len()).into(),
230            }
231        };
232        // Add a simple exchange node between filter and stream scan
233        let exchange_stream_node = StreamNode {
234            operator_id: exchange_operator_id.to_stream_node_operator_id(),
235            input: vec![filter_stream_node],
236            stream_key: vec![], // not used
237            stream_kind: PbStreamKind::AppendOnly as _,
238            identity: "Exchange".to_owned(),
239            fields: cdc_source_schema,
240            node_body: Some(PbNodeBody::Exchange(Box::new(ExchangeNode {
241                strategy: Some(strategy),
242            }))),
243        };
244
245        // The required columns from the external table
246        let upstream_column_ids = self
247            .core
248            .output_and_pk_column_ids()
249            .iter()
250            .map(ColumnId::get_id)
251            .collect_vec();
252
253        let output_indices = self
254            .core
255            .output_column_ids()
256            .iter()
257            .map(|i| {
258                upstream_column_ids
259                    .iter()
260                    .position(|&x| x == i.get_id())
261                    .unwrap() as u32
262            })
263            .collect_vec();
264
265        tracing::debug!(
266            output_column_ids=?self.core.output_column_ids(),
267            ?upstream_column_ids,
268            ?output_indices,
269            "stream cdc table scan output indices"
270        );
271
272        let options = self.core.options.to_proto();
273        let stream_scan_body = PbNodeBody::StreamCdcScan(Box::new(StreamCdcScanNode {
274            table_id: upstream_source_id.as_cdc_table_id(),
275            upstream_column_ids,
276            output_indices,
277            // The table desc used by backfill executor
278            state_table: Some(catalog),
279            cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
280            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
281            disable_backfill: options.disable_backfill,
282            options: Some(options),
283        }));
284
285        // plan: merge -> filter -> exchange(simple) -> stream_scan
286        Ok(PbStreamNode {
287            fields: self.schema().to_prost(),
288            input: vec![exchange_stream_node],
289            node_body: Some(stream_scan_body),
290            stream_key,
291            operator_id: self.base.id().to_stream_node_operator_id(),
292            identity: self.distill_to_string(),
293            stream_kind: self.stream_kind().to_protobuf() as i32,
294        })
295    }
296
297    // The filter node receive input chunks in `(payload, _rw_offset, _rw_table_name)` schema
298    pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl {
299        let table_name_ref: ExprImpl = InputRef::new(2, DataType::Varchar).into();
300        let mut filter_expr = build_cdc_table_name_eq_expr(table_name_ref.clone(), cdc_table_name);
301
302        if let Some(compat_table_name) = normalize_simple_postgres_quoted_table_name(cdc_table_name)
303        {
304            // The left branch keeps matching the catalog/user-facing table name stored in
305            // `ExternalTableDesc`, e.g. `public."TableName"`. The right branch matches the
306            // Debezium runtime routing key carried in `_rw_table_name`, e.g. `public.TableName`.
307            // See `DbzChangeEventConsumer` for deriving the runtime table name from Debezium
308            // topics, and `DebeziumCdcMeta::extract_table_name` for how `_rw_table_name` is
309            // populated from that runtime name.
310            filter_expr = FunctionCall::new(
311                ExprType::Or,
312                vec![
313                    filter_expr,
314                    build_cdc_table_name_eq_expr(table_name_ref, &compat_table_name),
315                ],
316            )
317            .unwrap()
318            .into();
319        }
320
321        filter_expr
322    }
323}
324
325fn build_cdc_table_name_eq_expr(table_name_ref: ExprImpl, cdc_table_name: &str) -> ExprImpl {
326    FunctionCall::new(
327        ExprType::Equal,
328        vec![
329            table_name_ref,
330            ExprImpl::literal_varchar(cdc_table_name.into()),
331        ],
332    )
333    .unwrap()
334    .into()
335}
336
337impl ExprRewritable<Stream> for StreamCdcTableScan {
338    fn has_rewritable_expr(&self) -> bool {
339        true
340    }
341
342    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
343        let core = self.core.clone();
344        core.rewrite_exprs(r);
345        Self::new(core).into()
346    }
347}
348
349impl ExprVisitable for StreamCdcTableScan {
350    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
351        self.core.visit_exprs(v);
352    }
353}
354
/// Unit tests for the cdc filter predicate built by
/// `StreamCdcTableScan::build_cdc_filter_expr`.
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{JsonbVal, ScalarImpl};

    use super::*;

    /// The predicate only inspects the third input column (`_rw_table_name`); the
    /// payload and offset columns do not influence the result.
    #[tokio::test]
    async fn test_cdc_filter_expr() {
        let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
        let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();

        // NOTES: transaction metadata column expects to be filtered out before going to cdc filter
        let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap();
        // Payload mentions table `t1`, but `_rw_table_name` is `public.t2`: expected to match.
        let row1 = OwnedRow::new(vec![
            Some(t1_json.into()),
            Some(r#"{"file": "1.binlog", "pos": 100}"#.into()),
            Some("public.t2".into()),
        ]);
        // Payload mentions table `t2`, but `_rw_table_name` is `abs.t2`: expected not to match.
        let row2 = OwnedRow::new(vec![
            Some(t2_json.into()),
            Some(r#"{"file": "2.binlog", "pos": 100}"#.into()),
            Some("abs.t2".into()),
        ]);

        // Transaction metadata payload with a matching `_rw_table_name`: expected to match.
        let row3 = OwnedRow::new(vec![
            Some(trx_json.into()),
            Some(r#"{"file": "3.binlog", "pos": 100}"#.into()),
            Some("public.t2".into()),
        ]);

        let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("public.t2");
        assert_eq!(
            filter_expr.eval_row(&row1).await.unwrap(),
            Some(ScalarImpl::Bool(true))
        );
        assert_eq!(
            filter_expr.eval_row(&row2).await.unwrap(),
            Some(ScalarImpl::Bool(false))
        );
        assert_eq!(
            filter_expr.eval_row(&row3).await.unwrap(),
            Some(ScalarImpl::Bool(true))
        )
    }

    /// A table declared as `public."QuotedNoteE2e"` must also accept rows whose
    /// `_rw_table_name` is the unquoted `public.QuotedNoteE2e`, while still
    /// rejecting rows of other tables.
    #[tokio::test]
    async fn test_cdc_filter_expr_postgres_quoted_table_name_compat() {
        // Only the table name column matters here, so payload and offset are left NULL.
        let row_with_unquoted_schema_table =
            OwnedRow::new(vec![None, None, Some("public.QuotedNoteE2e".into())]);
        let row_with_other_table = OwnedRow::new(vec![None, None, Some("public.other".into())]);

        let filter_expr = StreamCdcTableScan::build_cdc_filter_expr(r#"public."QuotedNoteE2e""#);
        assert_eq!(
            filter_expr
                .eval_row(&row_with_unquoted_schema_table)
                .await
                .unwrap(),
            Some(ScalarImpl::Bool(true))
        );
        assert_eq!(
            filter_expr.eval_row(&row_with_other_table).await.unwrap(),
            Some(ScalarImpl::Bool(false))
        );
    }
}