risingwave_connector/source/cdc/external/mock_external_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::atomic::AtomicUsize;
16
17use futures::stream::BoxStream;
18use futures_async_stream::try_stream;
19use risingwave_common::row::OwnedRow;
20use risingwave_common::types::ScalarImpl;
21
22use crate::error::{ConnectorError, ConnectorResult};
23use crate::source::cdc::external::{
24    CdcOffset, CdcOffsetParseFunc, ExternalTableReader, MySqlOffset, SchemaTableName,
25};
26
27#[derive(Debug)]
28pub struct MockExternalTableReader {
29    binlog_watermarks: Vec<MySqlOffset>,
30    snapshot_cnt: AtomicUsize,
31}
32
33impl MockExternalTableReader {
34    pub fn new() -> Self {
35        let binlog_file = String::from("1.binlog");
36        // mock binlog watermarks for backfill
37        // initial low watermark: 1.binlog, pos=2 and expected behaviors:
38        // - ignore events before (1.binlog, pos=2);
39        // - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot
40        let binlog_watermarks = vec![
41            MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark
42            MySqlOffset::new(binlog_file.clone(), 4),
43            MySqlOffset::new(binlog_file.clone(), 6),
44            MySqlOffset::new(binlog_file.clone(), 8),
45            MySqlOffset::new(binlog_file.clone(), 10),
46        ];
47        Self {
48            binlog_watermarks,
49            snapshot_cnt: AtomicUsize::new(0),
50        }
51    }
52
53    pub fn get_normalized_table_name(_table_name: &SchemaTableName) -> String {
54        "`mock_table`".to_owned()
55    }
56
57    pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
58        Box::new(move |offset| {
59            Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
60                offset,
61            )?))
62        })
63    }
64
65    /// The snapshot will emit to downstream all in once, because it is too small.
66    /// After that we will emit the buffered upstream chunks all in one.
67    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
68    async fn snapshot_read_inner(&self) {
69        let snap_idx = self
70            .snapshot_cnt
71            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
72        println!("snapshot read: idx {}", snap_idx);
73
74        let snap0 = vec![
75            OwnedRow::new(vec![
76                Some(ScalarImpl::Int64(1)),
77                Some(ScalarImpl::Float64(1.0001.into())),
78            ]),
79            OwnedRow::new(vec![
80                Some(ScalarImpl::Int64(1)),
81                Some(ScalarImpl::Float64(11.00.into())),
82            ]),
83            OwnedRow::new(vec![
84                Some(ScalarImpl::Int64(2)),
85                Some(ScalarImpl::Float64(22.00.into())),
86            ]),
87            OwnedRow::new(vec![
88                Some(ScalarImpl::Int64(5)),
89                Some(ScalarImpl::Float64(1.0005.into())),
90            ]),
91            OwnedRow::new(vec![
92                Some(ScalarImpl::Int64(6)),
93                Some(ScalarImpl::Float64(1.0006.into())),
94            ]),
95            OwnedRow::new(vec![
96                Some(ScalarImpl::Int64(8)),
97                Some(ScalarImpl::Float64(1.0008.into())),
98            ]),
99        ];
100
101        let snapshots = [snap0];
102        if snap_idx >= snapshots.len() {
103            return Ok(());
104        }
105
106        for row in &snapshots[snap_idx] {
107            yield row.clone();
108        }
109    }
110}
111
112impl ExternalTableReader for MockExternalTableReader {
113    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
114        static IDX: AtomicUsize = AtomicUsize::new(0);
115
116        let idx = IDX.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
117        if idx < self.binlog_watermarks.len() {
118            Ok(CdcOffset::MySql(self.binlog_watermarks[idx].clone()))
119        } else {
120            Ok(CdcOffset::MySql(MySqlOffset {
121                filename: "1.binlog".to_owned(),
122                position: u64::MAX,
123            }))
124        }
125    }
126
127    fn snapshot_read(
128        &self,
129        _table_name: SchemaTableName,
130        _start_pk: Option<OwnedRow>,
131        _primary_keys: Vec<String>,
132        _limit: u32,
133    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
134        self.snapshot_read_inner()
135    }
136}