// risingwave_stream/executor/source/reader_stream.rs

use std::collections::HashMap;
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxSourceChunkStream, ConnectorState, CreateSplitReaderResult, SourceContext, SourceCtrlOpts,
    SplitMetaData, StreamChunkWithState,
};
use thiserror_ext::AsReport;
use tokio::sync::{mpsc, oneshot};

use super::{apply_rate_limit, get_split_offset_col_idx};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;

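/// Builds the connector chunk stream for a source executor and wraps it with
/// rate limiting, per-split offset tracking, and rebuild-on-error retry logic.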
pub(crate) struct StreamReaderBuilder {
    pub source_desc: SourceDesc,
    pub rate_limit: Option<u32>,
    pub source_id: TableId,
    pub source_name: String,
    pub reader_stream: Option<BoxSourceChunkStream>,

    pub is_auto_schema_change_enable: bool,
    pub actor_ctx: ActorContextRef,
}

impl StreamReaderBuilder {
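    /// Collect the column ids to read and assemble the [`SourceContext`] handed to the
    /// connector. When auto schema change is enabled, also spawn a background task that
    /// forwards schema-change events to the meta service.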
    fn prepare_source_stream_build(&self) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = self
            .source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable {
            let meta_client = self.actor_ctx.meta_client.clone();
            let _join_handle = tokio::task::spawn(async move {
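                // Forward each schema-change event to the meta service, acking via
                // `finish_tx` on both success and failure so the sender is never left
                // waiting.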
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = %e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };

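        // With a rate limit configured, cap the chunk size accordingly and enable
        // `split_txn` so chunks can be throttled at a finer granularity.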
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.source_id,
            self.actor_ctx.fragment_id,
            self.source_name.clone(),
            self.source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit),
                split_txn: self.rate_limit.is_some(),
            },
            self.source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

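    /// Build the split reader once and stash its chunk stream in `self.reader_stream`
    /// for `into_retry_stream` to consume later, returning the creation result (which
    /// carries the latest splits when `seek_to_latest` is set).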
    pub(crate) async fn fetch_latest_splits(
        &mut self,
        state: ConnectorState,
        seek_to_latest: bool,
    ) -> StreamExecutorResult<CreateSplitReaderResult> {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);
        let (stream, res) = self
            .source_desc
            .source
            .build_stream(
                state.clone(),
                column_ids.clone(),
                source_ctx_ref.clone(),
                seek_to_latest,
            )
            .await
            .map_err(StreamExecutorError::connector_error)?;
        self.reader_stream = Some(stream);
        Ok(res)
    }

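    /// Consume the source as a self-healing stream: build (or reuse) the reader, emit
    /// chunks tagged with the latest split states, and on any error rebuild the reader
    /// from the most recent offsets after a 1s backoff. A build failure is fatal only
    /// on the initial build.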
    #[try_stream(ok = StreamChunkWithState, error = StreamExecutorError)]
    pub(crate) async fn into_retry_stream(mut self, state: ConnectorState, is_initial_build: bool) {
        let (column_ids, source_ctx) = self.prepare_source_stream_build();
        let source_ctx_ref = Arc::new(source_ctx);

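        // Latest state of each assigned split, updated as offsets are observed so that
        // a rebuilt reader resumes from where the previous one stopped.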
        let mut latest_splits_info = {
            if let Some(splits) = state.as_ref() {
                splits
                    .iter()
                    .map(|split| (split.id(), split.clone()))
                    .collect::<HashMap<_, _>>()
            } else {
                HashMap::new()
            }
        };

        let (Some(split_idx), Some(offset_idx)) =
            get_split_offset_col_idx(&self.source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

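        // The outer loop (re)builds the reader; the inner loop consumes it until an error.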
        'build_consume_loop: loop {
            let bootstrap_state = if latest_splits_info.is_empty() {
                None
            } else {
                Some(latest_splits_info.values().cloned().collect_vec())
            };
            tracing::debug!(
                "build stream source reader with state: {:?}",
                bootstrap_state
            );
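            // Reuse the stream prebuilt by `fetch_latest_splits` if present; otherwise
            // build a fresh reader from the latest known split states.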
            let build_stream_result = if let Some(exist_stream) = self.reader_stream.take() {
                Ok((exist_stream, CreateSplitReaderResult::default()))
            } else {
                self.source_desc
                    .source
                    .build_stream(
                        bootstrap_state,
                        column_ids.clone(),
                        source_ctx_ref.clone(),
                        is_initial_build,
                    )
                    .await
            };
            if let Err(e) = build_stream_result {
                if is_initial_build {
                    return Err(StreamExecutorError::connector_error(e));
                } else {
                    tracing::warn!(
                        error = %e.as_report(),
                        source_name = self.source_name,
                        source_id = self.source_id.table_id,
                        actor_id = self.actor_ctx.id,
                        "build stream source reader error, retry in 1s"
                    );
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue 'build_consume_loop;
                }
            }

            let (stream, _) = build_stream_result.unwrap();
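            // Throttle the connector stream according to the configured rate limit.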
            let stream = apply_rate_limit(stream, self.rate_limit).boxed();
            #[for_await]
            'consume: for msg in stream {
                match msg {
                    Ok(msg) => {
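                        // Record the newest offset seen for each split so a future
                        // rebuild resumes from this position.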
                        for (_, row) in msg.rows() {
                            let split = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            latest_splits_info
                                .get_mut(&Arc::from(split.to_owned()))
                                .map(|split_impl| split_impl.update_in_place(offset.to_owned()));
                        }
                        yield (msg, latest_splits_info.clone());
                    }
                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            source_name = self.source_name,
                            source_id = self.source_id.table_id,
                            actor_id = self.actor_ctx.id,
                            "stream source reader error"
                        );
                        break 'consume;
                    }
                }
            }
            tracing::info!("stream source reader error, retry in 1s");
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}