risingwave_stream/executor/source/reader_stream.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxSourceChunkStream, ConnectorState, CreateSplitReaderResult, SourceContext, SourceCtrlOpts,
    SplitMetaData, StreamChunkWithState,
};
use thiserror_ext::AsReport;
use tokio::sync::{mpsc, oneshot};

use super::{apply_rate_limit, get_split_offset_col_idx};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;

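/// Builds the upstream connector stream for a source executor, keeping the
/// context needed to rebuild the reader (source descriptor, rate limit, actor
/// info) when the underlying connector stream fails.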
pub(crate) struct StreamReaderBuilder {
    pub source_desc: SourceDesc,
    pub rate_limit: Option<u32>,
    pub source_id: TableId,
    pub source_name: String,
    pub reader_stream: Option<BoxSourceChunkStream>,

    // cdc related
    pub is_auto_schema_change_enable: bool,
    pub actor_ctx: ActorContextRef,
}

impl StreamReaderBuilder {
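    /// Collects the column ids to read and assembles the `SourceContext` handed
    /// to the connector, optionally wiring up the schema-change channel.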
    fn prepare_source_stream_build(&self) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = self
            .source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

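        // Channel on which the source parser reports schema-change events; each
        // event carries a oneshot sender used to ack it once it has been handled.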
        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable {
            let meta_client = self.actor_ctx.meta_client.clone();
            // spawn a task to handle schema change events from the source parser
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = %e.as_report(), "schema change error");
                            }
                        }
                        // Ack the parser in either case so it can resume; don't panic if
                        // the receiver has already been dropped (e.g. during shutdown).
                        let _ = finish_tx.send(());
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };

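        // Assemble the per-reader context: actor/source identity, metrics, and the
        // chunking options (limited chunk size and txn splitting under rate limiting).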
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.source_id,
            self.actor_ctx.fragment_id,
            self.source_name.clone(),
            self.source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit),
                split_txn: self.rate_limit.is_some(), // when rate limiting, we may split txn
            },
            self.source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

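    /// Builds the split reader once, stashes its chunk stream in `self.reader_stream`
    /// for later consumption, and returns the build result (e.g. the latest splits).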
    pub(crate) async fn fetch_latest_splits(
        &mut self,
        state: ConnectorState,
        seek_to_latest: bool,
    ) -> StreamExecutorResult<CreateSplitReaderResult> {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);
        let (stream, res) = self
            .source_desc
            .source
            .build_stream(
                state.clone(),
                column_ids.clone(),
                source_ctx_ref.clone(),
                seek_to_latest,
            )
            .await
            .map_err(StreamExecutorError::connector_error)?;
        self.reader_stream = Some(stream);
        Ok(res)
    }

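    /// Turns the builder into a stream of chunks paired with the latest split states.
    /// If the connector reader fails, it is rebuilt from the tracked offsets after a
    /// 1s backoff; build errors are fatal only on the initial build.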
    #[try_stream(ok = StreamChunkWithState, error = StreamExecutorError)]
    pub(crate) async fn into_retry_stream(mut self, state: ConnectorState, is_initial_build: bool) {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);

        let mut latest_splits_info = state
            .as_ref()
            .map(|splits| {
                splits
                    .iter()
                    .map(|split| (split.id(), split.clone()))
                    .collect::<HashMap<_, _>>()
            })
            .unwrap_or_default();

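        // Each source chunk carries split and offset columns; locate them once so
        // per-row offsets can be folded back into `latest_splits_info`.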
        let (Some(split_idx), Some(offset_idx)) =
            get_split_offset_col_idx(&self.source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

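        // The outer loop (re)builds the connector reader; the inner loop consumes it.
        // Any reader error breaks back out here so the reader is rebuilt from the
        // most recently observed offsets.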
        'build_consume_loop: loop {
            let bootstrap_state = if latest_splits_info.is_empty() {
                None
            } else {
                Some(latest_splits_info.values().cloned().collect_vec())
            };
            tracing::debug!(
                "build stream source reader with state: {:?}",
                bootstrap_state
            );
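            // Reuse the stream pre-built by `fetch_latest_splits`, if any; otherwise
            // build a fresh reader from the bootstrap state.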
            let build_stream_result = if let Some(exist_stream) = self.reader_stream.take() {
                Ok((exist_stream, CreateSplitReaderResult::default()))
            } else {
                self.source_desc
                    .source
                    .build_stream(
                        bootstrap_state,
                        column_ids.clone(),
                        source_ctx_ref.clone(),
                        // only `seek_to_latest` on the initial build
                        is_initial_build,
                    )
                    .await
            };
            let (stream, _) = match build_stream_result {
                Ok(res) => res,
                Err(e) => {
                    if is_initial_build {
                        return Err(StreamExecutorError::connector_error(e));
                    }
                    tracing::warn!(
                        error = %e.as_report(),
                        source_name = self.source_name,
                        source_id = self.source_id.table_id,
                        actor_id = self.actor_ctx.id,
                        "build stream source reader error, retry in 1s"
                    );
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue 'build_consume_loop;
                }
            };
            let stream = apply_rate_limit(stream, self.rate_limit).boxed();
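            // As chunks flow through, record each row's offset back into
            // `latest_splits_info` so every yielded chunk carries up-to-date split states.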
            #[for_await]
            'consume: for msg in stream {
                match msg {
                    Ok(msg) => {
                        for (_, row) in msg.rows() {
                            let split = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            if let Some(split_impl) =
                                latest_splits_info.get_mut(&Arc::from(split.to_owned()))
                            {
                                let _ = split_impl.update_in_place(offset.to_owned());
                            }
                        }
                        yield (msg, latest_splits_info.clone());
                    }
                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            source_name = self.source_name,
                            source_id = self.source_id.table_id,
                            actor_id = self.actor_ctx.id,
                            "stream source reader error"
                        );
                        break 'consume;
                    }
                }
            }
            tracing::info!("stream source reader terminated, rebuild in 1s");
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}