risingwave_stream/executor/backfill/cdc/upstream_table/
snapshot.rs1use std::future::Future;
16
17use futures::{Stream, pin_mut};
18use futures_async_stream::try_stream;
19use itertools::Itertools;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::catalog::{ColumnDesc, Field};
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{Scalar, ScalarImpl, Timestamptz};
24use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
25use risingwave_common_rate_limit::RateLimiter;
26use risingwave_connector::source::cdc::external::{
27 CdcOffset, ExternalTableReader, ExternalTableReaderImpl, SchemaTableName,
28};
29use risingwave_pb::plan_common::additional_column::ColumnType;
30
31use super::external::ExternalStorageTable;
32use crate::common::rate_limit::limited_chunk_size;
33use crate::executor::backfill::utils::{get_new_pos, iter_chunks};
34use crate::executor::{StreamExecutorError, StreamExecutorResult};
35
36pub trait UpstreamTableRead {
37 fn snapshot_read_full_table(
38 &self,
39 args: SnapshotReadArgs,
40 batch_size: u32,
41 ) -> impl Stream<Item = StreamExecutorResult<Option<StreamChunk>>> + Send + '_;
42
43 fn current_cdc_offset(
44 &self,
45 ) -> impl Future<Output = StreamExecutorResult<Option<CdcOffset>>> + Send + '_;
46
47 async fn disconnect(self) -> StreamExecutorResult<()>;
48
49 fn snapshot_read_table_split(
50 &self,
51 args: SplitSnapshotReadArgs,
52 ) -> impl Stream<Item = StreamExecutorResult<Option<StreamChunk>>> + Send + '_;
53}
54
55#[derive(Debug, Clone)]
56pub struct SnapshotReadArgs {
57 pub current_pos: Option<OwnedRow>,
58 pub rate_limit_rps: Option<u32>,
59 pub pk_indices: Vec<usize>,
60 pub additional_columns: Vec<ColumnDesc>,
61 pub schema_table_name: SchemaTableName,
62 pub database_name: String,
63}
64
65impl SnapshotReadArgs {
66 pub fn new(
67 current_pos: Option<OwnedRow>,
68 rate_limit_rps: Option<u32>,
69 pk_indices: Vec<usize>,
70 additional_columns: Vec<ColumnDesc>,
71 schema_table_name: SchemaTableName,
72 database_name: String,
73 ) -> Self {
74 Self {
75 current_pos,
76 rate_limit_rps,
77 pk_indices,
78 additional_columns,
79 schema_table_name,
80 database_name,
81 }
82 }
83}
84
85#[derive(Debug, Clone)]
86pub struct SplitSnapshotReadArgs {
87 pub left_bound_inclusive: OwnedRow,
88 pub right_bound_exclusive: OwnedRow,
89 pub split_columns: Vec<Field>,
90 pub rate_limit_rps: Option<u32>,
91 pub additional_columns: Vec<ColumnDesc>,
92 pub schema_table_name: SchemaTableName,
93 pub database_name: String,
94}
95
96impl SplitSnapshotReadArgs {
97 pub fn new(
98 left_bound_inclusive: OwnedRow,
99 right_bound_exclusive: OwnedRow,
100 split_columns: Vec<Field>,
101 rate_limit_rps: Option<u32>,
102 additional_columns: Vec<ColumnDesc>,
103 schema_table_name: SchemaTableName,
104 database_name: String,
105 ) -> Self {
106 Self {
107 left_bound_inclusive,
108 right_bound_exclusive,
109 split_columns,
110 rate_limit_rps,
111 additional_columns,
112 schema_table_name,
113 database_name,
114 }
115 }
116}
117
118pub struct UpstreamTableReader<T> {
122 table: T,
123 pub(crate) reader: ExternalTableReaderImpl,
124}
125
126impl<T> UpstreamTableReader<T> {
127 pub fn new(table: T, reader: ExternalTableReaderImpl) -> Self {
128 Self { table, reader }
129 }
130}
131
132fn with_additional_columns(
134 snapshot_chunk: StreamChunk,
135 additional_columns: &[ColumnDesc],
136 schema_table_name: SchemaTableName,
137 database_name: String,
138) -> StreamChunk {
139 let (ops, mut columns, visibility) = snapshot_chunk.into_inner();
140 for desc in additional_columns {
141 let mut builder = desc.data_type.create_array_builder(visibility.len());
142 match *desc.additional_column.column_type.as_ref().unwrap() {
143 ColumnType::Timestamp(_) => builder.append_n(
145 visibility.len(),
146 Some(Timestamptz::default().to_scalar_value()),
147 ),
148 ColumnType::DatabaseName(_) => {
149 builder.append_n(
150 visibility.len(),
151 Some(ScalarImpl::from(database_name.clone())),
152 );
153 }
154 ColumnType::SchemaName(_) => {
155 builder.append_n(
156 visibility.len(),
157 Some(ScalarImpl::from(schema_table_name.schema_name.clone())),
158 );
159 }
160 ColumnType::TableName(_) => {
161 builder.append_n(
162 visibility.len(),
163 Some(ScalarImpl::from(schema_table_name.table_name.clone())),
164 );
165 }
166 _ => {
168 builder.append_n_null(visibility.len());
169 }
170 }
171 columns.push(builder.finish().into());
172 }
173 StreamChunk::with_visibility(ops, columns, visibility)
174}
175
176impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
177 #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
178 async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, batch_size: u32) {
179 let primary_keys = self
180 .table
181 .pk_indices()
182 .iter()
183 .map(|idx| {
184 let f = &self.table.schema().fields[*idx];
185 f.name.clone()
186 })
187 .collect_vec();
188
189 if args.rate_limit_rps == Some(0) {
191 let future = futures::future::pending::<()>();
194 future.await;
195 unreachable!();
196 }
197
198 let rate_limiter = RateLimiter::new(
199 args.rate_limit_rps
200 .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
201 .into(),
202 );
203
204 let mut read_args = args;
205 let schema_table_name = read_args.schema_table_name.clone();
206 let database_name = read_args.database_name.clone();
207 loop {
209 tracing::debug!(
210 "snapshot_read primary keys: {:?}, current_pos: {:?}",
211 primary_keys,
212 read_args.current_pos
213 );
214
215 let mut read_count: usize = 0;
216 let row_stream = self.reader.snapshot_read(
217 self.table.schema_table_name(),
218 read_args.current_pos.clone(),
219 primary_keys.clone(),
220 batch_size,
221 );
222
223 pin_mut!(row_stream);
224 let mut builder = DataChunkBuilder::new(
225 self.table.schema().data_types(),
226 limited_chunk_size(read_args.rate_limit_rps),
227 );
228 let chunk_stream = iter_chunks(row_stream, &mut builder);
229 let mut current_pk_pos = read_args.current_pos.clone().unwrap_or_default();
230
231 #[for_await]
232 for chunk in chunk_stream {
233 let chunk = chunk?;
234 let chunk_size = chunk.capacity();
235 read_count += chunk.cardinality();
236 current_pk_pos = get_new_pos(&chunk, &read_args.pk_indices);
237
238 if let Some(rate_limit_rps) = read_args.rate_limit_rps
239 && chunk_size != 0
240 {
241 let limit = rate_limit_rps as usize;
244
245 assert!(chunk_size <= limit);
248
249 rate_limiter.wait(chunk_size as _).await;
251 yield Some(with_additional_columns(
252 chunk,
253 &read_args.additional_columns,
254 schema_table_name.clone(),
255 database_name.clone(),
256 ));
257 } else {
258 yield Some(with_additional_columns(
260 chunk,
261 &read_args.additional_columns,
262 schema_table_name.clone(),
263 database_name.clone(),
264 ));
265 continue;
266 }
267 }
268
269 if read_count < batch_size as _ {
271 tracing::debug!("finished loading of full table snapshot");
272 yield None;
273 unreachable!()
274 } else {
275 read_args.current_pos = Some(current_pk_pos);
277 }
278 }
279 }
280
281 #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
282 async fn snapshot_read_table_split(&self, args: SplitSnapshotReadArgs) {
283 if args.rate_limit_rps == Some(0) {
285 let future = futures::future::pending::<()>();
288 future.await;
289 unreachable!();
290 }
291
292 let rate_limiter = RateLimiter::new(
293 args.rate_limit_rps
294 .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
295 .into(),
296 );
297
298 let read_args = args;
299 let schema_table_name = read_args.schema_table_name.clone();
300 let database_name = read_args.database_name.clone();
301 let row_stream = self.reader.split_snapshot_read(
304 self.table.schema_table_name(),
305 read_args.left_bound_inclusive.clone(),
306 read_args.right_bound_exclusive.clone(),
307 read_args.split_columns.clone(),
308 );
309
310 pin_mut!(row_stream);
311 let mut builder = DataChunkBuilder::new(
312 self.table.schema().data_types(),
313 limited_chunk_size(read_args.rate_limit_rps),
314 );
315 let chunk_stream = iter_chunks(row_stream, &mut builder);
316
317 #[for_await]
318 for chunk in chunk_stream {
319 let chunk = chunk?;
320 let chunk_size = chunk.capacity();
321
322 if let Some(rate_limit_rps) = read_args.rate_limit_rps
323 && chunk_size != 0
324 {
325 let limit = rate_limit_rps as usize;
328
329 assert!(chunk_size <= limit);
332
333 rate_limiter.wait(chunk_size as _).await;
335 yield Some(with_additional_columns(
336 chunk,
337 &read_args.additional_columns,
338 schema_table_name.clone(),
339 database_name.clone(),
340 ));
341 } else {
342 yield Some(with_additional_columns(
344 chunk,
345 &read_args.additional_columns,
346 schema_table_name.clone(),
347 database_name.clone(),
348 ));
349 }
350 }
351 yield None;
352 }
353
354 async fn current_cdc_offset(&self) -> StreamExecutorResult<Option<CdcOffset>> {
355 let binlog = self.reader.current_cdc_offset();
356 let binlog = binlog.await?;
357 Ok(Some(binlog))
358 }
359
360 async fn disconnect(self) -> StreamExecutorResult<()> {
361 self.reader.disconnect().await?;
362 Ok(())
363 }
364}
365
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_connector::source::cdc::external::mysql::MySqlExternalTableReader;
    use risingwave_connector::source::cdc::external::{
        ExternalTableConfig, ExternalTableReader, SchemaTableName,
    };

    use crate::executor::backfill::utils::{get_new_pos, iter_chunks};

    /// Manual smoke test that pages through a local MySQL table with `snapshot_read`.
    ///
    /// Ignored by default: it needs a MySQL server reachable with the properties below. The
    /// loop stops once `EXPECTED_ROWS` rows have been read, or when a page returns fewer than
    /// `BATCH_SIZE` rows (the table is exhausted). Without the second condition, a smaller
    /// table would be re-read from the same position forever.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        const BATCH_SIZE: u32 = 1000;
        const EXPECTED_ROWS: usize = 1499900;

        let columns = [
            ColumnDesc::named("o_orderkey", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("o_custkey", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("o_orderstatus", ColumnId::new(3), DataType::Varchar),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
            "hostname" => "localhost",
            "port" => "8306",
            "username" => "root",
            "password" => "123456",
            "database.name" => "mydb",
            "table.name" => "orders_rw"));

        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema.clone())
            .await
            .unwrap();

        let pk_indices = vec![0];
        let mut cnt: usize = 0;
        let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))]));
        loop {
            let row_stream = reader.snapshot_read(
                SchemaTableName {
                    schema_name: "mydb".to_owned(),
                    table_name: "orders_rw".to_owned(),
                },
                start_pk.clone(),
                vec!["o_orderkey".to_owned()],
                BATCH_SIZE,
            );
            let mut builder = DataChunkBuilder::new(rw_schema.data_types(), 256);
            let chunk_stream = iter_chunks(row_stream, &mut builder);
            pin_mut!(chunk_stream);

            // Rows returned by this page only; used to detect the end of the table.
            let mut batch_rows: usize = 0;
            #[for_await]
            for chunk in chunk_stream {
                let chunk = chunk.expect("data");
                start_pk = Some(get_new_pos(&chunk, &pk_indices));
                batch_rows += chunk.capacity();
                cnt += chunk.capacity();
                println!("cnt: {}", cnt);
            }
            if cnt >= EXPECTED_ROWS || batch_rows < BATCH_SIZE as usize {
                println!("bye!");
                break;
            }
        }
    }
}