risingwave_stream/executor/backfill/cdc/upstream_table/snapshot.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use futures::{Stream, pin_mut};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{Scalar, ScalarImpl, Timestamptz};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common_rate_limit::RateLimiter;
use risingwave_connector::source::cdc::external::{
    CdcOffset, ExternalTableReader, ExternalTableReaderImpl, SchemaTableName,
};
use risingwave_pb::plan_common::additional_column::ColumnType;

use super::external::ExternalStorageTable;
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::backfill::utils::{get_new_pos, iter_chunks};
use crate::executor::{StreamExecutorError, StreamExecutorResult};

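/// Reads the snapshot (historical data) of an upstream table for CDC backfill.
///
/// `snapshot_read_full_table` yields `Some(chunk)` for each batch of snapshot rows and a final
/// `None` once the table has been fully read; `current_cdc_offset` reports the current upstream
/// changelog position (e.g. a binlog offset).
///
/// A rough usage sketch (the setup of `table`, `reader` and `args` is illustrative and not part
/// of this module):
///
/// ```ignore
/// let upstream = UpstreamTableReader::new(table, reader);
/// let stream = upstream.snapshot_read_full_table(args, 1000);
/// pin_mut!(stream);
/// while let Some(item) = stream.try_next().await? {
///     match item {
///         Some(chunk) => { /* emit or buffer the snapshot chunk */ }
///         None => break, // snapshot finished
///     }
/// }
/// ```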
pub trait UpstreamTableRead {
    fn snapshot_read_full_table(
        &self,
        args: SnapshotReadArgs,
        batch_size: u32,
    ) -> impl Stream<Item = StreamExecutorResult<Option<StreamChunk>>> + Send + '_;

    fn current_cdc_offset(
        &self,
    ) -> impl Future<Output = StreamExecutorResult<Option<CdcOffset>>> + Send + '_;
}

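/// Arguments for a full-table snapshot read.
///
/// `current_pos` is the primary-key position to resume from (`None` starts from the beginning),
/// `rate_limit_rps` is an optional rate limit in rows per second (`Some(0)` pauses reading
/// entirely), `pk_indices` locates the primary-key columns in the output chunks, and
/// `additional_columns` together with `schema_table_name` / `database_name` describe the CDC
/// metadata columns appended to every snapshot chunk.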
#[derive(Debug, Clone)]
pub struct SnapshotReadArgs {
    pub current_pos: Option<OwnedRow>,
    pub rate_limit_rps: Option<u32>,
    pub pk_indices: Vec<usize>,
    pub additional_columns: Vec<ColumnDesc>,
    pub schema_table_name: SchemaTableName,
    pub database_name: String,
}

impl SnapshotReadArgs {
    pub fn new(
        current_pos: Option<OwnedRow>,
        rate_limit_rps: Option<u32>,
        pk_indices: Vec<usize>,
        additional_columns: Vec<ColumnDesc>,
        schema_table_name: SchemaTableName,
        database_name: String,
    ) -> Self {
        Self {
            current_pos,
            rate_limit_rps,
            pk_indices,
            additional_columns,
            schema_table_name,
            database_name,
        }
    }
}

/// A wrapper around the upstream table for snapshot reads,
/// because snapshot reads need to be customized for managed upstream tables (e.g. mv, index)
/// and for external upstream tables.
pub struct UpstreamTableReader<T> {
    table: T,
    pub(crate) reader: ExternalTableReaderImpl,
}

impl<T> UpstreamTableReader<T> {
    pub fn new(table: T, reader: ExternalTableReaderImpl) -> Self {
        Self { table, reader }
    }
}

/// Append the additional (CDC metadata) columns to the snapshot chunk. The timestamp column is
/// filled with a default value and the database/schema/table name columns with the table's
/// metadata; all other additional columns are filled with null.
fn with_additional_columns(
    snapshot_chunk: StreamChunk,
    additional_columns: &[ColumnDesc],
    schema_table_name: SchemaTableName,
    database_name: String,
) -> StreamChunk {
    let (ops, mut columns, visibility) = snapshot_chunk.into_inner();
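    // `visibility.len()` equals the chunk capacity, so every appended column gets one value per
    // row slot and stays aligned with the existing columns and the visibility bitmap.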
    for desc in additional_columns {
        let mut builder = desc.data_type.create_array_builder(visibility.len());
        match *desc.additional_column.column_type.as_ref().unwrap() {
            // set default value for timestamp
            ColumnType::Timestamp(_) => builder.append_n(
                visibility.len(),
                Some(Timestamptz::default().to_scalar_value()),
            ),
            ColumnType::DatabaseName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(database_name.clone())),
                );
            }
            ColumnType::SchemaName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(schema_table_name.schema_name.clone())),
                );
            }
            ColumnType::TableName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(schema_table_name.table_name.clone())),
                );
            }
            // set null for other additional columns
            _ => {
                builder.append_n_null(visibility.len());
            }
        }
        columns.push(builder.finish().into());
    }
    StreamChunk::with_visibility(ops, columns, visibility)
}

impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
    #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
    async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, batch_size: u32) {
        let primary_keys = self
            .table
            .pk_indices()
            .iter()
            .map(|idx| {
                let f = &self.table.schema().fields[*idx];
                f.name.clone()
            })
            .collect_vec();

        // prepare rate limiter
        if args.rate_limit_rps == Some(0) {
            // If the limit is 0, we should not read any data from the upstream table.
            // Keep waiting until the stream is rebuilt.
            let future = futures::future::pending::<()>();
            future.await;
            unreachable!();
        }

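        // The limiter budget is `rate_limit_rps` rows per second; before each chunk is yielded
        // below, we wait for `chunk_size` permits.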
        let rate_limiter = RateLimiter::new(
            args.rate_limit_rps
                .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
                .into(),
        );

        let mut read_args = args;
        let schema_table_name = read_args.schema_table_name.clone();
        let database_name = read_args.database_name.clone();
        // loop to read all data from the table
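        // Each iteration reads at most `batch_size` rows starting after `current_pos`; a batch
        // that comes back short means the snapshot has been fully read.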
        loop {
            tracing::debug!(
                "snapshot_read primary keys: {:?}, current_pos: {:?}",
                primary_keys,
                read_args.current_pos
            );

            let mut read_count: usize = 0;
            let row_stream = self.reader.snapshot_read(
                self.table.schema_table_name(),
                read_args.current_pos.clone(),
                primary_keys.clone(),
                batch_size,
            );

            pin_mut!(row_stream);
            let mut builder = DataChunkBuilder::new(
                self.table.schema().data_types(),
                limited_chunk_size(read_args.rate_limit_rps),
            );
            let chunk_stream = iter_chunks(row_stream, &mut builder);
            let mut current_pk_pos = read_args.current_pos.clone().unwrap_or_default();

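            // Track the primary key of the last row seen, so the next batch resumes right after it.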
            #[for_await]
            for chunk in chunk_stream {
                let chunk = chunk?;
                let chunk_size = chunk.capacity();
                read_count += chunk.cardinality();
                current_pk_pos = get_new_pos(&chunk, &read_args.pk_indices);

                if read_args.rate_limit_rps.is_none() || chunk_size == 0 {
                    // no limit, or empty chunk
                    yield Some(with_additional_columns(
                        chunk,
                        &read_args.additional_columns,
                        schema_table_name.clone(),
                        database_name.clone(),
                    ));
                    continue;
                } else {
                    // Apply the rate limit; see `risingwave_stream::executor::source::apply_rate_limit`
                    // for details. This could perhaps be refactored into a common function later.
                    let limit = read_args.rate_limit_rps.unwrap() as usize;

                    // Because we produce chunks with a limited-size data chunk builder and all rows
                    // are `Insert`s, the chunk size should never exceed the limit.
                    assert!(chunk_size <= limit);

                    // `InsufficientCapacity` should never happen because we have checked that the
                    // chunk size does not exceed the limit.
                    rate_limiter.wait(chunk_size as _).await;
                    yield Some(with_additional_columns(
                        chunk,
                        &read_args.additional_columns,
                        schema_table_name.clone(),
                        database_name.clone(),
                    ));
                }
            }

            // A batch smaller than `batch_size` means the full table snapshot is finished.
            if read_count < batch_size as _ {
                tracing::debug!("finished loading of full table snapshot");
                yield None;
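                // The caller is expected to stop polling the stream after the final `None`,
                // so this point should never be reached.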
                unreachable!()
            } else {
                // update PK position and continue to read the table
                read_args.current_pos = Some(current_pk_pos);
            }
        }
    }

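    /// Fetch the current CDC offset (e.g. the MySQL binlog position) from the external table reader.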
    async fn current_cdc_offset(&self) -> StreamExecutorResult<Option<CdcOffset>> {
        let binlog = self.reader.current_cdc_offset();
        let binlog = binlog.await?;
        Ok(Some(binlog))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_connector::source::cdc::external::mysql::MySqlExternalTableReader;
    use risingwave_connector::source::cdc::external::{
        ExternalTableConfig, ExternalTableReader, SchemaTableName,
    };

    use crate::executor::backfill::utils::{get_new_pos, iter_chunks};

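    // This test requires a locally running MySQL with the connection settings below, so it is
    // marked `#[ignore]` and must be run manually.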
    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let columns = vec![
            ColumnDesc::named("o_orderkey", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("o_custkey", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("o_orderstatus", ColumnId::new(3), DataType::Varchar),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
                "hostname" => "localhost",
                "port" => "8306",
                "username" => "root",
                "password" => "123456",
                "database.name" => "mydb",
                "table.name" => "orders_rw"));

        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema.clone())
            .await
            .unwrap();

        let mut cnt: usize = 0;
        let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))]));
        loop {
            let row_stream = reader.snapshot_read(
                SchemaTableName {
                    schema_name: "mydb".to_owned(),
                    table_name: "orders_rw".to_owned(),
                },
                start_pk.clone(),
                vec!["o_orderkey".to_owned()],
                1000,
            );
            let mut builder = DataChunkBuilder::new(rw_schema.clone().data_types(), 256);
            let chunk_stream = iter_chunks(row_stream, &mut builder);
            let pk_indices = vec![0];
            pin_mut!(chunk_stream);
            #[for_await]
            for chunk in chunk_stream {
                let chunk = chunk.expect("data");
                start_pk = Some(get_new_pos(&chunk, &pk_indices));
                cnt += chunk.capacity();
                // println!("chunk: {:#?}", chunk);
                println!("cnt: {}", cnt);
            }
            if cnt >= 1499900 {
                println!("bye!");
                break;
            }
        }
    }
}