risingwave_stream/executor/source/reader_stream.rs

use std::collections::HashMap;
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::catalog::ColumnId;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxSourceChunkStream, CdcAutoSchemaChangeFailCallback, ConnectorState, CreateSplitReaderResult,
    SourceContext, SourceCtrlOpts, SplitMetaData, StreamChunkWithState,
};
use thiserror_ext::AsReport;
use tokio::sync::{mpsc, oneshot};

use super::{apply_rate_limit, get_split_offset_col_idx};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;

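/// Optional sender for schema-change events (each paired with a oneshot used to
/// acknowledge completion), plus an optional callback invoked when a CDC auto
/// schema change fails.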
type AutoSchemaChangeSetup = (
    Option<mpsc::Sender<(SchemaChangeEnvelope, oneshot::Sender<()>)>>,
    Option<CdcAutoSchemaChangeFailCallback>,
);

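/// Builds the source reader stream for a source executor, with retry on reader
/// failure and optional CDC auto schema change support.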
pub(crate) struct StreamReaderBuilder {
    pub source_desc: SourceDesc,
    pub rate_limit: Option<u32>,
    pub source_id: SourceId,
    pub source_name: String,
    pub reader_stream: Option<BoxSourceChunkStream>,

    pub is_auto_schema_change_enable: bool,
    pub actor_ctx: ActorContextRef,
}

impl StreamReaderBuilder {
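    /// If auto schema change is enabled, spawn a task that forwards schema-change
    /// events to the meta service and build the failure-report callback; otherwise
    /// return `(None, None)`.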
    fn setup_auto_schema_change(&self) -> AutoSchemaChangeSetup {
        if self.is_auto_schema_change_enable {
            let (schema_change_tx, mut schema_change_rx) =
                mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
            let meta_client = self.actor_ctx.meta_client.clone();
            let _join_handle = tokio::task::spawn(async move {
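                // Handle each schema-change event by calling into the meta service,
                // then ack the parser via `finish_tx` whether it succeeded or not.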
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = %e.as_report(), "schema change error");

                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });

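            // Callback that records CDC auto schema change failures in the meta
            // event log.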
            let on_cdc_auto_schema_change_failure = if let Some(ref meta_client) =
                self.actor_ctx.meta_client
            {
                let meta_client = meta_client.clone();
                Some(CdcAutoSchemaChangeFailCallback::new(
                    move |source_id: SourceId,
                          table_name: String,
                          cdc_table_id: String,
                          upstream_ddl: String,
                          fail_info: String| {
                        let meta_client = meta_client.clone();
                        tokio::spawn(async move {
                            if let Err(e) = meta_client
                                .add_cdc_auto_schema_change_fail_event(
                                    source_id,
                                    table_name,
                                    cdc_table_id,
                                    upstream_ddl,
                                    fail_info,
                                )
                                .await
                            {
                                tracing::warn!(
                                    error = %e.as_report(),
                                    %source_id,
                                    "Failed to add CDC auto schema change fail event to event log."
                                );
                            }
                        });
                    },
                ))
            } else {
                None
            };

            (Some(schema_change_tx), on_cdc_auto_schema_change_failure)
        } else {
            info!("auto schema change is disabled in config");
            (None, None)
        }
    }

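    /// Collect the column ids to read and assemble the `SourceContext` shared by
    /// `fetch_latest_splits` and `into_retry_stream`.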
    fn prepare_source_stream_build(&self) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = self
            .source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();
        debug_assert!(column_ids.iter().all_unique(), "column_ids must be unique");

        let (schema_change_tx, on_cdc_auto_schema_change_failure) = self.setup_auto_schema_change();

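        // With rate limiting enabled, cap the chunk size and allow transactions to
        // be split across chunks.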
        let source_ctx = SourceContext::new_with_auto_schema_change_callback(
            self.actor_ctx.id,
            self.source_id,
            self.actor_ctx.fragment_id,
            self.source_name.clone(),
            self.source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit),
                split_txn: self.rate_limit.is_some(),
            },
            self.source_desc.source.config.clone(),
            schema_change_tx,
            on_cdc_auto_schema_change_failure,
        );

        (column_ids, source_ctx)
    }

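    /// Build the split reader once, stash its stream in `self.reader_stream` for
    /// later consumption, and return the `CreateSplitReaderResult` reported by the
    /// connector.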
    pub(crate) async fn fetch_latest_splits(
        &mut self,
        state: ConnectorState,
        seek_to_latest: bool,
    ) -> StreamExecutorResult<CreateSplitReaderResult> {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);
        let (stream, res) = self
            .source_desc
            .source
            .build_stream(
                state.clone(),
                column_ids.clone(),
                source_ctx_ref.clone(),
                seek_to_latest,
            )
            .await
            .map_err(StreamExecutorError::connector_error)?;
        self.reader_stream = Some(stream);
        Ok(res)
    }

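    /// Turn the builder into a stream of `(chunk, latest split states)`. On reader
    /// failure the stream is rebuilt from the most recent offsets after a 1s
    /// backoff; errors during the initial build are propagated instead.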
    #[try_stream(ok = StreamChunkWithState, error = StreamExecutorError)]
    pub(crate) async fn into_retry_stream(mut self, state: ConnectorState, is_initial_build: bool) {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);

        let mut latest_splits_info = {
            if let Some(splits) = state.as_ref() {
                splits
                    .iter()
                    .map(|split| (split.id(), split.clone()))
                    .collect::<HashMap<_, _>>()
            } else {
                HashMap::new()
            }
        };

        let (Some(split_idx), Some(offset_idx), _) =
            get_split_offset_col_idx(&self.source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

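        // (Re)build the reader from the latest known offsets, then consume it
        // until it finishes or errors.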
        'build_consume_loop: loop {
            let bootstrap_state = if latest_splits_info.is_empty() {
                None
            } else {
                Some(latest_splits_info.values().cloned().collect_vec())
            };
            tracing::debug!(
                "build stream source reader with state: {:?}",
                bootstrap_state
            );
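            // Reuse a stream pre-built by `fetch_latest_splits` if present;
            // otherwise build a fresh one from the bootstrap state.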
            let build_stream_result = if let Some(exist_stream) = self.reader_stream.take() {
                Ok((exist_stream, CreateSplitReaderResult::default()))
            } else {
                self.source_desc
                    .source
                    .build_stream(
                        bootstrap_state,
                        column_ids.clone(),
                        source_ctx_ref.clone(),
                        is_initial_build,
                    )
                    .await
            };
            if let Err(e) = build_stream_result {
                if is_initial_build {
                    return Err(StreamExecutorError::connector_error(e));
                } else {
                    tracing::error!(
                        error = %e.as_report(),
                        source_name = self.source_name,
                        source_id = %self.source_id,
                        actor_id = %self.actor_ctx.id,
                        "build stream source reader error, retry in 1s"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        e.variant_name().to_owned(),
                        self.source_id.to_string(),
                        self.source_name.clone(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue 'build_consume_loop;
                }
            }

            let (stream, _) = build_stream_result.unwrap();
            let stream = apply_rate_limit(stream, self.rate_limit).boxed();
            let mut is_error = false;
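            // Consume the stream, tracking each split's latest offset from the
            // hidden split/offset columns of every chunk before yielding it.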
            #[for_await]
            'consume: for msg in stream {
                match msg {
                    Ok(msg) => {
                        for i in 0..msg.capacity() {
                            let (_, row, _) = msg.row_at(i);
                            let split = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            latest_splits_info
                                .get_mut(&Arc::from(split.to_owned()))
                                .map(|split_impl| split_impl.update_in_place(offset.to_owned()));
                        }
                        yield (msg, latest_splits_info.clone());
                    }
                    Err(e) => {
                        tracing::error!(
                            error = %e.as_report(),
                            source_name = self.source_name,
                            source_id = %self.source_id,
                            actor_id = %self.actor_ctx.id,
                            "stream source reader error"
                        );
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            e.variant_name().to_owned(),
                            self.source_id.to_string(),
                            self.source_name.clone(),
                            self.actor_ctx.fragment_id.to_string(),
                        ]);
                        is_error = true;
                        break 'consume;
                    }
                }
            }
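            // The stream finished cleanly: mark batch splits as finished and emit
            // a final empty chunk carrying the latest split states.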
            if !is_error {
                tracing::info!("stream source reader consume finished");
                latest_splits_info.values_mut().for_each(|split_impl| {
                    if let Some(mut batch_split) = split_impl.clone().into_batch_split() {
                        batch_split.finish();
                        *split_impl = batch_split.into();
                    }
                });
                yield (
                    StreamChunk::empty(
                        self.source_desc
                            .columns
                            .iter()
                            .map(|c| c.data_type.clone())
                            .collect_vec()
                            .as_slice(),
                    ),
                    latest_splits_info.clone(),
                );
                break 'build_consume_loop;
            }
            tracing::info!("stream source reader error, retry in 1s");
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}