risingwave_stream/executor/source/reader_stream.rs

use std::collections::HashMap;
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxSourceChunkStream, CdcAutoSchemaChangeFailCallback, ConnectorState, CreateSplitReaderResult,
    SourceContext, SourceCtrlOpts, SplitMetaData, StreamChunkWithState,
};
use thiserror_ext::AsReport;
use tokio::sync::{mpsc, oneshot};

use super::{apply_rate_limit, get_split_offset_col_idx};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;

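/// Handles produced by [`StreamReaderBuilder::setup_auto_schema_change`]: a sender for
/// forwarding schema change events to the meta service (each paired with a oneshot sender
/// that acknowledges completion), and a callback invoked when an automatic CDC schema
/// change fails. Both are `None` when auto schema change is disabled.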
type AutoSchemaChangeSetup = (
    Option<mpsc::Sender<(SchemaChangeEnvelope, oneshot::Sender<()>)>>,
    Option<CdcAutoSchemaChangeFailCallback>,
);

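/// Builds (and, on failure, rebuilds) the connector read stream for a source executor,
/// layering on rate limiting, per-split offset tracking, and optional auto schema change.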
pub(crate) struct StreamReaderBuilder {
    pub source_desc: SourceDesc,
    pub rate_limit: Option<u32>,
    pub source_id: TableId,
    pub source_name: String,
    pub reader_stream: Option<BoxSourceChunkStream>,

    pub is_auto_schema_change_enable: bool,
    pub actor_ctx: ActorContextRef,
}

impl StreamReaderBuilder {
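    /// Sets up auto schema change handling: spawns a background task that forwards
    /// [`SchemaChangeEnvelope`]s to the meta service and builds a callback that records
    /// failed CDC schema changes in the event log. Returns `(None, None)` when the
    /// feature is disabled in config.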
    fn setup_auto_schema_change(&self) -> AutoSchemaChangeSetup {
        if self.is_auto_schema_change_enable {
            let (schema_change_tx, mut schema_change_rx) =
                mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
            let meta_client = self.actor_ctx.meta_client.clone();
            // Background task: forward each schema change event to the meta service and
            // acknowledge it through the paired oneshot sender, success or failure alike.
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = %e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });

            let on_cdc_auto_schema_change_failure = if let Some(ref meta_client) =
                self.actor_ctx.meta_client
            {
                let meta_client = meta_client.clone();
                let source_id = self.source_id;
                Some(CdcAutoSchemaChangeFailCallback::new(
                    move |table_id: u32,
                          table_name: String,
                          cdc_table_id: String,
                          upstream_ddl: String,
                          fail_info: String| {
                        let meta_client = meta_client.clone();
                        let source_id = source_id;
                        // Report the failure asynchronously so the callback never blocks
                        // the source reader.
                        tokio::spawn(async move {
                            if let Err(e) = meta_client
                                .add_cdc_auto_schema_change_fail_event(
                                    table_id,
                                    table_name,
                                    cdc_table_id,
                                    upstream_ddl,
                                    fail_info,
                                )
                                .await
                            {
                                tracing::warn!(
                                    error = %e.as_report(),
                                    %source_id,
                                    "Failed to add CDC auto schema change fail event to event log."
                                );
                            }
                        });
                    },
                ))
            } else {
                None
            };

            (Some(schema_change_tx), on_cdc_auto_schema_change_failure)
        } else {
            info!("auto schema change is disabled in config");
            (None, None)
        }
    }

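    /// Collects the column ids to read and assembles the [`SourceContext`] shared by all
    /// reader builds, wiring in rate-limit-aware chunk sizing and the auto schema change
    /// handles from `setup_auto_schema_change`.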
    fn prepare_source_stream_build(&self) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = self
            .source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();
        debug_assert!(column_ids.iter().all_unique(), "column_ids must be unique");

        let (schema_change_tx, on_cdc_auto_schema_change_failure) = self.setup_auto_schema_change();

        let source_ctx = SourceContext::new_with_auto_schema_change_callback(
            self.actor_ctx.id,
            self.source_id,
            self.actor_ctx.fragment_id,
            self.source_name.clone(),
            self.source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit),
                // Split transactions across chunks when rate limiting is enabled, so the
                // limiter can throttle inside a large transaction.
                split_txn: self.rate_limit.is_some(),
            },
            self.source_desc.source.config.clone(),
            schema_change_tx,
            on_cdc_auto_schema_change_failure,
        );

        (column_ids, source_ctx)
    }

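    /// Builds the split reader once to obtain the latest split metadata, optionally seeking
    /// to the latest offsets, and caches the resulting stream in `self.reader_stream` so a
    /// subsequent `into_retry_stream` call can reuse it instead of building again.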
    pub(crate) async fn fetch_latest_splits(
        &mut self,
        state: ConnectorState,
        seek_to_latest: bool,
    ) -> StreamExecutorResult<CreateSplitReaderResult> {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);
        let (stream, res) = self
            .source_desc
            .source
            .build_stream(
                state.clone(),
                column_ids.clone(),
                source_ctx_ref.clone(),
                seek_to_latest,
            )
            .await
            .map_err(StreamExecutorError::connector_error)?;
        // Cache the stream so `into_retry_stream` can reuse it instead of rebuilding.
        self.reader_stream = Some(stream);
        Ok(res)
    }

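    /// Converts the builder into a self-healing stream of `(StreamChunk, latest_splits)`
    /// pairs: offsets in `latest_splits` track the rows yielded so far, and the underlying
    /// reader is rebuilt (after a 1s backoff) whenever building or reading fails, except on
    /// the initial build, where the error is returned directly.
    ///
    /// A minimal usage sketch, assuming a fully populated `builder` and a restored `state`
    /// (illustrative only, hence `ignore`; it must run inside a `#[try_stream]` context):
    ///
    /// ```ignore
    /// let stream = builder.into_retry_stream(state, /* is_initial_build */ true);
    /// #[for_await]
    /// for item in stream {
    ///     let (chunk, latest_splits) = item?;
    ///     // forward `chunk` downstream; persist `latest_splits` at the next barrier
    /// }
    /// ```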
    #[try_stream(ok = StreamChunkWithState, error = StreamExecutorError)]
    pub(crate) async fn into_retry_stream(mut self, state: ConnectorState, is_initial_build: bool) {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);

        // Latest offset per split, updated as rows flow through and yielded with every
        // chunk so the executor can persist consumption progress.
        let mut latest_splits_info = if let Some(splits) = state.as_ref() {
            splits
                .iter()
                .map(|split| (split.id(), split.clone()))
                .collect::<HashMap<_, _>>()
        } else {
            HashMap::new()
        };

        let (Some(split_idx), Some(offset_idx), _) =
            get_split_offset_col_idx(&self.source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

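        // Outer retry loop: rebuild the reader from the latest known offsets after any
        // failure, and exit only when the stream finishes without error.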
        'build_consume_loop: loop {
            let bootstrap_state = if latest_splits_info.is_empty() {
                None
            } else {
                Some(latest_splits_info.values().cloned().collect_vec())
            };
            tracing::debug!(
                "build stream source reader with state: {:?}",
                bootstrap_state
            );
            // Reuse a stream prebuilt by `fetch_latest_splits` if one is cached; otherwise
            // build a fresh reader from the latest known offsets.
            let build_stream_result = if let Some(exist_stream) = self.reader_stream.take() {
                Ok((exist_stream, CreateSplitReaderResult::default()))
            } else {
                self.source_desc
                    .source
                    .build_stream(
                        bootstrap_state,
                        column_ids.clone(),
                        source_ctx_ref.clone(),
                        is_initial_build,
                    )
                    .await
            };
            if let Err(e) = build_stream_result {
                if is_initial_build {
                    // A failure on the very first build is unrecoverable: surface it.
                    return Err(StreamExecutorError::connector_error(e));
                } else {
                    tracing::error!(
                        error = %e.as_report(),
                        source_name = self.source_name,
                        source_id = %self.source_id,
                        actor_id = self.actor_ctx.id,
                        "build stream source reader error, retry in 1s"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        e.variant_name().to_owned(),
                        self.source_id.to_string(),
                        self.source_name.clone(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue 'build_consume_loop;
                }
            }

            let (stream, _) = build_stream_result.unwrap();
            let stream = apply_rate_limit(stream, self.rate_limit).boxed();
            let mut is_error = false;
            #[for_await]
            'consume: for msg in stream {
                match msg {
                    Ok(msg) => {
                        // Record the newest offset of every split observed in this chunk.
                        for (_, row) in msg.rows() {
                            let split = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            if let Some(split_impl) =
                                latest_splits_info.get_mut(&Arc::from(split.to_owned()))
                            {
                                split_impl
                                    .update_in_place(offset.to_owned())
                                    .map_err(StreamExecutorError::connector_error)?;
                            }
                        }
                        yield (msg, latest_splits_info.clone());
                    }
                    Err(e) => {
                        tracing::error!(
                            error = %e.as_report(),
                            source_name = self.source_name,
                            source_id = %self.source_id,
                            actor_id = self.actor_ctx.id,
                            "stream source reader error"
                        );
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            e.variant_name().to_owned(),
                            self.source_id.to_string(),
                            self.source_name.clone(),
                            self.actor_ctx.fragment_id.to_string(),
                        ]);
                        is_error = true;
                        break 'consume;
                    }
                }
            }
            if !is_error {
                // Clean end of stream: mark batch splits as finished, emit a final empty
                // chunk that carries the finished split states, and stop retrying.
                tracing::info!("stream source reader consume finished");
                latest_splits_info.values_mut().for_each(|split_impl| {
                    if let Some(mut batch_split) = split_impl.clone().into_batch_split() {
                        batch_split.finish();
                        *split_impl = batch_split.into();
                    }
                });
                yield (
                    StreamChunk::empty(
                        self.source_desc
                            .columns
                            .iter()
                            .map(|c| c.data_type.clone())
                            .collect_vec()
                            .as_slice(),
                    ),
                    latest_splits_info.clone(),
                );
                break 'build_consume_loop;
            }
            tracing::info!("stream source reader error, retry in 1s");
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}