risingwave_stream/executor/source/reader_stream.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxSourceChunkStream, CdcAutoSchemaChangeFailCallback, ConnectorState, CreateSplitReaderResult,
    SourceContext, SourceCtrlOpts, SplitMetaData, StreamChunkWithState,
};
use thiserror_ext::AsReport;
use tokio::sync::{mpsc, oneshot};

use super::{apply_rate_limit, get_split_offset_col_idx};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;

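/// Optional sender for schema change events from the source parser (each paired with a
/// oneshot sender used to signal completion) and optional callback for reporting failed CDC
/// auto schema changes. Both are `None` when auto schema change is disabled.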
type AutoSchemaChangeSetup = (
    Option<mpsc::Sender<(SchemaChangeEnvelope, oneshot::Sender<()>)>>,
    Option<CdcAutoSchemaChangeFailCallback>,
);

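/// Builds the connector source chunk stream for a source executor and tracks the latest
/// offset of each split while the stream is consumed.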
pub(crate) struct StreamReaderBuilder {
    pub source_desc: SourceDesc,
    pub rate_limit: Option<u32>,
    pub source_id: TableId,
    pub source_name: String,
    pub reader_stream: Option<BoxSourceChunkStream>,

    // cdc related
    pub is_auto_schema_change_enable: bool,
    pub actor_ctx: ActorContextRef,
}

impl StreamReaderBuilder {
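    /// If auto schema change is enabled, spawns a background task that forwards schema change
    /// events from the source parser to the meta service, and builds a callback for reporting
    /// failed CDC auto schema changes to the event log. Otherwise returns `(None, None)`.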
    fn setup_auto_schema_change(&self) -> AutoSchemaChangeSetup {
        if self.is_auto_schema_change_enable {
            let (schema_change_tx, mut schema_change_rx) =
                mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
            let meta_client = self.actor_ctx.meta_client.clone();
            // spawn a task to handle schema change event from source parser
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = %e.as_report(), "schema change error");

                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });

            // Create callback function for reporting CDC auto schema change fail events
            let on_cdc_auto_schema_change_failure = if let Some(ref meta_client) =
                self.actor_ctx.meta_client
            {
                let meta_client = meta_client.clone();
                let source_id = self.source_id;
                Some(CdcAutoSchemaChangeFailCallback::new(
                    move |table_id: u32,
                          table_name: String,
                          cdc_table_id: String,
                          upstream_ddl: String,
                          fail_info: String| {
                        let meta_client = meta_client.clone();
                        let source_id = source_id;
                        tokio::spawn(async move {
                            if let Err(e) = meta_client
                                .add_cdc_auto_schema_change_fail_event(
                                    table_id,
                                    table_name,
                                    cdc_table_id,
                                    upstream_ddl,
                                    fail_info,
                                )
                                .await
                            {
                                tracing::warn!(
                                    error = %e.as_report(),
                                    %source_id,
                                    "Failed to add CDC auto schema change fail event to event log."
                                );
                            }
                        });
                    },
                ))
            } else {
                None
            };

            (Some(schema_change_tx), on_cdc_auto_schema_change_failure)
        } else {
            info!("auto schema change is disabled in config");
            (None, None)
        }
    }

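    /// Collects the column ids to read and assembles the `SourceContext` (rate-limit-aware
    /// chunk size plus the auto schema change channel and failure callback) used to build the
    /// source stream.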
    fn prepare_source_stream_build(&self) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = self
            .source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();
        debug_assert!(column_ids.iter().all_unique(), "column_ids must be unique");

        let (schema_change_tx, on_cdc_auto_schema_change_failure) = self.setup_auto_schema_change();

        let source_ctx = SourceContext::new_with_auto_schema_change_callback(
            self.actor_ctx.id,
            self.source_id,
            self.actor_ctx.fragment_id,
            self.source_name.clone(),
            self.source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit),
                split_txn: self.rate_limit.is_some(), // when rate limiting, we may split txn
            },
            self.source_desc.source.config.clone(),
            schema_change_tx,
            on_cdc_auto_schema_change_failure,
        );

        (column_ids, source_ctx)
    }

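    /// Builds the split reader for the given state, caches the resulting stream in
    /// `self.reader_stream` for later consumption, and returns the `CreateSplitReaderResult`.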
    pub(crate) async fn fetch_latest_splits(
        &mut self,
        state: ConnectorState,
        seek_to_latest: bool,
    ) -> StreamExecutorResult<CreateSplitReaderResult> {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);
        let (stream, res) = self
            .source_desc
            .source
            .build_stream(
                state.clone(),
                column_ids.clone(),
                source_ctx_ref.clone(),
                seek_to_latest,
            )
            .await
            .map_err(StreamExecutorError::connector_error)?;
        self.reader_stream = Some(stream);
        Ok(res)
    }

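    /// Converts the builder into a stream of `(StreamChunk, latest split states)` pairs.
    /// The source reader is (re)built and consumed in a loop that retries after 1s on errors,
    /// except that an error on the initial build is returned to the caller.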
    #[try_stream(ok = StreamChunkWithState, error = StreamExecutorError)]
    pub(crate) async fn into_retry_stream(mut self, state: ConnectorState, is_initial_build: bool) {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);

        let mut latest_splits_info = {
            if let Some(splits) = state.as_ref() {
                splits
                    .iter()
                    .map(|split| (split.id(), split.clone()))
                    .collect::<HashMap<_, _>>()
            } else {
                HashMap::new()
            }
        };

        let (Some(split_idx), Some(offset_idx), _) =
            get_split_offset_col_idx(&self.source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

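        // Outer loop: (re)build the source stream from the latest known split states and
        // consume it until it finishes or errors; on error, retry after a short delay.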
        'build_consume_loop: loop {
            let bootstrap_state = if latest_splits_info.is_empty() {
                None
            } else {
                Some(latest_splits_info.values().cloned().collect_vec())
            };
            tracing::debug!(
                "build stream source reader with state: {:?}",
                bootstrap_state
            );
            let build_stream_result = if let Some(exist_stream) = self.reader_stream.take() {
                Ok((exist_stream, CreateSplitReaderResult::default()))
            } else {
                self.source_desc
                    .source
                    .build_stream(
                        bootstrap_state,
                        column_ids.clone(),
                        source_ctx_ref.clone(),
                        // just `seek_to_latest` for initial build
                        is_initial_build,
                    )
                    .await
            };
            if let Err(e) = build_stream_result {
                if is_initial_build {
                    return Err(StreamExecutorError::connector_error(e));
                } else {
                    tracing::error!(
                        error = %e.as_report(),
                        source_name = self.source_name,
                        source_id = %self.source_id,
                        actor_id = self.actor_ctx.id,
                        "build stream source reader error, retry in 1s"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        e.variant_name().to_owned(),
                        self.source_id.to_string(),
                        self.source_name.clone(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue 'build_consume_loop;
                }
            }

            let (stream, _) = build_stream_result.unwrap();
            let stream = apply_rate_limit(stream, self.rate_limit).boxed();
            let mut is_error = false;
            #[for_await]
            'consume: for msg in stream {
                match msg {
                    Ok(msg) => {
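                        // Track the latest offset of each split from the hidden split/offset
                        // columns carried by the chunk.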
                        for (_, row) in msg.rows() {
                            let split = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            latest_splits_info
                                .get_mut(&Arc::from(split.to_owned()))
                                .map(|split_impl| split_impl.update_in_place(offset.to_owned()));
                        }
                        yield (msg, latest_splits_info.clone());
                    }
                    Err(e) => {
                        tracing::error!(
                            error = %e.as_report(),
                            source_name = self.source_name,
                            source_id = %self.source_id,
                            actor_id = self.actor_ctx.id,
                            "stream source reader error"
                        );
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            e.variant_name().to_owned(),
                            self.source_id.to_string(),
                            self.source_name.clone(),
                            self.actor_ctx.fragment_id.to_string(),
                        ]);
                        is_error = true;
                        break 'consume;
                    }
                }
            }
            if !is_error {
                tracing::info!("stream source reader consume finished");
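                // The stream finished without error: mark all batch splits as finished and
                // emit a final empty chunk carrying the updated split states.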
                latest_splits_info.values_mut().for_each(|split_impl| {
                    if let Some(mut batch_split) = split_impl.clone().into_batch_split() {
                        batch_split.finish();
                        *split_impl = batch_split.into();
                    }
                });
                yield (
                    StreamChunk::empty(
                        self.source_desc
                            .columns
                            .iter()
                            .map(|c| c.data_type.clone())
                            .collect_vec()
                            .as_slice(),
                    ),
                    latest_splits_info.clone(),
                );
                break 'build_consume_loop;
            }
            tracing::info!("stream source reader error, retry in 1s");
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}