// risingwave_stream/executor/backfill/cdc/upstream_table/snapshot.rs

use std::future::Future;

use futures::{Stream, pin_mut};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{Scalar, ScalarImpl, Timestamptz};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common_rate_limit::RateLimiter;
use risingwave_connector::source::cdc::external::{
    CdcOffset, ExternalTableReader, ExternalTableReaderImpl, SchemaTableName,
};
use risingwave_pb::plan_common::additional_column::ColumnType;

use super::external::ExternalStorageTable;
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::backfill::utils::{get_new_pos, iter_chunks};
use crate::executor::{StreamExecutorError, StreamExecutorResult};

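/// Reads the snapshot of an upstream table for CDC backfill: a full-table
/// scan paginated by primary key, plus the current CDC (binlog) offset.
///
/// A minimal usage sketch, assuming an already constructed
/// `UpstreamTableReader` and `SnapshotReadArgs` (the bindings below are
/// illustrative, not part of this module):
///
/// ```ignore
/// use futures::{TryStreamExt, pin_mut};
///
/// let reader = UpstreamTableReader::new(external_table, external_reader);
/// let stream = reader.snapshot_read_full_table(args, 1000);
/// pin_mut!(stream);
/// while let Some(item) = stream.try_next().await? {
///     match item {
///         Some(chunk) => consume(chunk), // one rate-limited snapshot chunk
///         None => break,                 // end of the snapshot
///     }
/// }
/// ```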
pub trait UpstreamTableRead {
    fn snapshot_read_full_table(
        &self,
        args: SnapshotReadArgs,
        batch_size: u32,
    ) -> impl Stream<Item = StreamExecutorResult<Option<StreamChunk>>> + Send + '_;

    fn current_cdc_offset(
        &self,
    ) -> impl Future<Output = StreamExecutorResult<Option<CdcOffset>>> + Send + '_;
}

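/// Arguments for a resumable snapshot read.
///
/// `current_pos` is the last primary-key position read (the next batch scans
/// strictly beyond it); `rate_limit_rps` caps rows per second, with `Some(0)`
/// pausing the read entirely.
///
/// A construction sketch (all values are illustrative placeholders):
///
/// ```ignore
/// let args = SnapshotReadArgs::new(
///     None,       // start from the beginning of the table
///     Some(1000), // at most 1000 rows per second
///     vec![0],    // primary-key column indices
///     vec![],     // no additional metadata columns
///     schema_table_name,
///     "mydb".to_owned(),
/// );
/// ```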
#[derive(Debug, Clone)]
pub struct SnapshotReadArgs {
    pub current_pos: Option<OwnedRow>,
    pub rate_limit_rps: Option<u32>,
    pub pk_indices: Vec<usize>,
    pub additional_columns: Vec<ColumnDesc>,
    pub schema_table_name: SchemaTableName,
    pub database_name: String,
}

impl SnapshotReadArgs {
    pub fn new(
        current_pos: Option<OwnedRow>,
        rate_limit_rps: Option<u32>,
        pk_indices: Vec<usize>,
        additional_columns: Vec<ColumnDesc>,
        schema_table_name: SchemaTableName,
        database_name: String,
    ) -> Self {
        Self {
            current_pos,
            rate_limit_rps,
            pk_indices,
            additional_columns,
            schema_table_name,
            database_name,
        }
    }
}

pub struct UpstreamTableReader<T> {
    table: T,
    pub(crate) reader: ExternalTableReaderImpl,
}

impl<T> UpstreamTableReader<T> {
    pub fn new(table: T, reader: ExternalTableReaderImpl) -> Self {
        Self { table, reader }
    }
}

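/// Appends the requested additional (metadata) columns to a snapshot chunk,
/// e.g. the database, schema, and table name; any additional column that
/// cannot be derived from the snapshot context is filled with NULLs.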
fn with_additional_columns(
    snapshot_chunk: StreamChunk,
    additional_columns: &[ColumnDesc],
    schema_table_name: SchemaTableName,
    database_name: String,
) -> StreamChunk {
    let (ops, mut columns, visibility) = snapshot_chunk.into_inner();
    for desc in additional_columns {
        let mut builder = desc.data_type.create_array_builder(visibility.len());
        match *desc.additional_column.column_type.as_ref().unwrap() {
            // Snapshot rows get a default (zero) timestamp.
            ColumnType::Timestamp(_) => builder.append_n(
                visibility.len(),
                Some(Timestamptz::default().to_scalar_value()),
            ),
            ColumnType::DatabaseName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(database_name.clone())),
                );
            }
            ColumnType::SchemaName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(schema_table_name.schema_name.clone())),
                );
            }
            ColumnType::TableName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(schema_table_name.table_name.clone())),
                );
            }
            // Any other additional column is filled with NULLs.
            _ => {
                builder.append_n_null(visibility.len());
            }
        }
        columns.push(builder.finish().into());
    }
    StreamChunk::with_visibility(ops, columns, visibility)
}
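// Pagination protocol: each `snapshot_read` call scans at most `batch_size`
// rows, ordered by (and strictly greater than) the current primary-key
// position. A batch that comes back short marks the end of the table, which
// is signalled downstream by yielding `None`.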
impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
    #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
    async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, batch_size: u32) {
        let primary_keys = self
            .table
            .pk_indices()
            .iter()
            .map(|idx| {
                let f = &self.table.schema().fields[*idx];
                f.name.clone()
            })
            .collect_vec();

        if args.rate_limit_rps == Some(0) {
            // A rate limit of zero pauses the read entirely: park on a future
            // that never resolves. The stream is dropped and recreated if the
            // limit is changed, so execution never proceeds past this point.
            let future = futures::future::pending::<()>();
            future.await;
            unreachable!();
        }

        let rate_limiter = RateLimiter::new(
            args.rate_limit_rps
                .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
                .into(),
        );

        let mut read_args = args;
        let schema_table_name = read_args.schema_table_name.clone();
        let database_name = read_args.database_name.clone();
        // Read the table batch by batch, resuming from the last PK position.
        loop {
            tracing::debug!(
                "snapshot_read primary keys: {:?}, current_pos: {:?}",
                primary_keys,
                read_args.current_pos
            );

            let mut read_count: usize = 0;
            let row_stream = self.reader.snapshot_read(
                self.table.schema_table_name(),
                read_args.current_pos.clone(),
                primary_keys.clone(),
                batch_size,
            );

            pin_mut!(row_stream);
            let mut builder = DataChunkBuilder::new(
                self.table.schema().data_types(),
                limited_chunk_size(read_args.rate_limit_rps),
            );
            let chunk_stream = iter_chunks(row_stream, &mut builder);
            let mut current_pk_pos = read_args.current_pos.clone().unwrap_or_default();

            #[for_await]
            for chunk in chunk_stream {
                let chunk = chunk?;
                let chunk_size = chunk.capacity();
                read_count += chunk.cardinality();
                current_pk_pos = get_new_pos(&chunk, &read_args.pk_indices);

                if read_args.rate_limit_rps.is_none() || chunk_size == 0 {
                    // No rate limit, or an empty chunk: yield it directly.
                    yield Some(with_additional_columns(
                        chunk,
                        &read_args.additional_columns,
                        schema_table_name.clone(),
                        database_name.clone(),
                    ));
                    continue;
                } else {
                    let limit = read_args.rate_limit_rps.unwrap() as usize;

                    // Chunks are built with a limited size (see
                    // `limited_chunk_size`), so a single chunk can never
                    // exceed the per-second quota.
                    assert!(chunk_size <= limit);

                    // Acquire quota for the whole chunk before yielding it.
                    rate_limiter.wait(chunk_size as _).await;
                    yield Some(with_additional_columns(
                        chunk,
                        &read_args.additional_columns,
                        schema_table_name.clone(),
                        database_name.clone(),
                    ));
                }
            }

            // A batch shorter than `batch_size` means the table is exhausted.
            if read_count < batch_size as _ {
                tracing::debug!("finished loading of full table snapshot");
                // `None` signals the end of the snapshot; the caller drops the
                // stream on seeing it, so this point is never resumed past.
                yield None;
                unreachable!()
            } else {
                // Advance the PK position and read the next batch.
                read_args.current_pos = Some(current_pk_pos);
            }
        }
    }

    async fn current_cdc_offset(&self) -> StreamExecutorResult<Option<CdcOffset>> {
        let binlog = self.reader.current_cdc_offset().await?;
        Ok(Some(binlog))
    }
}
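// A hedged sketch of how the two trait methods are typically paired by a
// backfill executor (the surrounding executor is assumed, not defined here):
//
//     let offset = reader.current_cdc_offset().await?; // high-water mark
//     let snapshot = reader.snapshot_read_full_table(args, batch_size);
//     // ...consume the snapshot, then apply change events past `offset`.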

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_connector::source::cdc::external::mysql::MySqlExternalTableReader;
    use risingwave_connector::source::cdc::external::{
        ExternalTableConfig, ExternalTableReader, SchemaTableName,
    };

    use crate::executor::backfill::utils::{get_new_pos, iter_chunks};

    // Requires a live MySQL at localhost:8306 with `mydb.orders_rw` populated
    // (on the order of 1.5M rows), hence `#[ignore]`.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let columns = vec![
            ColumnDesc::named("o_orderkey", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("o_custkey", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("o_orderstatus", ColumnId::new(3), DataType::Varchar),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
            "hostname" => "localhost",
            "port" => "8306",
            "username" => "root",
            "password" => "123456",
            "database.name" => "mydb",
            "table.name" => "orders_rw"));

        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema.clone())
            .await
            .unwrap();

        let mut cnt: usize = 0;
        let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))]));
        loop {
            let row_stream = reader.snapshot_read(
                SchemaTableName {
                    schema_name: "mydb".to_owned(),
                    table_name: "orders_rw".to_owned(),
                },
                start_pk.clone(),
                vec!["o_orderkey".to_owned()],
                1000,
            );
            let mut builder = DataChunkBuilder::new(rw_schema.clone().data_types(), 256);
            let chunk_stream = iter_chunks(row_stream, &mut builder);
            let pk_indices = vec![0];
            pin_mut!(chunk_stream);
            #[for_await]
            for chunk in chunk_stream {
                let chunk = chunk.expect("data");
                start_pk = Some(get_new_pos(&chunk, &pk_indices));
                cnt += chunk.capacity();
                println!("cnt: {}", cnt);
            }
            // Stop once (nearly) all rows have been read.
            if cnt >= 1499900 {
                println!("bye!");
                break;
            }
        }
    }
}