risingwave_stream/executor/source/reader_stream.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::catalog::ColumnId;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxSourceChunkStream, CdcAutoSchemaChangeFailCallback, ConnectorState, CreateSplitReaderResult,
    SourceContext, SourceCtrlOpts, SplitMetaData, StreamChunkWithState,
};
use thiserror_ext::AsReport;
use tokio::sync::{mpsc, oneshot};

use super::{apply_rate_limit, get_split_offset_col_idx};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;

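/// The (sender, callback) pair handed to the source parser for CDC auto
/// schema change: an `mpsc` sender for reporting schema change events, each
/// paired with a `oneshot` acknowledgement channel, plus a callback for
/// reporting failed schema changes. Both are `None` when the feature is
/// disabled.
///
/// A minimal sketch of the send/acknowledge handshake this type enables,
/// with a hypothetical `Event` type and `handle` function standing in for
/// the real parser-side logic:
///
/// ```ignore
/// let (tx, mut rx) = mpsc::channel::<(Event, oneshot::Sender<()>)>(16);
///
/// // Sender side: report an event, then wait for the acknowledgement.
/// let (ack_tx, ack_rx) = oneshot::channel();
/// tx.send((event, ack_tx)).await?;
/// ack_rx.await?;
///
/// // Receiver side: handle each event, then acknowledge it.
/// while let Some((event, ack)) = rx.recv().await {
///     handle(event).await;
///     let _ = ack.send(());
/// }
/// ```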
type AutoSchemaChangeSetup = (
    Option<mpsc::Sender<(SchemaChangeEnvelope, oneshot::Sender<()>)>>,
    Option<CdcAutoSchemaChangeFailCallback>,
);

pub(crate) struct StreamReaderBuilder {
    pub source_desc: SourceDesc,
    pub rate_limit: Option<u32>,
    pub source_id: SourceId,
    pub source_name: String,
    pub reader_stream: Option<BoxSourceChunkStream>,

    // CDC-related fields
    pub is_auto_schema_change_enable: bool,
    pub actor_ctx: ActorContextRef,
}

impl StreamReaderBuilder {
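    /// Sets up the plumbing for CDC auto schema change, if enabled: spawns a
    /// background task that forwards each `SchemaChangeEnvelope` received from
    /// the source parser to the meta service and acknowledges the parser once
    /// the RPC returns, and builds a callback that reports failed schema
    /// changes to the meta event log.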
    fn setup_auto_schema_change(&self) -> AutoSchemaChangeSetup {
        if self.is_auto_schema_change_enable {
            let (schema_change_tx, mut schema_change_rx) =
                mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
            let meta_client = self.actor_ctx.meta_client.clone();
            // Spawn a task to forward schema change events from the source parser
            // to the meta service.
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = %e.as_report(), "schema change error");
                            }
                        }
                        // Unblock the parser whether the RPC succeeded or not.
                        finish_tx.send(()).unwrap();
                    }
                }
            });

            // Create a callback for reporting CDC auto schema change failure
            // events to the meta service's event log.
            let on_cdc_auto_schema_change_failure = if let Some(ref meta_client) =
                self.actor_ctx.meta_client
            {
                let meta_client = meta_client.clone();
                Some(CdcAutoSchemaChangeFailCallback::new(
                    move |source_id: SourceId,
                          table_name: String,
                          cdc_table_id: String,
                          upstream_ddl: String,
                          fail_info: String| {
                        let meta_client = meta_client.clone();
                        tokio::spawn(async move {
                            if let Err(e) = meta_client
                                .add_cdc_auto_schema_change_fail_event(
                                    source_id,
                                    table_name,
                                    cdc_table_id,
                                    upstream_ddl,
                                    fail_info,
                                )
                                .await
                            {
                                tracing::warn!(
                                    error = %e.as_report(),
                                    %source_id,
                                    "Failed to add CDC auto schema change fail event to event log."
                                );
                            }
                        });
                    },
                ))
            } else {
                None
            };

            (Some(schema_change_tx), on_cdc_auto_schema_change_failure)
        } else {
            info!("auto schema change is disabled in config");
            (None, None)
        }
    }

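    /// Collects the column ids to read from the source and assembles the
    /// `SourceContext` (rate-limit-aware chunk size, transaction splitting,
    /// and the auto schema change plumbing) shared by both stream-building
    /// entry points below.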
    fn prepare_source_stream_build(&self) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = self
            .source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();
        debug_assert!(column_ids.iter().all_unique(), "column_ids must be unique");

        let (schema_change_tx, on_cdc_auto_schema_change_failure) = self.setup_auto_schema_change();

        let source_ctx = SourceContext::new_with_auto_schema_change_callback(
            self.actor_ctx.id,
            self.source_id,
            self.actor_ctx.fragment_id,
            self.source_name.clone(),
            self.source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit),
                split_txn: self.rate_limit.is_some(), // when rate limiting, a txn may be split across chunks
            },
            self.source_desc.source.config.clone(),
            schema_change_tx,
            on_cdc_auto_schema_change_failure,
        );

        (column_ids, source_ctx)
    }

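    /// Builds the source stream once, without retry, and stashes it in
    /// `self.reader_stream` so that a later `into_retry_stream` call can pick
    /// it up instead of building a new reader. Returns the reader creation
    /// result, e.g. the latest splits when `seek_to_latest` is set.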
    pub(crate) async fn fetch_latest_splits(
        &mut self,
        state: ConnectorState,
        seek_to_latest: bool,
    ) -> StreamExecutorResult<CreateSplitReaderResult> {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);
        let (stream, res) = self
            .source_desc
            .source
            .build_stream(
                state.clone(),
                column_ids.clone(),
                source_ctx_ref.clone(),
                seek_to_latest,
            )
            .await
            .map_err(StreamExecutorError::connector_error)?;
        self.reader_stream = Some(stream);
        Ok(res)
    }

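    /// Consumes the builder into a self-healing chunk stream: build (or reuse)
    /// the reader, consume it while tracking the latest offset of every split,
    /// and rebuild the reader after a 1s backoff on read errors or non-initial
    /// build errors; an initial build failure is returned to the caller. Each
    /// yielded chunk is paired with a snapshot of the split states.
    ///
    /// A hedged usage sketch (the surrounding executor wiring is assumed):
    ///
    /// ```ignore
    /// let stream = builder.into_retry_stream(state, /* is_initial_build */ true);
    /// #[for_await]
    /// for item in stream {
    ///     let (chunk, split_states) = item?;
    ///     // forward `chunk` downstream; persist `split_states` as needed
    /// }
    /// ```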
    #[try_stream(ok = StreamChunkWithState, error = StreamExecutorError)]
    pub(crate) async fn into_retry_stream(mut self, state: ConnectorState, is_initial_build: bool) {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);

        // Track the latest offset of each split; snapshots of this map are
        // yielded along with every chunk.
        let mut latest_splits_info = if let Some(splits) = state.as_ref() {
            splits
                .iter()
                .map(|split| (split.id(), split.clone()))
                .collect::<HashMap<_, _>>()
        } else {
            HashMap::new()
        };

        let (Some(split_idx), Some(offset_idx), _) =
            get_split_offset_col_idx(&self.source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

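        // Outer loop: (re)build the reader from the latest known split states,
        // then consume it. A failed initial build is fatal; any later build or
        // read error is logged, reported to the error metrics, and retried
        // after a 1s backoff.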
        'build_consume_loop: loop {
            let bootstrap_state = if latest_splits_info.is_empty() {
                None
            } else {
                Some(latest_splits_info.values().cloned().collect_vec())
            };
            tracing::debug!(
                "build stream source reader with state: {:?}",
                bootstrap_state
            );
            // Reuse the stream prepared by `fetch_latest_splits` if there is one;
            // otherwise build a fresh reader from the bootstrap state.
            let build_stream_result = if let Some(exist_stream) = self.reader_stream.take() {
                Ok((exist_stream, CreateSplitReaderResult::default()))
            } else {
                self.source_desc
                    .source
                    .build_stream(
                        bootstrap_state,
                        column_ids.clone(),
                        source_ctx_ref.clone(),
                        // only `seek_to_latest` on the initial build
                        is_initial_build,
                    )
                    .await
            };
            let (stream, _) = match build_stream_result {
                Ok(res) => res,
                Err(e) if is_initial_build => {
                    return Err(StreamExecutorError::connector_error(e));
                }
                Err(e) => {
                    tracing::error!(
                        error = %e.as_report(),
                        source_name = self.source_name,
                        source_id = %self.source_id,
                        actor_id = %self.actor_ctx.id,
                        "build stream source reader error, retry in 1s"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        e.variant_name().to_owned(),
                        self.source_id.to_string(),
                        self.source_name.clone(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue 'build_consume_loop;
                }
            };

            let stream = apply_rate_limit(stream, self.rate_limit).boxed();
            let mut is_error = false;
            #[for_await]
            'consume: for msg in stream {
                match msg {
                    Ok(msg) => {
                        // All rows, visible or invisible, are used to update the source offsets.
                        for i in 0..msg.capacity() {
                            let (_, row, _) = msg.row_at(i);
                            let split = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            if let Some(split_impl) =
                                latest_splits_info.get_mut(&Arc::from(split.to_owned()))
                            {
                                // Best-effort, in-place offset update keyed by split id.
                                let _ = split_impl.update_in_place(offset.to_owned());
                            }
                        }
                        yield (msg, latest_splits_info.clone());
                    }
                    Err(e) => {
                        tracing::error!(
                            error = %e.as_report(),
                            source_name = self.source_name,
                            source_id = %self.source_id,
                            actor_id = %self.actor_ctx.id,
                            "stream source reader error"
                        );
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            e.variant_name().to_owned(),
                            self.source_id.to_string(),
                            self.source_name.clone(),
                            self.actor_ctx.fragment_id.to_string(),
                        ]);
                        is_error = true;
                        break 'consume;
                    }
                }
            }
            if !is_error {
                tracing::info!("stream source reader consume finished");
                // Mark batch splits as finished before emitting the final state.
                latest_splits_info.values_mut().for_each(|split_impl| {
                    if let Some(mut batch_split) = split_impl.clone().into_batch_split() {
                        batch_split.finish();
                        *split_impl = batch_split.into();
                    }
                });
                // Emit an empty chunk carrying the final split states.
                yield (
                    StreamChunk::empty(
                        self.source_desc
                            .columns
                            .iter()
                            .map(|c| c.data_type.clone())
                            .collect_vec()
                            .as_slice(),
                    ),
                    latest_splits_info.clone(),
                );
                break 'build_consume_loop;
            }
            tracing::info!("stream source reader error, retry in 1s");
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}