risingwave_meta/stream/source_manager/
worker.rs1use risingwave_connector::WithPropertiesExt;
16#[cfg(not(debug_assertions))]
17use risingwave_connector::error::ConnectorError;
18use risingwave_connector::source::AnySplitEnumerator;
19
20use super::*;
21
/// Threshold on consecutive failed split listings.
///
/// The periodic branch of `ConnectorSourceWorker::run` rebuilds the split enumerator
/// (via `refresh`) once `fail_cnt` is strictly greater than this value.
const MAX_FAIL_CNT: u32 = 10;
23
/// Key in a source's `with_properties` that carries a JSON array of splits.
///
/// In debug builds `ConnectorSourceWorker::create` parses the value and `tick` returns those
/// splits instead of asking the enumerator. In release builds (no `debug_assertions`) the
/// presence of this key is rejected with an error.
const DEBUG_SPLITS_KEY: &str = "debug_splits";
29
/// Latest set of splits discovered by a source worker, shared with its handle.
pub struct SharedSplitMap {
    /// `None` until a `tick` has succeeded; afterwards the most recently listed splits,
    /// keyed by split id.
    pub splits: Option<BTreeMap<SplitId, SplitImpl>>,
}
33
/// Shared, lock-protected reference to a [`SharedSplitMap`].
type SharedSplitMapRef = Arc<Mutex<SharedSplitMap>>;
35
/// Worker that periodically lists the splits of one source (see `tick`) and publishes the
/// result into `current_splits`.
pub struct ConnectorSourceWorker {
    /// Id of the source this worker serves.
    source_id: SourceId,
    /// Name of the source; used in log messages and as a metric label.
    source_name: String,
    /// Shared map that `tick` overwrites with the most recently listed splits.
    current_splits: SharedSplitMapRef,
    /// Enumerator used to list splits; replaced by `refresh`.
    enumerator: Box<dyn AnySplitEnumerator>,
    /// Interval between periodic ticks in `run`.
    period: Duration,
    /// Meta metrics; provides the enumerator metrics passed to new enumerators.
    metrics: Arc<MetaMetrics>,
    /// Properties used to (re)build the enumerator; replaced on `UpdateProps`.
    connector_properties: ConnectorProperties,
    /// Consecutive failed split listings; reset to 0 by a successful `tick` or `refresh`.
    fail_cnt: u32,
    /// Gauge set to 1 after a successful `tick` and to 0 when listing splits fails.
    source_is_up: LabelGuardedIntGauge,

