risingwave_connector/source/cdc/external/
mock_external_table.rs1use std::sync::atomic::AtomicUsize;
16
17use futures::stream::BoxStream;
18use futures_async_stream::try_stream;
19use risingwave_common::catalog::Field;
20use risingwave_common::row::OwnedRow;
21use risingwave_common::types::{DataType, ScalarImpl};
22use risingwave_common::util::sort_util::{OrderType, cmp_datum};
23
24use crate::error::{ConnectorError, ConnectorResult};
25use crate::source::CdcTableSnapshotSplit;
26use crate::source::cdc::external::{
27 CdcOffset, CdcOffsetParseFunc, CdcTableSnapshotSplitOption, ExternalTableReader, MySqlOffset,
28 SchemaTableName,
29};
30#[derive(Debug)]
31pub struct MockExternalTableReader {
32 binlog_watermarks: Vec<MySqlOffset>,
33 snapshot_cnt: AtomicUsize,
34 parallel_backfill_snapshots: Vec<OwnedRow>,
35}
36
37impl MockExternalTableReader {
38 pub fn new() -> Self {
39 let binlog_file = String::from("1.binlog");
40 let binlog_watermarks = vec![
45 MySqlOffset::new(binlog_file.clone(), 2), MySqlOffset::new(binlog_file.clone(), 4),
47 MySqlOffset::new(binlog_file.clone(), 6),
48 MySqlOffset::new(binlog_file.clone(), 8),
49 MySqlOffset::new(binlog_file.clone(), 10),
50 ];
51 let parallel_backfill_snapshots = vec![
52 OwnedRow::new(vec![
53 Some(ScalarImpl::Int64(1)),
54 Some(ScalarImpl::Float64(1.0001.into())),
55 ]),
56 OwnedRow::new(vec![
57 Some(ScalarImpl::Int64(1)),
58 Some(ScalarImpl::Float64(11.00.into())),
59 ]),
60 OwnedRow::new(vec![
61 Some(ScalarImpl::Int64(2)),
62 Some(ScalarImpl::Float64(22.00.into())),
63 ]),
64 OwnedRow::new(vec![
65 Some(ScalarImpl::Int64(5)),
66 Some(ScalarImpl::Float64(1.0005.into())),
67 ]),
68 OwnedRow::new(vec![
69 Some(ScalarImpl::Int64(6)),
70 Some(ScalarImpl::Float64(1.0006.into())),
71 ]),
72 OwnedRow::new(vec![
73 Some(ScalarImpl::Int64(900)),
74 Some(ScalarImpl::Float64(900.1.into())),
75 ]),
76 OwnedRow::new(vec![
77 Some(ScalarImpl::Int64(8)),
78 Some(ScalarImpl::Float64(1.0008.into())),
79 ]),
80 OwnedRow::new(vec![
81 Some(ScalarImpl::Int64(400)),
82 Some(ScalarImpl::Float64(400.1.into())),
83 ]),
84 ];
85 Self {
86 binlog_watermarks,
87 snapshot_cnt: AtomicUsize::new(0),
88 parallel_backfill_snapshots,
89 }
90 }
91
92 pub fn get_normalized_table_name(_table_name: &SchemaTableName) -> String {
93 "`mock_table`".to_owned()
94 }
95
96 pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
97 Box::new(move |offset| {
98 Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
99 offset,
100 )?))
101 })
102 }
103
104 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
107 async fn snapshot_read_inner(&self) {
108 let snap_idx = self
109 .snapshot_cnt
110 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
111 println!("snapshot read: idx {}", snap_idx);
112
113 let snap0 = vec![
114 OwnedRow::new(vec![
115 Some(ScalarImpl::Int64(1)),
116 Some(ScalarImpl::Float64(1.0001.into())),
117 ]),
118 OwnedRow::new(vec![
119 Some(ScalarImpl::Int64(1)),
120 Some(ScalarImpl::Float64(11.00.into())),
121 ]),
122 OwnedRow::new(vec![
123 Some(ScalarImpl::Int64(2)),
124 Some(ScalarImpl::Float64(22.00.into())),
125 ]),
126 OwnedRow::new(vec![
127 Some(ScalarImpl::Int64(5)),
128 Some(ScalarImpl::Float64(1.0005.into())),
129 ]),
130 OwnedRow::new(vec![
131 Some(ScalarImpl::Int64(6)),
132 Some(ScalarImpl::Float64(1.0006.into())),
133 ]),
134 OwnedRow::new(vec![
135 Some(ScalarImpl::Int64(8)),
136 Some(ScalarImpl::Float64(1.0008.into())),
137 ]),
138 ];
139
140 let snapshots = [snap0];
141 if snap_idx >= snapshots.len() {
142 return Ok(());
143 }
144
145 for row in &snapshots[snap_idx] {
146 yield row.clone();
147 }
148 }
149
150 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
151 async fn split_snapshot_read_inner(&self, left: OwnedRow, right: OwnedRow) {
152 for row in &self.parallel_backfill_snapshots {
153 if (left[0].is_none()
154 || cmp_datum(&row[0], &left[0], OrderType::ascending_nulls_first()).is_ge())
155 && (right[0].is_none()
156 || cmp_datum(&row[0], &right[0], OrderType::ascending_nulls_first()).is_lt())
157 {
158 yield row.clone();
159 }
160 }
161 }
162}
163
164impl ExternalTableReader for MockExternalTableReader {
165 async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
166 static IDX: AtomicUsize = AtomicUsize::new(0);
167
168 let idx = IDX.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
169 if idx < self.binlog_watermarks.len() {
170 Ok(CdcOffset::MySql(self.binlog_watermarks[idx].clone()))
171 } else {
172 Ok(CdcOffset::MySql(MySqlOffset {
173 filename: "1.binlog".to_owned(),
174 position: u64::MAX,
175 }))
176 }
177 }
178
179 fn snapshot_read(
180 &self,
181 _table_name: SchemaTableName,
182 _start_pk: Option<OwnedRow>,
183 _primary_keys: Vec<String>,
184 _limit: u32,
185 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
186 self.snapshot_read_inner()
187 }
188
189 fn get_parallel_cdc_splits(
190 &self,
191 _options: CdcTableSnapshotSplitOption,
192 ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
193 unreachable!()
194 }
195
196 fn split_snapshot_read(
197 &self,
198 _table_name: SchemaTableName,
199 left: OwnedRow,
200 right: OwnedRow,
201 split_columns: Vec<Field>,
202 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
203 assert_eq!(split_columns.len(), 1);
204 assert_eq!(split_columns[0].data_type, DataType::Int64);
205 self.split_snapshot_read_inner(left, right)
206 }
207}