risingwave_connector/source/cdc/external/
mock_external_table.rs1use std::sync::atomic::AtomicUsize;
16
17use futures::stream::BoxStream;
18use futures_async_stream::try_stream;
19use risingwave_common::row::OwnedRow;
20use risingwave_common::types::ScalarImpl;
21
22use crate::error::{ConnectorError, ConnectorResult};
23use crate::source::cdc::external::{
24 CdcOffset, CdcOffsetParseFunc, ExternalTableReader, MySqlOffset, SchemaTableName,
25};
26
27#[derive(Debug)]
28pub struct MockExternalTableReader {
29 binlog_watermarks: Vec<MySqlOffset>,
30 snapshot_cnt: AtomicUsize,
31}
32
33impl MockExternalTableReader {
34 pub fn new() -> Self {
35 let binlog_file = String::from("1.binlog");
36 let binlog_watermarks = vec![
41 MySqlOffset::new(binlog_file.clone(), 2), MySqlOffset::new(binlog_file.clone(), 4),
43 MySqlOffset::new(binlog_file.clone(), 6),
44 MySqlOffset::new(binlog_file.clone(), 8),
45 MySqlOffset::new(binlog_file.clone(), 10),
46 ];
47 Self {
48 binlog_watermarks,
49 snapshot_cnt: AtomicUsize::new(0),
50 }
51 }
52
53 pub fn get_normalized_table_name(_table_name: &SchemaTableName) -> String {
54 "`mock_table`".to_owned()
55 }
56
57 pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
58 Box::new(move |offset| {
59 Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
60 offset,
61 )?))
62 })
63 }
64
65 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
68 async fn snapshot_read_inner(&self) {
69 let snap_idx = self
70 .snapshot_cnt
71 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
72 println!("snapshot read: idx {}", snap_idx);
73
74 let snap0 = vec![
75 OwnedRow::new(vec![
76 Some(ScalarImpl::Int64(1)),
77 Some(ScalarImpl::Float64(1.0001.into())),
78 ]),
79 OwnedRow::new(vec![
80 Some(ScalarImpl::Int64(1)),
81 Some(ScalarImpl::Float64(11.00.into())),
82 ]),
83 OwnedRow::new(vec![
84 Some(ScalarImpl::Int64(2)),
85 Some(ScalarImpl::Float64(22.00.into())),
86 ]),
87 OwnedRow::new(vec![
88 Some(ScalarImpl::Int64(5)),
89 Some(ScalarImpl::Float64(1.0005.into())),
90 ]),
91 OwnedRow::new(vec![
92 Some(ScalarImpl::Int64(6)),
93 Some(ScalarImpl::Float64(1.0006.into())),
94 ]),
95 OwnedRow::new(vec![
96 Some(ScalarImpl::Int64(8)),
97 Some(ScalarImpl::Float64(1.0008.into())),
98 ]),
99 ];
100
101 let snapshots = [snap0];
102 if snap_idx >= snapshots.len() {
103 return Ok(());
104 }
105
106 for row in &snapshots[snap_idx] {
107 yield row.clone();
108 }
109 }
110}
111
112impl ExternalTableReader for MockExternalTableReader {
113 async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
114 static IDX: AtomicUsize = AtomicUsize::new(0);
115
116 let idx = IDX.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
117 if idx < self.binlog_watermarks.len() {
118 Ok(CdcOffset::MySql(self.binlog_watermarks[idx].clone()))
119 } else {
120 Ok(CdcOffset::MySql(MySqlOffset {
121 filename: "1.binlog".to_owned(),
122 position: u64::MAX,
123 }))
124 }
125 }
126
127 fn snapshot_read(
128 &self,
129 _table_name: SchemaTableName,
130 _start_pk: Option<OwnedRow>,
131 _primary_keys: Vec<String>,
132 _limit: u32,
133 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
134 self.snapshot_read_inner()
135 }
136}