// risingwave_stream/executor/source/reader_stream.rs
use std::collections::HashMap;
16use std::time::Duration;
17
18use futures::StreamExt;
19use itertools::Itertools;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::catalog::ColumnId;
22use risingwave_common::id::SourceId;
23use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
24use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
25use risingwave_connector::source::reader::desc::SourceDesc;
26use risingwave_connector::source::{
27 BoxSourceReaderEventStream, CdcAutoSchemaChangeFailCallback, ConnectorState,
28 CreateSplitReaderResult, SourceContext, SourceCtrlOpts, SourceReaderEvent, SplitId, SplitImpl,
29 SplitMetaData, StreamChunkWithState,
30};
31use thiserror_ext::AsReport;
32use tokio::sync::{mpsc, oneshot};
33
34use super::{apply_rate_limit_to_source_reader_event, get_split_offset_col_idx};
35use crate::common::rate_limit::limited_chunk_size;
36use crate::executor::prelude::*;
37
/// Pair returned by `setup_auto_schema_change`:
/// - an optional channel used to forward schema-change envelopes to the
///   background task (the `oneshot::Sender` signals when the meta RPC has
///   completed), and
/// - an optional callback invoked when a CDC auto schema change fails.
///
/// Both are `None` when auto schema change is disabled in config.
type AutoSchemaChangeSetup = (
    Option<mpsc::Sender<(SchemaChangeEnvelope, oneshot::Sender<()>)>>,
    Option<CdcAutoSchemaChangeFailCallback>,
);
42
/// Event emitted by the retry stream, pairing reader output with the latest
/// known split states.
#[derive(Debug)]
pub(crate) enum SourceReaderEventWithState {
    /// A data chunk together with a snapshot of the current split states.
    Data(StreamChunkWithState),
    /// A split-progress-only update (no data), carrying the updated split states.
    Progress(HashMap<SplitId, SplitImpl>),
}
48
/// Builder that (re)creates the underlying source reader stream and wraps it
/// with retry-on-error logic.
pub(crate) struct StreamReaderBuilder {
    /// Descriptor of the source (columns, connector config, metrics).
    pub source_desc: SourceDesc,
    /// Optional rate limit applied to the reader stream.
    pub rate_limit: Option<u32>,
    pub source_id: SourceId,
    pub source_name: String,
    /// Pre-built reader stream, if one was created by `fetch_latest_splits`;
    /// it is taken (consumed) on the first iteration of the retry loop.
    pub reader_stream: Option<BoxSourceReaderEventStream>,