    /// Splits parsed from the `debug_splits` property; when `Some`, `tick` returns these
    /// instead of calling the enumerator.
    debug_splits: Option<Vec<SplitImpl>>,
}
52
53fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
54 let options_with_secret =
55 WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
56 let mut properties = ConnectorProperties::extract(options_with_secret, false)?;
57 properties.init_from_pb_source(source);
58 Ok(properties)
59}
60fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
61 let options_with_secret = WithOptionsSecResolved::new(
62 {
63 let mut with_properties = source.with_properties.clone();
64 let _removed = with_properties.remove(DEBUG_SPLITS_KEY);
65
66 #[cfg(not(debug_assertions))]
67 {
68 if _removed.is_some() {
69 return Err(ConnectorError::from(anyhow::anyhow!(
70 "`debug_splits` is not allowed in release mode"
71 )));
72 }
73 }
74
75 with_properties
76 },
77 source.secret_refs.clone(),
78 );
79 let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
80 properties.init_from_pb_source(source);
81 Ok(properties)
82}
83
84pub async fn create_source_worker(
88 source: &Source,
89 metrics: Arc<MetaMetrics>,
90) -> MetaResult<ConnectorSourceWorkerHandle> {
91 tracing::info!("spawning new watcher for source {}", source.id);
92
93 let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
94 let current_splits_ref = splits.clone();
95
96 let connector_properties = extract_prop_from_new_source(source)?;
97 let enable_scale_in = connector_properties.enable_drop_split();
98 let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
99 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
100 let sync_call_timeout = source
101 .with_properties
102 .get_sync_call_timeout()
103 .unwrap_or(DEFAULT_SOURCE_TICK_TIMEOUT);
104 let handle = {
105 let mut worker = ConnectorSourceWorker::create(
106 source,
107 connector_properties,
108 DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
109 current_splits_ref.clone(),
110 metrics,
111 )
112 .await?;
113
114 tokio::time::timeout(sync_call_timeout, worker.tick())
116 .await
117 .with_context(|| {
118 format!(
119 "failed to fetch meta info for source {}, timeout {:?}",
120 source.id, DEFAULT_SOURCE_TICK_TIMEOUT
121 )
122 })??;
123
124 tokio::spawn(async move { worker.run(command_rx).await })
125 };
126 Ok(ConnectorSourceWorkerHandle {
127 handle,
128 command_tx,
129 splits,
130 enable_drop_split: enable_scale_in,
131 enable_adaptive_splits,
132 })
133}
134
135pub fn create_source_worker_async(
137 source: Source,
138 managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
139 metrics: Arc<MetaMetrics>,
140) -> MetaResult<()> {
141 tracing::info!("spawning new watcher for source {}", source.id);
142
143 let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
144 let current_splits_ref = splits.clone();
145 let source_id = source.id;
146
147 let connector_properties = extract_prop_from_existing_source(&source)?;
148
149 let enable_drop_split = connector_properties.enable_drop_split();
150 let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
151 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
152 let handle = tokio::spawn(async move {
153 let mut ticker = time::interval(DEFAULT_SOURCE_WORKER_TICK_INTERVAL);
154 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
155
156 let mut worker = loop {
157 ticker.tick().await;
158
159 match ConnectorSourceWorker::create(
160 &source,
161 connector_properties.clone(),
162 DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
163 current_splits_ref.clone(),
164 metrics.clone(),
165 )
166 .await
167 {
168 Ok(worker) => {
169 break worker;
170 }
171 Err(e) => {
172 tracing::warn!(error = %e.as_report(), "failed to create source worker");
173 }
174 }
175 };
176
177 worker.run(command_rx).await
178 });
179
180 managed_sources.insert(
181 source_id as SourceId,
182 ConnectorSourceWorkerHandle {
183 handle,
184 command_tx,
185 splits,
186 enable_drop_split,
187 enable_adaptive_splits,
188 },
189 );
190 Ok(())
191}
192
/// Interval (30 seconds) passed to source workers as their tick `period`; also used as the
/// retry interval while `create_source_worker_async` waits for worker creation to succeed.
const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);
194
195impl ConnectorSourceWorker {
196 async fn refresh(&mut self) -> MetaResult<()> {
198 let enumerator = self
199 .connector_properties
200 .clone()
201 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
202 metrics: self.metrics.source_enumerator_metrics.clone(),
203 info: SourceEnumeratorInfo {
204 source_id: self.source_id as u32,
205 },
206 }))
207 .await
208 .context("failed to create SplitEnumerator")?;
209 self.enumerator = enumerator;
210 self.fail_cnt = 0;
211 tracing::info!("refreshed source enumerator: {}", self.source_name);
212 Ok(())
213 }
214
215 pub async fn create(
218 source: &Source,
219 connector_properties: ConnectorProperties,
220 period: Duration,
221 splits: Arc<Mutex<SharedSplitMap>>,
222 metrics: Arc<MetaMetrics>,
223 ) -> MetaResult<Self> {
224 let enumerator = connector_properties
225 .clone()
226 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
227 metrics: metrics.source_enumerator_metrics.clone(),
228 info: SourceEnumeratorInfo {
229 source_id: source.id,
230 },
231 }))
232 .await
233 .context("failed to create SplitEnumerator")?;
234
235 let source_is_up = metrics
236 .source_is_up
237 .with_guarded_label_values(&[source.id.to_string().as_str(), &source.name]);
238
239 Ok(Self {
240 source_id: source.id as SourceId,
241 source_name: source.name.clone(),
242 current_splits: splits,
243 enumerator,
244 period,
245 metrics,
246 connector_properties,
247 fail_cnt: 0,
248 source_is_up,
249 debug_splits: {
250 let debug_splits = source.with_properties.get(DEBUG_SPLITS_KEY);
251 #[cfg(not(debug_assertions))]
252 {
253 if debug_splits.is_some() {
254 return Err(ConnectorError::from(anyhow::anyhow!(
255 "`debug_splits` is not allowed in release mode"
256 ))
257 .into());
258 }
259 None
260 }
261
262 #[cfg(debug_assertions)]
263 {
264 use risingwave_common::types::JsonbVal;
265 if let Some(debug_splits) = debug_splits {
266 let mut splits = Vec::new();
267 let debug_splits_value =
268 jsonbb::serde_json::from_str::<serde_json::Value>(debug_splits)
269 .context("failed to parse split impl")?;
270 for split_impl_value in debug_splits_value.as_array().unwrap() {
271 splits.push(SplitImpl::restore_from_json(JsonbVal::from(
272 split_impl_value.clone(),
273 ))?);
274 }
275 Some(splits)
276 } else {
277 None
278 }
279 }
280 },
281 })
282 }
283
284 pub async fn run(&mut self, mut command_rx: UnboundedReceiver<SourceWorkerCommand>) {
285 let mut interval = time::interval(self.period);
286 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
287 loop {
288 select! {
289 biased;
290 cmd = command_rx.borrow_mut().recv() => {
291 if let Some(cmd) = cmd {
292 match cmd {
293 SourceWorkerCommand::Tick(tx) => {
294 let _ = tx.send(self.tick().await);
295 }
296 SourceWorkerCommand::DropFragments(fragment_ids) => {
297 if let Err(e) = self.drop_fragments(fragment_ids).await {
298 tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
300 }
301 }
302 SourceWorkerCommand::FinishBackfill(fragment_ids) => {
303 if let Err(e) = self.finish_backfill(fragment_ids).await {
304 tracing::warn!(error = %e.as_report(), "error happened when finish backfill");
306 }
307 }
308 SourceWorkerCommand::UpdateProps(new_props) => {
309 self.connector_properties = new_props;
310 if let Err(e) = self.refresh().await {
311 tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
312 }
313 tracing::debug!("source {} worker properties updated", self.source_name);
314 }
315 SourceWorkerCommand::Terminate => {
316 return;
317 }
318 }
319 }
320 }
321 _ = interval.tick() => {
322 if self.fail_cnt > MAX_FAIL_CNT
323 && let Err(e) = self.refresh().await {
324 tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
325 }
326 if let Err(e) = self.tick().await {
327 tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker");
328 }
329 }
330 }
331 }
332 }
333
334 async fn tick(&mut self) -> MetaResult<()> {
336 let source_is_up = |res: i64| {
337 self.source_is_up.set(res);
338 };
339
340 let splits = {
341 if let Some(debug_splits) = &self.debug_splits {
342 debug_splits.clone()
343 } else {
344 self.enumerator.list_splits().await.inspect_err(|_| {
345 source_is_up(0);
346 self.fail_cnt += 1;
347 })?
348 }
349 };
350
351 source_is_up(1);
352 self.fail_cnt = 0;
353 let mut current_splits = self.current_splits.lock().await;
354 current_splits.splits.replace(
355 splits
356 .into_iter()
357 .map(|split| (split.id(), split))
358 .collect(),
359 );
360
361 Ok(())
362 }
363
364 async fn drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
365 self.enumerator.on_drop_fragments(fragment_ids).await?;
366 Ok(())
367 }
368
369 async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
370 self.enumerator.on_finish_backfill(fragment_ids).await?;
371 Ok(())
372 }
373}
374
/// Handle to a spawned [`ConnectorSourceWorker`] task: carries the command channel to the
/// worker and the shared view of its discovered splits.
pub struct ConnectorSourceWorkerHandle {
    /// Join handle of the spawned worker task; stored but never read.
    #[expect(dead_code)]
    handle: JoinHandle<()>,
    /// Sender side of the worker's command channel.
    command_tx: UnboundedSender<SourceWorkerCommand>,
    /// Splits most recently published by the worker (`None` inside until its first
    /// successful tick).
    pub splits: SharedSplitMapRef,
    /// Value of `ConnectorProperties::enable_drop_split` captured when the handle was built.
    pub enable_drop_split: bool,
    /// Value of `ConnectorProperties::enable_adaptive_splits` captured when the handle was
    /// built.
    pub enable_adaptive_splits: bool,
}
384
385impl ConnectorSourceWorkerHandle {
386 pub fn get_enable_adaptive_splits(&self) -> bool {
387 self.enable_adaptive_splits
388 }
389
390 pub async fn discovered_splits(
391 &self,
392 source_id: SourceId,
393 actors: &HashSet<ActorId>,
394 ) -> MetaResult<BTreeMap<Arc<str>, SplitImpl>> {
395 let Some(mut discovered_splits) = self.splits.lock().await.splits.clone() else {
397 tracing::info!(
398 "The discover loop for source {} is not ready yet; we'll wait for the next run",
399 source_id
400 );
401 return Ok(BTreeMap::new());
402 };
403 if discovered_splits.is_empty() {
404 tracing::warn!("No splits discovered for source {}", source_id);
405 }
406
407 if self.enable_adaptive_splits {
408 debug_assert!(self.enable_drop_split);
413 debug_assert!(discovered_splits.len() == 1);
414 discovered_splits =
415 fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?;
416 }
417
418 Ok(discovered_splits)
419 }
420
421 fn send_command(&self, command: SourceWorkerCommand) -> MetaResult<()> {
422 let cmd_str = format!("{:?}", command);
423 self.command_tx
424 .send(command)
425 .with_context(|| format!("failed to send {cmd_str} command to source worker"))?;
426 Ok(())
427 }
428
429 pub async fn force_tick(&self) -> MetaResult<()> {
431 let (tx, rx) = oneshot::channel();
432 self.send_command(SourceWorkerCommand::Tick(tx))?;
433 rx.await
434 .context("failed to receive tick command response from source worker")?
435 .context("source worker tick failed")?;
436 Ok(())
437 }
438
439 pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
440 tracing::debug!("drop_fragments: {:?}", fragment_ids);
441 if let Err(e) = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids)) {
442 tracing::warn!(error = %e.as_report(), "failed to drop fragments");
444 }
445 }
446
447 pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
448 tracing::debug!("finish_backfill: {:?}", fragment_ids);
449 if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
450 tracing::warn!(error = %e.as_report(), "failed to finish backfill");
452 }
453 }
454
455 pub fn update_props(&self, new_props: ConnectorProperties) {
456 if let Err(e) = self.send_command(SourceWorkerCommand::UpdateProps(new_props)) {
457 tracing::warn!(error = %e.as_report(), "failed to update source worker properties");
459 }
460 }
461
462 pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
463 tracing::debug!("terminate: {:?}", dropped_fragments);
464 if let Some(dropped_fragments) = dropped_fragments {
465 self.drop_fragments(dropped_fragments.into_iter().collect());
466 }
467 if let Err(e) = self.send_command(SourceWorkerCommand::Terminate) {
468 tracing::warn!(error = %e.as_report(), "failed to terminate source worker");
470 }
471 }
472}
473
/// Commands handled by the loop in `ConnectorSourceWorker::run`.
#[derive(educe::Educe)]
#[educe(Debug)]
pub enum SourceWorkerCommand {
    /// Run `tick` right away and send its result back through the given sender
    /// (the sender is omitted from the `Debug` output).
    Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
    /// Forward the fragment ids to the enumerator's `on_drop_fragments`.
    DropFragments(Vec<FragmentId>),
    /// Forward the fragment ids to the enumerator's `on_finish_backfill`.
    FinishBackfill(Vec<FragmentId>),
    /// Stop the worker loop.
    Terminate,
    /// Replace the worker's connector properties and rebuild its enumerator.
    UpdateProps(ConnectorProperties),
}