risingwave_stream/executor/backfill/cdc/upstream_table/snapshot.rs

use std::future::Future;

use futures::{Stream, pin_mut};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{Scalar, ScalarImpl, Timestamptz};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common_rate_limit::RateLimiter;
use risingwave_connector::source::cdc::external::{
    CdcOffset, ExternalTableReader, ExternalTableReaderImpl, SchemaTableName,
};
use risingwave_pb::plan_common::additional_column::ColumnType;

use super::external::ExternalStorageTable;
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::backfill::utils::{get_new_pos, iter_chunks};
use crate::executor::{StreamExecutorError, StreamExecutorResult};

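/// Read interface over an upstream (external) table for CDC backfill.
///
/// `snapshot_read_full_table` yields `Some(chunk)` for each batch of snapshot rows and a
/// final `None` once the table has been fully scanned; `current_cdc_offset` reports the
/// upstream binlog/WAL position; `disconnect` releases the underlying connection.
///
/// A rough usage sketch (assuming the caller already holds a reader and the read arguments;
/// not a compiled doc-test):
///
/// ```ignore
/// let stream = reader.snapshot_read_full_table(args, 1024);
/// pin_mut!(stream);
/// while let Some(item) = stream.next().await {
///     match item? {
///         Some(chunk) => { /* forward the snapshot rows downstream */ }
///         None => break, // `None` marks the end of the snapshot
///     }
/// }
/// ```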
pub trait UpstreamTableRead {
    fn snapshot_read_full_table(
        &self,
        args: SnapshotReadArgs,
        batch_size: u32,
    ) -> impl Stream<Item = StreamExecutorResult<Option<StreamChunk>>> + Send + '_;

    fn current_cdc_offset(
        &self,
    ) -> impl Future<Output = StreamExecutorResult<Option<CdcOffset>>> + Send + '_;

    async fn disconnect(self) -> StreamExecutorResult<()>;
}

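/// Arguments for a resumable snapshot read: the current backfill position (in primary-key
/// order), an optional rows-per-second rate limit, and the metadata needed to populate the
/// CDC additional columns (database/schema/table names).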
#[derive(Debug, Clone)]
pub struct SnapshotReadArgs {
    pub current_pos: Option<OwnedRow>,
    pub rate_limit_rps: Option<u32>,
    pub pk_indices: Vec<usize>,
    pub additional_columns: Vec<ColumnDesc>,
    pub schema_table_name: SchemaTableName,
    pub database_name: String,
}

impl SnapshotReadArgs {
    pub fn new(
        current_pos: Option<OwnedRow>,
        rate_limit_rps: Option<u32>,
        pk_indices: Vec<usize>,
        additional_columns: Vec<ColumnDesc>,
        schema_table_name: SchemaTableName,
        database_name: String,
    ) -> Self {
        Self {
            current_pos,
            rate_limit_rps,
            pk_indices,
            additional_columns,
            schema_table_name,
            database_name,
        }
    }
}

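/// Pairs the upstream table metadata with a concrete external table reader that issues the
/// actual snapshot queries.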
pub struct UpstreamTableReader<T> {
    table: T,
    pub(crate) reader: ExternalTableReaderImpl,
}

impl<T> UpstreamTableReader<T> {
    pub fn new(table: T, reader: ExternalTableReaderImpl) -> Self {
        Self { table, reader }
    }
}

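/// Appends the configured CDC additional columns to a snapshot chunk: the commit timestamp
/// (a default/zero timestamptz, since snapshot rows carry no binlog timestamp), the database
/// name, the schema name, and the table name. Any other additional column type is filled
/// with NULL.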
fn with_additional_columns(
    snapshot_chunk: StreamChunk,
    additional_columns: &[ColumnDesc],
    schema_table_name: SchemaTableName,
    database_name: String,
) -> StreamChunk {
    let (ops, mut columns, visibility) = snapshot_chunk.into_inner();
    for desc in additional_columns {
        let mut builder = desc.data_type.create_array_builder(visibility.len());
        match *desc.additional_column.column_type.as_ref().unwrap() {
            ColumnType::Timestamp(_) => builder.append_n(
                visibility.len(),
                Some(Timestamptz::default().to_scalar_value()),
            ),
            ColumnType::DatabaseName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(database_name.clone())),
                );
            }
            ColumnType::SchemaName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(schema_table_name.schema_name.clone())),
                );
            }
            ColumnType::TableName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(schema_table_name.table_name.clone())),
                );
            }
            _ => {
                builder.append_n_null(visibility.len());
            }
        }
        columns.push(builder.finish().into());
    }
    StreamChunk::with_visibility(ops, columns, visibility)
}

impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
    #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
    async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, batch_size: u32) {
        let primary_keys = self
            .table
            .pk_indices()
            .iter()
            .map(|idx| {
                let f = &self.table.schema().fields[*idx];
                f.name.clone()
            })
            .collect_vec();

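        // A rate limit of zero means the backfill is paused: never read from the upstream
        // table and never yield, by waiting on a future that never completes.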
        if args.rate_limit_rps == Some(0) {
            let future = futures::future::pending::<()>();
            future.await;
            unreachable!();
        }

        let rate_limiter = RateLimiter::new(
            args.rate_limit_rps
                .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
                .into(),
        );

        let mut read_args = args;
        let schema_table_name = read_args.schema_table_name.clone();
        let database_name = read_args.database_name.clone();
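        // Read the table in batches of `batch_size` rows, resuming each batch from the last
        // observed primary-key position, until a batch comes back short.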
        loop {
            tracing::debug!(
                "snapshot_read primary keys: {:?}, current_pos: {:?}",
                primary_keys,
                read_args.current_pos
            );

            let mut read_count: usize = 0;
            let row_stream = self.reader.snapshot_read(
                self.table.schema_table_name(),
                read_args.current_pos.clone(),
                primary_keys.clone(),
                batch_size,
            );

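            // Coalesce rows into chunks whose size is capped by the rate limit (if any), so
            // a single rate-limiter wait can cover a whole chunk.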
            pin_mut!(row_stream);
            let mut builder = DataChunkBuilder::new(
                self.table.schema().data_types(),
                limited_chunk_size(read_args.rate_limit_rps),
            );
            let chunk_stream = iter_chunks(row_stream, &mut builder);
            let mut current_pk_pos = read_args.current_pos.clone().unwrap_or_default();

            #[for_await]
            for chunk in chunk_stream {
                let chunk = chunk?;
                let chunk_size = chunk.capacity();
                read_count += chunk.cardinality();
                current_pk_pos = get_new_pos(&chunk, &read_args.pk_indices);

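                // No rate limit configured, or an empty chunk: emit it immediately.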
                if read_args.rate_limit_rps.is_none() || chunk_size == 0 {
                    yield Some(with_additional_columns(
                        chunk,
                        &read_args.additional_columns,
                        schema_table_name.clone(),
                        database_name.clone(),
                    ));
                    continue;
                } else {
                    let limit = read_args.rate_limit_rps.unwrap() as usize;

                    // `DataChunkBuilder` was created with `limited_chunk_size`, so a chunk
                    // can never exceed the configured limit.
                    assert!(chunk_size <= limit);

                    // Acquire permits for the whole chunk before emitting it downstream.
                    rate_limiter.wait(chunk_size as _).await;
                    yield Some(with_additional_columns(
                        chunk,
                        &read_args.additional_columns,
                        schema_table_name.clone(),
                        database_name.clone(),
                    ));
                }
            }

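            // A batch smaller than `batch_size` means the table has been fully scanned:
            // yield `None` as the finish marker; the stream is not expected to be polled
            // again afterwards (hence the `unreachable!()`). Otherwise, record the latest
            // primary-key position and read the next batch.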
            if read_count < batch_size as _ {
                tracing::debug!("finished loading of full table snapshot");
                yield None;
                unreachable!()
            } else {
                read_args.current_pos = Some(current_pk_pos);
            }
        }
    }

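    /// Fetches the current CDC offset (binlog/WAL position) from the upstream database.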
    async fn current_cdc_offset(&self) -> StreamExecutorResult<Option<CdcOffset>> {
        let binlog = self.reader.current_cdc_offset();
        let binlog = binlog.await?;
        Ok(Some(binlog))
    }

    async fn disconnect(self) -> StreamExecutorResult<()> {
        self.reader.disconnect().await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_connector::source::cdc::external::mysql::MySqlExternalTableReader;
    use risingwave_connector::source::cdc::external::{
        ExternalTableConfig, ExternalTableReader, SchemaTableName,
    };

    use crate::executor::backfill::utils::{get_new_pos, iter_chunks};

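    // Ignored by default: requires a MySQL instance reachable at localhost:8306 with the
    // `mydb.orders_rw` table pre-populated (the loop below expects roughly 1.5 million rows
    // before it stops).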
    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let columns = vec![
            ColumnDesc::named("o_orderkey", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("o_custkey", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("o_orderstatus", ColumnId::new(3), DataType::Varchar),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
            "hostname" => "localhost",
            "port" => "8306",
            "username" => "root",
            "password" => "123456",
            "database.name" => "mydb",
            "table.name" => "orders_rw"));

        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema.clone()).unwrap();

        let mut cnt: usize = 0;
        let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))]));
        loop {
            let row_stream = reader.snapshot_read(
                SchemaTableName {
                    schema_name: "mydb".to_owned(),
                    table_name: "orders_rw".to_owned(),
                },
                start_pk.clone(),
                vec!["o_orderkey".to_owned()],
                1000,
            );
            let mut builder = DataChunkBuilder::new(rw_schema.clone().data_types(), 256);
            let chunk_stream = iter_chunks(row_stream, &mut builder);
            let pk_indices = vec![0];
            pin_mut!(chunk_stream);
            #[for_await]
            for chunk in chunk_stream {
                let chunk = chunk.expect("data");
                start_pk = Some(get_new_pos(&chunk, &pk_indices));
                cnt += chunk.capacity();
                println!("cnt: {}", cnt);
            }
            if cnt >= 1499900 {
                println!("bye!");
                break;
            }
        }
    }
}