    /// Whether CDC auto schema change is enabled in config.
    pub is_auto_schema_change_enable: bool,
    pub actor_ctx: ActorContextRef,
}
60
61impl StreamReaderBuilder {
62 fn update_offsets_from_chunk(
63 latest_splits_info: &mut HashMap<SplitId, SplitImpl>,
64 chunk: &StreamChunk,
65 split_idx: usize,
66 offset_idx: usize,
67 ) {
68 for i in 0..chunk.capacity() {
70 let (_, row, _) = chunk.row_at(i);
71 let split = row.datum_at(split_idx).unwrap().into_utf8();
72 let offset = row.datum_at(offset_idx).unwrap().into_utf8();
73 latest_splits_info
74 .get_mut(&Arc::from(split.to_owned()))
75 .map(|split_impl| split_impl.update_in_place(offset.to_owned()));
76 }
77 }
78
79 fn apply_split_progress(
80 latest_splits_info: &mut HashMap<SplitId, SplitImpl>,
81 split_progress: HashMap<SplitId, String>,
82 ) {
83 for (split_id, offset) in split_progress {
84 if let Some(split_impl) = latest_splits_info.get_mut(&split_id) {
85 if let Err(e) = split_impl.update_in_place(offset) {
86 tracing::warn!(
87 error = %e.as_report(),
88 split_id = %split_id,
89 "failed to apply split progress update",
90 );
91 }
92 } else {
93 tracing::warn!(
94 split_id = %split_id,
95 "ignore progress for unknown split",
96 );
97 }
98 }
99 }
100
101 fn setup_auto_schema_change(&self) -> AutoSchemaChangeSetup {
102 if self.is_auto_schema_change_enable {
103 let (schema_change_tx, mut schema_change_rx) =
104 mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
105 let meta_client = self.actor_ctx.meta_client.clone();
106 let _join_handle = tokio::task::spawn(async move {
108 while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
109 let table_ids = schema_change.table_ids();
110 tracing::info!(
111 target: "auto_schema_change",
112 "recv a schema change event for tables: {:?}", table_ids);
113 if let Some(ref meta_client) = meta_client {
115 match meta_client
116 .auto_schema_change(schema_change.to_protobuf())
117 .await
118 {
119 Ok(_) => {
120 tracing::info!(
121 target: "auto_schema_change",
122 "schema change success for tables: {:?}", table_ids);
123 finish_tx.send(()).unwrap();
124 }
125 Err(e) => {
126 tracing::error!(
127 target: "auto_schema_change",
128 error = %e.as_report(), "schema change error");
129
130 finish_tx.send(()).unwrap();
131 }
132 }
133 }
134 }
135 });
136
137 let on_cdc_auto_schema_change_failure = if let Some(ref meta_client) =
139 self.actor_ctx.meta_client
140 {
141 let meta_client = meta_client.clone();
142 Some(CdcAutoSchemaChangeFailCallback::new(
143 move |source_id: SourceId,
144 table_name: String,
145 cdc_table_id: String,
146 upstream_ddl: String,
147 fail_info: String| {
148 let meta_client = meta_client.clone();
149 tokio::spawn(async move {
150 if let Err(e) = meta_client
151 .add_cdc_auto_schema_change_fail_event(
152 source_id,
153 table_name,
154 cdc_table_id,
155 upstream_ddl,
156 fail_info,
157 )
158 .await
159 {
160 tracing::warn!(
161 error = %e.as_report(),
162 %source_id,
163 "Failed to add CDC auto schema change fail event to event log."
164 );
165 }
166 });
167 },
168 ))
169 } else {
170 None
171 };
172
173 (Some(schema_change_tx), on_cdc_auto_schema_change_failure)
174 } else {
175 info!("auto schema change is disabled in config");
176 (None, None)
177 }
178 }
179
180 fn prepare_source_stream_build(&self) -> (Vec<ColumnId>, SourceContext) {
181 let column_ids = self
182 .source_desc
183 .columns
184 .iter()
185 .map(|column_desc| column_desc.column_id)
186 .collect_vec();
187 debug_assert!(column_ids.iter().all_unique(), "column_ids must be unique");
188
189 let (schema_change_tx, on_cdc_auto_schema_change_failure) = self.setup_auto_schema_change();
190
191 let source_ctx = SourceContext::new_with_auto_schema_change_callback(
192 self.actor_ctx.id,
193 self.source_id,
194 self.actor_ctx.fragment_id,
195 self.source_name.clone(),
196 self.source_desc.metrics.clone(),
197 SourceCtrlOpts {
198 chunk_size: limited_chunk_size(self.rate_limit),
199 split_txn: self.rate_limit.is_some(), },
201 self.source_desc.source.config.clone(),
202 schema_change_tx,
203 on_cdc_auto_schema_change_failure,
204 );
205
206 (column_ids, source_ctx)
207 }
208
209 pub(crate) async fn fetch_latest_splits(
210 &mut self,
211 state: ConnectorState,
212 seek_to_latest: bool,
213 ) -> StreamExecutorResult<CreateSplitReaderResult> {
214 let (column_ids, source_ctx) = self.prepare_source_stream_build();
215 let source_ctx_ref = Arc::new(source_ctx);
216 let (stream, res) = self
217 .source_desc
218 .source
219 .build_stream(
220 state.clone(),
221 column_ids.clone(),
222 source_ctx_ref.clone(),
223 seek_to_latest,
224 )
225 .await
226 .map_err(StreamExecutorError::connector_error)?;
227 self.reader_stream = Some(stream);
228 Ok(res)
229 }
230
231 #[try_stream(ok = SourceReaderEventWithState, error = StreamExecutorError)]
232 pub(crate) async fn into_retry_stream(mut self, state: ConnectorState, is_initial_build: bool) {
233 let (column_ids, source_ctx) = self.prepare_source_stream_build();
234 let source_ctx_ref = Arc::new(source_ctx);
235
236 let mut latest_splits_info = {
237 if let Some(splits) = state.as_ref() {
238 splits
239 .iter()
240 .map(|split| (split.id(), split.clone()))
241 .collect::<HashMap<_, _>>()
242 } else {
243 HashMap::new()
244 }
245 };
246
247 let (Some(split_idx), Some(offset_idx), _) =
248 get_split_offset_col_idx(&self.source_desc.columns)
249 else {
250 unreachable!("Partition and offset columns must be set.");
251 };
252
253 'build_consume_loop: loop {
254 let bootstrap_state = if latest_splits_info.is_empty() {
255 None
256 } else {
257 Some(latest_splits_info.values().cloned().collect_vec())
258 };
259 tracing::debug!(
260 "build stream source reader with state: {:?}",
261 bootstrap_state
262 );
263 let build_stream_result = if let Some(exist_stream) = self.reader_stream.take() {
264 Ok((exist_stream, CreateSplitReaderResult::default()))
265 } else {
266 self.source_desc
267 .source
268 .build_stream(
269 bootstrap_state,
270 column_ids.clone(),
271 source_ctx_ref.clone(),
272 is_initial_build,
274 )
275 .await
276 };
277 if let Err(e) = build_stream_result {
278 if is_initial_build {
279 return Err(StreamExecutorError::connector_error(e));
280 } else {
281 tracing::error!(
282 error = %e.as_report(),
283 source_name = self.source_name,
284 source_id = %self.source_id,
285 actor_id = %self.actor_ctx.id,
286 "build stream source reader error, retry in 1s"
287 );
288 GLOBAL_ERROR_METRICS.user_source_error.report([
289 e.variant_name().to_owned(),
290 self.source_id.to_string(),
291 self.source_name.clone(),
292 self.actor_ctx.fragment_id.to_string(),
293 ]);
294 tokio::time::sleep(Duration::from_secs(1)).await;
295 continue 'build_consume_loop;
296 }
297 }
298
299 let (stream, _) = build_stream_result.unwrap();
300 let stream = apply_rate_limit_to_source_reader_event(stream, self.rate_limit).boxed();
301 let mut is_error = false;
302 #[for_await]
303 'consume: for event in stream {
304 let event = match event {
305 Ok(event) => event,
306 Err(e) => {
307 tracing::error!(
308 error = %e.as_report(),
309 source_name = self.source_name,
310 source_id = %self.source_id,
311 actor_id = %self.actor_ctx.id,
312 "stream source reader error"
313 );
314 GLOBAL_ERROR_METRICS.user_source_error.report([
315 e.variant_name().to_owned(),
316 self.source_id.to_string(),
317 self.source_name.clone(),
318 self.actor_ctx.fragment_id.to_string(),
319 ]);
320 is_error = true;
321 break 'consume;
322 }
323 };
324
325 match event {
326 SourceReaderEvent::DataChunk(chunk) => {
327 Self::update_offsets_from_chunk(
328 &mut latest_splits_info,
329 &chunk,
330 split_idx,
331 offset_idx,
332 );
333 yield SourceReaderEventWithState::Data((chunk, latest_splits_info.clone()));
334 }
335 SourceReaderEvent::SplitProgress(split_progress) => {
336 Self::apply_split_progress(&mut latest_splits_info, split_progress);
337 yield SourceReaderEventWithState::Progress(latest_splits_info.clone());
338 }
339 }
340 }
341 if !is_error {
342 tracing::info!("stream source reader consume finished");
343 latest_splits_info.values_mut().for_each(|split_impl| {
344 if let Some(mut batch_split) = split_impl.clone().into_batch_split() {
345 batch_split.finish();
346 *split_impl = batch_split.into();
347 }
348 });
349 yield SourceReaderEventWithState::Data((
350 StreamChunk::empty(
351 self.source_desc
352 .columns
353 .iter()
354 .map(|c| c.data_type.clone())
355 .collect_vec()
356 .as_slice(),
357 ),
358 latest_splits_info.clone(),
359 ));
360 break 'build_consume_loop;
361 }
362 tracing::info!("stream source reader error, retry in 1s");
363 tokio::time::sleep(Duration::from_secs(1)).await;
364 }
365 }
366}