// risingwave_stream/executor/source/reader_stream.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::time::Duration;
17
18use futures::StreamExt;
19use itertools::Itertools;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::catalog::ColumnId;
22use risingwave_common::id::SourceId;
23use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
24use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
25use risingwave_connector::source::reader::desc::SourceDesc;
26use risingwave_connector::source::{
27    BoxSourceReaderEventStream, CdcAutoSchemaChangeFailCallback, ConnectorState,
28    CreateSplitReaderResult, SourceContext, SourceCtrlOpts, SourceReaderEvent, SplitId, SplitImpl,
29    SplitMetaData, StreamChunkWithState,
30};
31use thiserror_ext::AsReport;
32use tokio::sync::{mpsc, oneshot};
33
34use super::{apply_rate_limit_to_source_reader_event, get_split_offset_col_idx};
35use crate::common::rate_limit::limited_chunk_size;
36use crate::executor::prelude::*;
37
/// Setup result for CDC auto schema change handling:
/// - a sender for forwarding schema change events from the source parser to the
///   background handler task, paired with a oneshot used to ack completion;
/// - a callback for reporting auto-schema-change failures to the meta event log.
///
/// Both components are `None` when auto schema change is disabled.
type AutoSchemaChangeSetup = (
    Option<mpsc::Sender<(SchemaChangeEnvelope, oneshot::Sender<()>)>>,
    Option<CdcAutoSchemaChangeFailCallback>,
);
42
/// An event produced by the reader stream, always carrying the latest split
/// states so that downstream can persist source offsets.
#[derive(Debug)]
pub(crate) enum SourceReaderEventWithState {
    /// A stream chunk together with the split states updated from its
    /// split/offset columns.
    Data(StreamChunkWithState),
    /// A progress-only update of the split states (no data rows).
    Progress(HashMap<SplitId, SplitImpl>),
}
48
/// Builder that constructs (and on failure, rebuilds) the source reader stream,
/// tracking split offsets so a rebuilt reader resumes from the latest state.
pub(crate) struct StreamReaderBuilder {
    pub source_desc: SourceDesc,
    // Rate limit applied to the reader stream; `None` means unlimited.
    pub rate_limit: Option<u32>,
    pub source_id: SourceId,
    pub source_name: String,
    // A pre-built reader stream (e.g. from `fetch_latest_splits`) to be
    // consumed first before any rebuild; taken out on first use.
    pub reader_stream: Option<BoxSourceReaderEventStream>,

