// risingwave_connector/source/cdc/external/mock_external_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::AtomicUsize;
16
17use futures::stream::BoxStream;
18use futures_async_stream::try_stream;
19use risingwave_common::catalog::Field;
20use risingwave_common::row::OwnedRow;
21use risingwave_common::types::{DataType, ScalarImpl};
22use risingwave_common::util::sort_util::{OrderType, cmp_datum};
23
24use crate::error::{ConnectorError, ConnectorResult};
25use crate::source::CdcTableSnapshotSplit;
26use crate::source::cdc::external::{
27    CdcOffset, CdcOffsetParseFunc, CdcTableSnapshotSplitOption, ExternalTableReader, MySqlOffset,
28    SchemaTableName,
29};
/// A mock [`ExternalTableReader`] used in tests to exercise CDC backfill logic
/// without connecting to a real upstream database. All snapshot rows and binlog
/// watermarks are scripted in [`MockExternalTableReader::new`].
#[derive(Debug)]
pub struct MockExternalTableReader {
    // Scripted binlog offsets handed out one-by-one by `current_cdc_offset`
    // (indexed by a static call counter; exhausted entries fall back to
    // position `u64::MAX`).
    binlog_watermarks: Vec<MySqlOffset>,
    // Number of times the (non-split) snapshot stream has been requested;
    // selects which scripted snapshot to emit.
    snapshot_cnt: AtomicUsize,
    // Rows served by the parallel (split) snapshot read path, filtered by the
    // `[left, right)` bound on the first column.
    parallel_backfill_snapshots: Vec<OwnedRow>,
}
36
37impl MockExternalTableReader {
38    pub fn new() -> Self {
39        let binlog_file = String::from("1.binlog");
40        // mock binlog watermarks for backfill
41        // initial low watermark: 1.binlog, pos=2 and expected behaviors:
42        // - ignore events before (1.binlog, pos=2);
43        // - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot
44        let binlog_watermarks = vec![
45            MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark
46            MySqlOffset::new(binlog_file.clone(), 4),
47            MySqlOffset::new(binlog_file.clone(), 6),
48            MySqlOffset::new(binlog_file.clone(), 8),
49            MySqlOffset::new(binlog_file.clone(), 10),
50        ];
51        let parallel_backfill_snapshots = vec![
52            OwnedRow::new(vec![
53                Some(ScalarImpl::Int64(1)),
54                Some(ScalarImpl::Float64(1.0001.into())),
55            ]),
56            OwnedRow::new(vec![
57                Some(ScalarImpl::Int64(1)),
58                Some(ScalarImpl::Float64(11.00.into())),
59            ]),
60            OwnedRow::new(vec![
61                Some(ScalarImpl::Int64(2)),
62                Some(ScalarImpl::Float64(22.00.into())),
63            ]),
64            OwnedRow::new(vec![
65                Some(ScalarImpl::Int64(5)),
66                Some(ScalarImpl::Float64(1.0005.into())),
67            ]),
68            OwnedRow::new(vec![
69                Some(ScalarImpl::Int64(6)),
70                Some(ScalarImpl::Float64(1.0006.into())),
71            ]),
72            OwnedRow::new(vec![
73                Some(ScalarImpl::Int64(900)),
74                Some(ScalarImpl::Float64(900.1.into())),
75            ]),
76            OwnedRow::new(vec![
77                Some(ScalarImpl::Int64(8)),
78                Some(ScalarImpl::Float64(1.0008.into())),
79            ]),
80            OwnedRow::new(vec![
81                Some(ScalarImpl::Int64(400)),
82                Some(ScalarImpl::Float64(400.1.into())),
83            ]),
84        ];
85        Self {
86            binlog_watermarks,
87            snapshot_cnt: AtomicUsize::new(0),
88            parallel_backfill_snapshots,
89        }
90    }
91
92    pub fn get_normalized_table_name(_table_name: &SchemaTableName) -> String {
93        "`mock_table`".to_owned()
94    }
95
96    pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
97        Box::new(move |offset| {
98            Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
99                offset,
100            )?))
101        })
102    }
103
104    /// The snapshot will emit to downstream all in once, because it is too small.
105    /// After that we will emit the buffered upstream chunks all in one.
106    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
107    async fn snapshot_read_inner(&self) {
108        let snap_idx = self
109            .snapshot_cnt
110            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
111        println!("snapshot read: idx {}", snap_idx);
112
113        let snap0 = vec![
114            OwnedRow::new(vec![
115                Some(ScalarImpl::Int64(1)),
116                Some(ScalarImpl::Float64(1.0001.into())),
117            ]),
118            OwnedRow::new(vec![
119                Some(ScalarImpl::Int64(1)),
120                Some(ScalarImpl::Float64(11.00.into())),
121            ]),
122            OwnedRow::new(vec![
123                Some(ScalarImpl::Int64(2)),
124                Some(ScalarImpl::Float64(22.00.into())),
125            ]),
126            OwnedRow::new(vec![
127                Some(ScalarImpl::Int64(5)),
128                Some(ScalarImpl::Float64(1.0005.into())),
129            ]),
130            OwnedRow::new(vec![
131                Some(ScalarImpl::Int64(6)),
132                Some(ScalarImpl::Float64(1.0006.into())),
133            ]),
134            OwnedRow::new(vec![
135                Some(ScalarImpl::Int64(8)),
136                Some(ScalarImpl::Float64(1.0008.into())),
137            ]),
138        ];
139
140        let snapshots = [snap0];
141        if snap_idx >= snapshots.len() {
142            return Ok(());
143        }
144
145        for row in &snapshots[snap_idx] {
146            yield row.clone();
147        }
148    }
149
150    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
151    async fn split_snapshot_read_inner(&self, left: OwnedRow, right: OwnedRow) {
152        for row in &self.parallel_backfill_snapshots {
153            if (left[0].is_none()
154                || cmp_datum(&row[0], &left[0], OrderType::ascending_nulls_first()).is_ge())
155                && (right[0].is_none()
156                    || cmp_datum(&row[0], &right[0], OrderType::ascending_nulls_first()).is_lt())
157            {
158                yield row.clone();
159            }
160        }
161    }
162}
163
164impl ExternalTableReader for MockExternalTableReader {
165    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
166        static IDX: AtomicUsize = AtomicUsize::new(0);
167
168        let idx = IDX.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
169        if idx < self.binlog_watermarks.len() {
170            Ok(CdcOffset::MySql(self.binlog_watermarks[idx].clone()))
171        } else {
172            Ok(CdcOffset::MySql(MySqlOffset {
173                filename: "1.binlog".to_owned(),
174                position: u64::MAX,
175            }))
176        }
177    }
178
179    fn snapshot_read(
180        &self,
181        _table_name: SchemaTableName,
182        _start_pk: Option<OwnedRow>,
183        _primary_keys: Vec<String>,
184        _limit: u32,
185    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
186        self.snapshot_read_inner()
187    }
188
189    fn get_parallel_cdc_splits(
190        &self,
191        _options: CdcTableSnapshotSplitOption,
192    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
193        unreachable!()
194    }
195
196    fn split_snapshot_read(
197        &self,
198        _table_name: SchemaTableName,
199        left: OwnedRow,
200        right: OwnedRow,
201        split_columns: Vec<Field>,
202    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
203        assert_eq!(split_columns.len(), 1);
204        assert_eq!(split_columns[0].data_type, DataType::Int64);
205        self.split_snapshot_read_inner(left, right)
206    }
207}