Skip to main content

risingwave_stream/executor/backfill/cdc/upstream_table/
snapshot.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16
17use futures::{Stream, pin_mut};
18use futures_async_stream::try_stream;
19use itertools::Itertools;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::catalog::{ColumnDesc, Field};
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{Scalar, ScalarImpl, Timestamptz};
24use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
25use risingwave_common_rate_limit::RateLimiter;
26use risingwave_connector::source::cdc::external::{
27    CdcOffset, ExternalTableReader, ExternalTableReaderImpl, SchemaTableName,
28};
29use risingwave_pb::plan_common::additional_column::ColumnType;
30
31use super::external::ExternalStorageTable;
32use crate::common::rate_limit::limited_chunk_size;
33use crate::executor::backfill::utils::{get_new_pos, iter_chunks};
34use crate::executor::{StreamExecutorError, StreamExecutorResult};
35
36pub trait UpstreamTableRead {
37    fn snapshot_read_full_table(
38        &self,
39        args: SnapshotReadArgs,
40        batch_size: u32,
41    ) -> impl Stream<Item = StreamExecutorResult<Option<StreamChunk>>> + Send + '_;
42
43    fn current_cdc_offset(
44        &self,
45    ) -> impl Future<Output = StreamExecutorResult<Option<CdcOffset>>> + Send + '_;
46
47    async fn disconnect(self) -> StreamExecutorResult<()>;
48
49    fn snapshot_read_table_split(
50        &self,
51        args: SplitSnapshotReadArgs,
52    ) -> impl Stream<Item = StreamExecutorResult<Option<StreamChunk>>> + Send + '_;
53}
54
55#[derive(Debug, Clone)]
56pub struct SnapshotReadArgs {
57    pub current_pos: Option<OwnedRow>,
58    pub rate_limit_rps: Option<u32>,
59    pub pk_indices: Vec<usize>,
60    pub additional_columns: Vec<ColumnDesc>,
61    pub schema_table_name: SchemaTableName,
62    pub database_name: String,
63}
64
65impl SnapshotReadArgs {
66    pub fn new(
67        current_pos: Option<OwnedRow>,
68        rate_limit_rps: Option<u32>,
69        pk_indices: Vec<usize>,
70        additional_columns: Vec<ColumnDesc>,
71        schema_table_name: SchemaTableName,
72        database_name: String,
73    ) -> Self {
74        Self {
75            current_pos,
76            rate_limit_rps,
77            pk_indices,
78            additional_columns,
79            schema_table_name,
80            database_name,
81        }
82    }
83}
84
85#[derive(Debug, Clone)]
86pub struct SplitSnapshotReadArgs {
87    pub left_bound_inclusive: OwnedRow,
88    pub right_bound_exclusive: OwnedRow,
89    pub split_columns: Vec<Field>,
90    pub rate_limit_rps: Option<u32>,
91    pub additional_columns: Vec<ColumnDesc>,
92    pub schema_table_name: SchemaTableName,
93    pub database_name: String,
94}
95
96impl SplitSnapshotReadArgs {
97    pub fn new(
98        left_bound_inclusive: OwnedRow,
99        right_bound_exclusive: OwnedRow,
100        split_columns: Vec<Field>,
101        rate_limit_rps: Option<u32>,
102        additional_columns: Vec<ColumnDesc>,
103        schema_table_name: SchemaTableName,
104        database_name: String,
105    ) -> Self {
106        Self {
107            left_bound_inclusive,
108            right_bound_exclusive,
109            split_columns,
110            rate_limit_rps,
111            additional_columns,
112            schema_table_name,
113            database_name,
114        }
115    }
116}
117
118/// A wrapper of upstream table for snapshot read
119/// because we need to customize the snapshot read for managed upstream table (e.g. mv, index)
120/// and external upstream table.
121pub struct UpstreamTableReader<T> {
122    table: T,
123    pub(crate) reader: ExternalTableReaderImpl,
124}
125
126impl<T> UpstreamTableReader<T> {
127    pub fn new(table: T, reader: ExternalTableReaderImpl) -> Self {
128        Self { table, reader }
129    }
130}
131
132/// Append additional columns with value as null to the snapshot chunk
133fn with_additional_columns(
134    snapshot_chunk: StreamChunk,
135    additional_columns: &[ColumnDesc],
136    schema_table_name: SchemaTableName,
137    database_name: String,
138) -> StreamChunk {
139    let (ops, mut columns, visibility) = snapshot_chunk.into_inner();
140    for desc in additional_columns {
141        let mut builder = desc.data_type.create_array_builder(visibility.len());
142        match *desc.additional_column.column_type.as_ref().unwrap() {
143            // set default value for timestamp
144            ColumnType::Timestamp(_) => builder.append_n(
145                visibility.len(),
146                Some(Timestamptz::default().to_scalar_value()),
147            ),
148            ColumnType::DatabaseName(_) => {
149                builder.append_n(
150                    visibility.len(),
151                    Some(ScalarImpl::from(database_name.clone())),
152                );
153            }
154            ColumnType::SchemaName(_) => {
155                builder.append_n(
156                    visibility.len(),
157                    Some(ScalarImpl::from(schema_table_name.schema_name.clone())),
158                );
159            }
160            ColumnType::TableName(_) => {
161                builder.append_n(
162                    visibility.len(),
163                    Some(ScalarImpl::from(schema_table_name.table_name.clone())),
164                );
165            }
166            // set null for other additional columns
167            _ => {
168                builder.append_n_null(visibility.len());
169            }
170        }
171        columns.push(builder.finish().into());
172    }
173    StreamChunk::with_visibility(ops, columns, visibility)
174}
175
176impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
177    #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
178    async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, batch_size: u32) {
179        let primary_keys = self
180            .table
181            .pk_indices()
182            .iter()
183            .map(|idx| {
184                let f = &self.table.schema().fields[*idx];
185                f.name.clone()
186            })
187            .collect_vec();
188
189        // prepare rate limiter
190        if args.rate_limit_rps == Some(0) {
191            // If limit is 0, we should not read any data from the upstream table.
192            // Keep waiting util the stream is rebuilt.
193            let future = futures::future::pending::<()>();
194            future.await;
195            unreachable!();
196        }
197
198        let rate_limiter = RateLimiter::new(
199            args.rate_limit_rps
200                .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
201                .into(),
202        );
203
204        let mut read_args = args;
205        let schema_table_name = read_args.schema_table_name.clone();
206        let database_name = read_args.database_name.clone();
207        // loop to read all data from the table
208        loop {
209            tracing::debug!(
210                "snapshot_read primary keys: {:?}, current_pos: {:?}",
211                primary_keys,
212                read_args.current_pos
213            );
214
215            let mut read_count: usize = 0;
216            let row_stream = self.reader.snapshot_read(
217                self.table.schema_table_name(),
218                read_args.current_pos.clone(),
219                primary_keys.clone(),
220                batch_size,
221            );
222
223            pin_mut!(row_stream);
224            let mut builder = DataChunkBuilder::new(
225                self.table.schema().data_types(),
226                limited_chunk_size(read_args.rate_limit_rps),
227            );
228            let chunk_stream = iter_chunks(row_stream, &mut builder);
229            let mut current_pk_pos = read_args.current_pos.clone().unwrap_or_default();
230
231            #[for_await]
232            for chunk in chunk_stream {
233                let chunk = chunk?;
234                let chunk_size = chunk.capacity();
235                read_count += chunk.cardinality();
236                current_pk_pos = get_new_pos(&chunk, &read_args.pk_indices);
237
238                if let Some(rate_limit_rps) = read_args.rate_limit_rps
239                    && chunk_size != 0
240                {
241                    // Apply rate limit, see `risingwave_stream::executor::source::apply_rate_limit` for more.
242                    // May be should be refactored to a common function later.
243                    let limit = rate_limit_rps as usize;
244
245                    // Because we produce chunks with limited-sized data chunk builder and all rows
246                    // are `Insert`s, the chunk size should never exceed the limit.
247                    assert!(chunk_size <= limit);
248
249                    // `InsufficientCapacity` should never happen because we have check the cardinality
250                    rate_limiter.wait(chunk_size as _).await;
251                    yield Some(with_additional_columns(
252                        chunk,
253                        &read_args.additional_columns,
254                        schema_table_name.clone(),
255                        database_name.clone(),
256                    ));
257                } else {
258                    // no limit, or empty chunk
259                    yield Some(with_additional_columns(
260                        chunk,
261                        &read_args.additional_columns,
262                        schema_table_name.clone(),
263                        database_name.clone(),
264                    ));
265                    continue;
266                }
267            }
268
269            // check read_count if the snapshot batch is finished
270            if read_count < batch_size as _ {
271                tracing::debug!("finished loading of full table snapshot");
272                yield None;
273                unreachable!()
274            } else {
275                // update PK position and continue to read the table
276                read_args.current_pos = Some(current_pk_pos);
277            }
278        }
279    }
280
281    #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
282    async fn snapshot_read_table_split(&self, args: SplitSnapshotReadArgs) {
283        // prepare rate limiter
284        if args.rate_limit_rps == Some(0) {
285            // If limit is 0, we should not read any data from the upstream table.
286            // Keep waiting util the stream is rebuilt.
287            let future = futures::future::pending::<()>();
288            future.await;
289            unreachable!();
290        }
291
292        let rate_limiter = RateLimiter::new(
293            args.rate_limit_rps
294                .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
295                .into(),
296        );
297
298        let read_args = args;
299        let schema_table_name = read_args.schema_table_name.clone();
300        let database_name = read_args.database_name.clone();
301        // tracing::debug!(?args, "snapshot_read",);
302
303        let row_stream = self.reader.split_snapshot_read(
304            self.table.schema_table_name(),
305            read_args.left_bound_inclusive.clone(),
306            read_args.right_bound_exclusive.clone(),
307            read_args.split_columns.clone(),
308        );
309
310        pin_mut!(row_stream);
311        let mut builder = DataChunkBuilder::new(
312            self.table.schema().data_types(),
313            limited_chunk_size(read_args.rate_limit_rps),
314        );
315        let chunk_stream = iter_chunks(row_stream, &mut builder);
316
317        #[for_await]
318        for chunk in chunk_stream {
319            let chunk = chunk?;
320            let chunk_size = chunk.capacity();
321
322            if let Some(rate_limit_rps) = read_args.rate_limit_rps
323                && chunk_size != 0
324            {
325                // Apply rate limit, see `risingwave_stream::executor::source::apply_rate_limit` for more.
326                // May be should be refactored to a common function later.
327                let limit = rate_limit_rps as usize;
328
329                // Because we produce chunks with limited-sized data chunk builder and all rows
330                // are `Insert`s, the chunk size should never exceed the limit.
331                assert!(chunk_size <= limit);
332
333                // `InsufficientCapacity` should never happen because we have check the cardinality
334                rate_limiter.wait(chunk_size as _).await;
335                yield Some(with_additional_columns(
336                    chunk,
337                    &read_args.additional_columns,
338                    schema_table_name.clone(),
339                    database_name.clone(),
340                ));
341            } else {
342                // no limit, or empty chunk
343                yield Some(with_additional_columns(
344                    chunk,
345                    &read_args.additional_columns,
346                    schema_table_name.clone(),
347                    database_name.clone(),
348                ));
349            }
350        }
351        yield None;
352    }
353
354    async fn current_cdc_offset(&self) -> StreamExecutorResult<Option<CdcOffset>> {
355        let binlog = self.reader.current_cdc_offset();
356        let binlog = binlog.await?;
357        Ok(Some(binlog))
358    }
359
360    async fn disconnect(self) -> StreamExecutorResult<()> {
361        self.reader.disconnect().await?;
362        Ok(())
363    }
364}
365
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_connector::source::cdc::external::mysql::MySqlExternalTableReader;
    use risingwave_connector::source::cdc::external::{
        ExternalTableConfig, ExternalTableReader, SchemaTableName,
    };

    use crate::executor::backfill::utils::{get_new_pos, iter_chunks};

    /// Row count after which the manual MySQL read test stops reading.
    const STOP_AFTER_ROWS: usize = 1_499_900;

    /// Manual test: repeatedly snapshot-reads `mydb.orders_rw` in batches of 1000 rows,
    /// advancing the start position to the last primary key of each chunk.
    ///
    /// Ignored by default because it connects to a MySQL server at `localhost:8306`.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let columns = [
            ColumnDesc::named("o_orderkey", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("o_custkey", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("o_orderstatus", ColumnId::new(3), DataType::Varchar),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
                "hostname" => "localhost",
                "port" => "8306",
                "username" => "root",
                "password" => "123456",
                "database.name" => "mydb",
                "table.name" => "orders_rw"));

        let config_json = serde_json::to_value(props).unwrap();
        let config = serde_json::from_value::<ExternalTableConfig>(config_json).unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema.clone())
            .await
            .unwrap();

        let pk_indices = vec![0];
        let mut rows_seen: usize = 0;
        let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))]));
        while rows_seen < STOP_AFTER_ROWS {
            let table_name = SchemaTableName {
                schema_name: "mydb".to_owned(),
                table_name: "orders_rw".to_owned(),
            };
            let row_stream = reader.snapshot_read(
                table_name,
                start_pk.clone(),
                vec!["o_orderkey".to_owned()],
                1000,
            );
            let mut builder = DataChunkBuilder::new(rw_schema.data_types(), 256);
            let chunk_stream = iter_chunks(row_stream, &mut builder);
            pin_mut!(chunk_stream);
            #[for_await]
            for chunk in chunk_stream {
                let chunk = chunk.expect("data");
                start_pk = Some(get_new_pos(&chunk, &pk_indices));
                rows_seen += chunk.capacity();
                println!("cnt: {}", rows_seen);
            }
        }
        println!("bye!");
    }
}