    // cdc related
    pub is_auto_schema_change_enable: bool,
    pub actor_ctx: ActorContextRef,
}
60
61impl StreamReaderBuilder {
62    fn update_offsets_from_chunk(
63        latest_splits_info: &mut HashMap<SplitId, SplitImpl>,
64        chunk: &StreamChunk,
65        split_idx: usize,
66        offset_idx: usize,
67    ) {
68        // All rows (including those visible or invisible) will be used to update the source offset.
69        for i in 0..chunk.capacity() {
70            let (_, row, _) = chunk.row_at(i);
71            let split = row.datum_at(split_idx).unwrap().into_utf8();
72            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
73            latest_splits_info
74                .get_mut(&Arc::from(split.to_owned()))
75                .map(|split_impl| split_impl.update_in_place(offset.to_owned()));
76        }
77    }
78
79    fn apply_split_progress(
80        latest_splits_info: &mut HashMap<SplitId, SplitImpl>,
81        split_progress: HashMap<SplitId, String>,
82    ) {
83        for (split_id, offset) in split_progress {
84            if let Some(split_impl) = latest_splits_info.get_mut(&split_id) {
85                if let Err(e) = split_impl.update_in_place(offset) {
86                    tracing::warn!(
87                        error = %e.as_report(),
88                        split_id = %split_id,
89                        "failed to apply split progress update",
90                    );
91                }
92            } else {
93                tracing::warn!(
94                    split_id = %split_id,
95                    "ignore progress for unknown split",
96                );
97            }
98        }
99    }
100
101    fn setup_auto_schema_change(&self) -> AutoSchemaChangeSetup {
102        if self.is_auto_schema_change_enable {
103            let (schema_change_tx, mut schema_change_rx) =
104                mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
105            let meta_client = self.actor_ctx.meta_client.clone();
106            // spawn a task to handle schema change event from source parser
107            let _join_handle = tokio::task::spawn(async move {
108                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
109                    let table_ids = schema_change.table_ids();
110                    tracing::info!(
111                        target: "auto_schema_change",
112                        "recv a schema change event for tables: {:?}", table_ids);
113                    // TODO: retry on rpc error
114                    if let Some(ref meta_client) = meta_client {
115                        match meta_client
116                            .auto_schema_change(schema_change.to_protobuf())
117                            .await
118                        {
119                            Ok(_) => {
120                                tracing::info!(
121                                    target: "auto_schema_change",
122                                    "schema change success for tables: {:?}", table_ids);
123                                finish_tx.send(()).unwrap();
124                            }
125                            Err(e) => {
126                                tracing::error!(
127                                    target: "auto_schema_change",
128                                    error = %e.as_report(), "schema change error");
129
130                                finish_tx.send(()).unwrap();
131                            }
132                        }
133                    }
134                }
135            });
136
137            // Create callback function for reporting CDC auto schema change fail events
138            let on_cdc_auto_schema_change_failure = if let Some(ref meta_client) =
139                self.actor_ctx.meta_client
140            {
141                let meta_client = meta_client.clone();
142                Some(CdcAutoSchemaChangeFailCallback::new(
143                    move |source_id: SourceId,
144                          table_name: String,
145                          cdc_table_id: String,
146                          upstream_ddl: String,
147                          fail_info: String| {
148                        let meta_client = meta_client.clone();
149                        tokio::spawn(async move {
150                            if let Err(e) = meta_client
151                                .add_cdc_auto_schema_change_fail_event(
152                                    source_id,
153                                    table_name,
154                                    cdc_table_id,
155                                    upstream_ddl,
156                                    fail_info,
157                                )
158                                .await
159                            {
160                                tracing::warn!(
161                                    error = %e.as_report(),
162                                    %source_id,
163                                    "Failed to add CDC auto schema change fail event to event log."
164                                );
165                            }
166                        });
167                    },
168                ))
169            } else {
170                None
171            };
172
173            (Some(schema_change_tx), on_cdc_auto_schema_change_failure)
174        } else {
175            info!("auto schema change is disabled in config");
176            (None, None)
177        }
178    }
179
180    fn prepare_source_stream_build(&self) -> (Vec<ColumnId>, SourceContext) {
181        let column_ids = self
182            .source_desc
183            .columns
184            .iter()
185            .map(|column_desc| column_desc.column_id)
186            .collect_vec();
187        debug_assert!(column_ids.iter().all_unique(), "column_ids must be unique");
188
189        let (schema_change_tx, on_cdc_auto_schema_change_failure) = self.setup_auto_schema_change();
190
191        let source_ctx = SourceContext::new_with_auto_schema_change_callback(
192            self.actor_ctx.id,
193            self.source_id,
194            self.actor_ctx.fragment_id,
195            self.source_name.clone(),
196            self.source_desc.metrics.clone(),
197            SourceCtrlOpts {
198                chunk_size: limited_chunk_size(self.rate_limit),
199                split_txn: self.rate_limit.is_some(), // when rate limiting, we may split txn
200            },
201            self.source_desc.source.config.clone(),
202            schema_change_tx,
203            on_cdc_auto_schema_change_failure,
204        );
205
206        (column_ids, source_ctx)
207    }
208
209    pub(crate) async fn fetch_latest_splits(
210        &mut self,
211        state: ConnectorState,
212        seek_to_latest: bool,
213    ) -> StreamExecutorResult<CreateSplitReaderResult> {
214        let (column_ids, source_ctx) = self.prepare_source_stream_build();
215        let source_ctx_ref = Arc::new(source_ctx);
216        let (stream, res) = self
217            .source_desc
218            .source
219            .build_stream(
220                state.clone(),
221                column_ids.clone(),
222                source_ctx_ref.clone(),
223                seek_to_latest,
224            )
225            .await
226            .map_err(StreamExecutorError::connector_error)?;
227        self.reader_stream = Some(stream);
228        Ok(res)
229    }
230
    /// Consumes the builder into a self-retrying stream of reader events.
    ///
    /// The stream repeatedly (re)builds a source reader and forwards its
    /// events, tracking the latest per-split offsets so that after an error a
    /// rebuilt reader resumes from where consumption left off. Build/consume
    /// errors are logged and retried after 1s, except that a failure on the
    /// initial build is fatal. When the reader finishes without error, batch
    /// splits are marked finished and a final empty chunk carrying the final
    /// split states is emitted.
    #[try_stream(ok = SourceReaderEventWithState, error = StreamExecutorError)]
    pub(crate) async fn into_retry_stream(mut self, state: ConnectorState, is_initial_build: bool) {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);

        // Seed the offset-tracking map from the recovered state, if any.
        let mut latest_splits_info = {
            if let Some(splits) = state.as_ref() {
                splits
                    .iter()
                    .map(|split| (split.id(), split.clone()))
                    .collect::<HashMap<_, _>>()
            } else {
                HashMap::new()
            }
        };

        let (Some(split_idx), Some(offset_idx), _) =
            get_split_offset_col_idx(&self.source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        'build_consume_loop: loop {
            // Rebuilds use the *latest* tracked splits (not the original
            // `state`) so consumption resumes from the last seen offsets.
            let bootstrap_state = if latest_splits_info.is_empty() {
                None
            } else {
                Some(latest_splits_info.values().cloned().collect_vec())
            };
            tracing::debug!(
                "build stream source reader with state: {:?}",
                bootstrap_state
            );
            // Prefer a stream pre-built by `fetch_latest_splits`, if present.
            let build_stream_result = if let Some(exist_stream) = self.reader_stream.take() {
                Ok((exist_stream, CreateSplitReaderResult::default()))
            } else {
                self.source_desc
                    .source
                    .build_stream(
                        bootstrap_state,
                        column_ids.clone(),
                        source_ctx_ref.clone(),
                        // just `seek_to_latest` for initial build
                        is_initial_build,
                    )
                    .await
            };
            if let Err(e) = build_stream_result {
                // Initial-build failure is fatal; later failures are retried.
                if is_initial_build {
                    return Err(StreamExecutorError::connector_error(e));
                } else {
                    tracing::error!(
                        error = %e.as_report(),
                        source_name = self.source_name,
                        source_id = %self.source_id,
                        actor_id = %self.actor_ctx.id,
                        "build stream source reader error, retry in 1s"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        e.variant_name().to_owned(),
                        self.source_id.to_string(),
                        self.source_name.clone(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue 'build_consume_loop;
                }
            }

            let (stream, _) = build_stream_result.unwrap();
            let stream = apply_rate_limit_to_source_reader_event(stream, self.rate_limit).boxed();
            // Distinguishes "stream errored" (retry) from "stream finished"
            // (emit final state and exit) after the consume loop.
            let mut is_error = false;
            #[for_await]
            'consume: for event in stream {
                let event = match event {
                    Ok(event) => event,
                    Err(e) => {
                        // Consume error: report metrics, then break out to
                        // rebuild the reader from the latest tracked offsets.
                        tracing::error!(
                            error = %e.as_report(),
                            source_name = self.source_name,
                            source_id = %self.source_id,
                            actor_id = %self.actor_ctx.id,
                            "stream source reader error"
                        );
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            e.variant_name().to_owned(),
                            self.source_id.to_string(),
                            self.source_name.clone(),
                            self.actor_ctx.fragment_id.to_string(),
                        ]);
                        is_error = true;
                        break 'consume;
                    }
                };

                match event {
                    SourceReaderEvent::DataChunk(chunk) => {
                        // Advance tracked offsets from the chunk's hidden
                        // split/offset columns before forwarding it.
                        Self::update_offsets_from_chunk(
                            &mut latest_splits_info,
                            &chunk,
                            split_idx,
                            offset_idx,
                        );
                        yield SourceReaderEventWithState::Data((chunk, latest_splits_info.clone()));
                    }
                    SourceReaderEvent::SplitProgress(split_progress) => {
                        SourceReaderEvent::SplitProgress carries offsets without data rows.
                        Self::apply_split_progress(&mut latest_splits_info, split_progress);
                        yield SourceReaderEventWithState::Progress(latest_splits_info.clone());
                    }
                }
            }
            if !is_error {
                // Clean finish: mark batch splits as finished and emit an empty
                // chunk so downstream persists the final split states.
                tracing::info!("stream source reader consume finished");
                latest_splits_info.values_mut().for_each(|split_impl| {
                    if let Some(mut batch_split) = split_impl.clone().into_batch_split() {
                        batch_split.finish();
                        *split_impl = batch_split.into();
                    }
                });
                yield SourceReaderEventWithState::Data((
                    StreamChunk::empty(
                        self.source_desc
                            .columns
                            .iter()
                            .map(|c| c.data_type.clone())
                            .collect_vec()
                            .as_slice(),
                    ),
                    latest_splits_info.clone(),
                ));
                break 'build_consume_loop;
            }
            tracing::info!("stream source reader error, retry in 1s");
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
366}