risingwave_stream/executor/backfill/cdc/upstream_table/snapshot.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use futures::{Stream, pin_mut};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{Scalar, ScalarImpl, Timestamptz};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common_rate_limit::RateLimiter;
use risingwave_connector::source::cdc::external::{
    CdcOffset, ExternalTableReader, ExternalTableReaderImpl, SchemaTableName,
};
use risingwave_pb::plan_common::additional_column::ColumnType;

use super::external::ExternalStorageTable;
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::backfill::utils::{get_new_pos, iter_chunks};
use crate::executor::{StreamExecutorError, StreamExecutorResult};

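/// Read operations on an upstream table used by CDC backfill: full-table snapshot reads,
/// current CDC offset lookup, and disconnect.
///
/// A rough consumption sketch (illustrative only; `reader` and `args` are assumed to be
/// constructed by the caller):
///
/// ```ignore
/// let stream = reader.snapshot_read_full_table(args, 1000);
/// pin_mut!(stream);
/// while let Some(item) = stream.next().await {
///     match item? {
///         Some(chunk) => { /* emit the snapshot chunk downstream */ }
///         None => break, // snapshot finished; do not poll the stream again
///     }
/// }
/// ```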
pub trait UpstreamTableRead {
    fn snapshot_read_full_table(
        &self,
        args: SnapshotReadArgs,
        batch_size: u32,
    ) -> impl Stream<Item = StreamExecutorResult<Option<StreamChunk>>> + Send + '_;

    fn current_cdc_offset(
        &self,
    ) -> impl Future<Output = StreamExecutorResult<Option<CdcOffset>>> + Send + '_;

    async fn disconnect(self) -> StreamExecutorResult<()>;
}

#[derive(Debug, Clone)]
pub struct SnapshotReadArgs {
    pub current_pos: Option<OwnedRow>,
    pub rate_limit_rps: Option<u32>,
    pub pk_indices: Vec<usize>,
    pub additional_columns: Vec<ColumnDesc>,
    pub schema_table_name: SchemaTableName,
    pub database_name: String,
}

impl SnapshotReadArgs {
    pub fn new(
        current_pos: Option<OwnedRow>,
        rate_limit_rps: Option<u32>,
        pk_indices: Vec<usize>,
        additional_columns: Vec<ColumnDesc>,
        schema_table_name: SchemaTableName,
        database_name: String,
    ) -> Self {
        Self {
            current_pos,
            rate_limit_rps,
            pk_indices,
            additional_columns,
            schema_table_name,
            database_name,
        }
    }
}

/// A wrapper of the upstream table for snapshot reads.
///
/// We need it because the snapshot read has to be customized for managed upstream tables
/// (e.g. mv, index) and for external upstream tables.
pub struct UpstreamTableReader<T> {
    table: T,
    pub(crate) reader: ExternalTableReaderImpl,
}

impl<T> UpstreamTableReader<T> {
    pub fn new(table: T, reader: ExternalTableReaderImpl) -> Self {
        Self { table, reader }
    }
}

/// Append additional columns to the snapshot chunk. Timestamp, database name, schema name
/// and table name columns are filled with default/derived values; other additional columns are null.
fn with_additional_columns(
    snapshot_chunk: StreamChunk,
    additional_columns: &[ColumnDesc],
    schema_table_name: SchemaTableName,
    database_name: String,
) -> StreamChunk {
    let (ops, mut columns, visibility) = snapshot_chunk.into_inner();
    for desc in additional_columns {
        let mut builder = desc.data_type.create_array_builder(visibility.len());
        match *desc.additional_column.column_type.as_ref().unwrap() {
            // set default value for timestamp
            ColumnType::Timestamp(_) => builder.append_n(
                visibility.len(),
                Some(Timestamptz::default().to_scalar_value()),
            ),
            ColumnType::DatabaseName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(database_name.clone())),
                );
            }
            ColumnType::SchemaName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(schema_table_name.schema_name.clone())),
                );
            }
            ColumnType::TableName(_) => {
                builder.append_n(
                    visibility.len(),
                    Some(ScalarImpl::from(schema_table_name.table_name.clone())),
                );
            }
            // set null for other additional columns
            _ => {
                builder.append_n_null(visibility.len());
            }
        }
        columns.push(builder.finish().into());
    }
    StreamChunk::with_visibility(ops, columns, visibility)
}

impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
    #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
    async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, batch_size: u32) {
        let primary_keys = self
            .table
            .pk_indices()
            .iter()
            .map(|idx| {
                let f = &self.table.schema().fields[*idx];
                f.name.clone()
            })
            .collect_vec();

        // prepare the rate limiter
        if args.rate_limit_rps == Some(0) {
            // If the limit is 0, we should not read any data from the upstream table.
            // Keep waiting until the stream is rebuilt.
            let future = futures::future::pending::<()>();
            future.await;
            unreachable!();
        }

        let rate_limiter = RateLimiter::new(
            args.rate_limit_rps
                .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
                .into(),
        );

        let mut read_args = args;
        let schema_table_name = read_args.schema_table_name.clone();
        let database_name = read_args.database_name.clone();
        // loop to read all data from the table
        loop {
            tracing::debug!(
                "snapshot_read primary keys: {:?}, current_pos: {:?}",
                primary_keys,
                read_args.current_pos
            );

            let mut read_count: usize = 0;
            let row_stream = self.reader.snapshot_read(
                self.table.schema_table_name(),
                read_args.current_pos.clone(),
                primary_keys.clone(),
                batch_size,
            );

            pin_mut!(row_stream);
            let mut builder = DataChunkBuilder::new(
                self.table.schema().data_types(),
                limited_chunk_size(read_args.rate_limit_rps),
            );
            let chunk_stream = iter_chunks(row_stream, &mut builder);
            let mut current_pk_pos = read_args.current_pos.clone().unwrap_or_default();

            #[for_await]
            for chunk in chunk_stream {
                let chunk = chunk?;
                let chunk_size = chunk.capacity();
                read_count += chunk.cardinality();
                current_pk_pos = get_new_pos(&chunk, &read_args.pk_indices);

                if read_args.rate_limit_rps.is_none() || chunk_size == 0 {
                    // no limit, or empty chunk
                    yield Some(with_additional_columns(
                        chunk,
                        &read_args.additional_columns,
                        schema_table_name.clone(),
                        database_name.clone(),
                    ));
                    continue;
                } else {
                    // Apply rate limit, see `risingwave_stream::executor::source::apply_rate_limit` for more.
                    // Maybe this should be refactored into a common function later.
                    let limit = read_args.rate_limit_rps.unwrap() as usize;

                    // Because we produce chunks with a limited-size data chunk builder and all rows
                    // are `Insert`s, the chunk size should never exceed the limit.
                    assert!(chunk_size <= limit);

                    // `InsufficientCapacity` should never happen because we have checked the cardinality above.
                    rate_limiter.wait(chunk_size as _).await;
                    yield Some(with_additional_columns(
                        chunk,
                        &read_args.additional_columns,
                        schema_table_name.clone(),
                        database_name.clone(),
                    ));
                }
            }

            // Check `read_count` to see whether the snapshot batch is finished.
            if read_count < batch_size as _ {
                tracing::debug!("finished loading of full table snapshot");
                // Yield `None` to signal that the snapshot read is finished; the caller
                // must not poll the stream again afterwards.
                yield None;
                unreachable!()
            } else {
                // update the PK position and continue to read the table
                read_args.current_pos = Some(current_pk_pos);
            }
        }
    }

    async fn current_cdc_offset(&self) -> StreamExecutorResult<Option<CdcOffset>> {
        let binlog = self.reader.current_cdc_offset();
        let binlog = binlog.await?;
        Ok(Some(binlog))
    }

    async fn disconnect(self) -> StreamExecutorResult<()> {
        self.reader.disconnect().await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_connector::source::cdc::external::mysql::MySqlExternalTableReader;
    use risingwave_connector::source::cdc::external::{
        ExternalTableConfig, ExternalTableReader, SchemaTableName,
    };

    use crate::executor::backfill::utils::{get_new_pos, iter_chunks};

    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let columns = vec![
            ColumnDesc::named("o_orderkey", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("o_custkey", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("o_orderstatus", ColumnId::new(3), DataType::Varchar),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
                "hostname" => "localhost",
                "port" => "8306",
                "username" => "root",
                "password" => "123456",
                "database.name" => "mydb",
                "table.name" => "orders_rw"));

        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema.clone()).unwrap();

        let mut cnt: usize = 0;
        let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))]));
        loop {
            let row_stream = reader.snapshot_read(
                SchemaTableName {
                    schema_name: "mydb".to_owned(),
                    table_name: "orders_rw".to_owned(),
                },
                start_pk.clone(),
                vec!["o_orderkey".to_owned()],
                1000,
            );
            let mut builder = DataChunkBuilder::new(rw_schema.clone().data_types(), 256);
            let chunk_stream = iter_chunks(row_stream, &mut builder);
            let pk_indices = vec![0];
            pin_mut!(chunk_stream);
            #[for_await]
            for chunk in chunk_stream {
                let chunk = chunk.expect("data");
                start_pk = Some(get_new_pos(&chunk, &pk_indices));
                cnt += chunk.capacity();
                // println!("chunk: {:#?}", chunk);
                println!("cnt: {}", cnt);
            }
            if cnt >= 1499900 {
                println!("bye!");
                break;
            }
        }
    }
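
    /// A minimal pacing sketch for the rate-limited branch of `snapshot_read_full_table`,
    /// assuming the same `RateLimiter` usage as above (each chunk of `n` rows waits for
    /// `n` permits). The limit and chunk sizes here are illustrative values only.
    #[ignore]
    #[tokio::test]
    async fn test_rate_limiter_pacing_sketch() {
        use risingwave_common_rate_limit::RateLimiter;

        let rate_limit_rps: Option<u32> = Some(1000);
        let rate_limiter = RateLimiter::new(rate_limit_rps.into());
        // Simulate a few 256-row snapshot chunks; `wait` consumes one permit per row,
        // which keeps the overall row throughput under `rate_limit_rps`.
        for _ in 0..4 {
            rate_limiter.wait(256).await;
        }
    }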
}