risingwave_meta/stream/source_manager/
worker.rs1use anyhow::Context;
16use risingwave_connector::WithPropertiesExt;
17#[cfg(not(debug_assertions))]
18use risingwave_connector::error::ConnectorError;
19use risingwave_connector::source::AnySplitEnumerator;
20use risingwave_connector::source::base::ConnectorProperties;
21
22use super::*;
23
/// Number of consecutive `list_splits` failures tolerated by a [`ConnectorSourceWorker`].
/// Once its `fail_cnt` exceeds this value, the next periodic tick in `run` first tries to
/// rebuild the split enumerator via `refresh`.
const MAX_FAIL_CNT: u32 = 10;

/// Source property key holding a JSON array of splits to use instead of asking the
/// enumerator. Only honored in debug builds; release builds reject sources that set it.
const DEBUG_SPLITS_KEY: &str = "debug_splits";
31
/// Latest split snapshot produced by a [`ConnectorSourceWorker`], shared with its handle.
pub struct SharedSplitMap {
    /// `None` until the worker completes its first successful tick; afterwards the splits
    /// from the most recent successful tick, keyed by split id.
    pub splits: Option<BTreeMap<SplitId, SplitImpl>>,
}

/// Shared handle to a [`SharedSplitMap`], guarded by an async (`.lock().await`) mutex.
type SharedSplitMapRef = Arc<Mutex<SharedSplitMap>>;
37
/// Background worker that lists a source's splits through a split enumerator and publishes
/// them into a [`SharedSplitMap`]; driven by [`ConnectorSourceWorker::run`].
pub struct ConnectorSourceWorker {
    source_id: SourceId,
    source_name: String,
    /// Destination of each successful `tick`; shared with the worker's handle.
    current_splits: SharedSplitMapRef,
    /// Enumerator used to list splits; rebuilt by `refresh`.
    enumerator: Box<dyn AnySplitEnumerator>,
    /// Interval between periodic ticks in `run`.
    period: Duration,
    metrics: Arc<MetaMetrics>,
    /// Properties used to (re)build `enumerator`; replaced by `UpdateProps` commands.
    connector_properties: ConnectorProperties,
    /// Consecutive `list_splits` failures since the last successful tick or refresh.
    fail_cnt: u32,
    /// Gauge set to 1 when `tick` obtains splits and to 0 when `list_splits` fails.
    source_is_up: LabelGuardedIntGauge,

    /// Debug builds only: fixed splits parsed from the `debug_splits` property. When set,
    /// `tick` publishes these instead of calling the enumerator.
    debug_splits: Option<Vec<SplitImpl>>,
}
54
55fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
56 let options_with_secret =
57 WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
58 let mut properties = ConnectorProperties::extract(options_with_secret, false)?;
59 properties.init_from_pb_source(source);
60 Ok(properties)
61}
62fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
63 let options_with_secret = WithOptionsSecResolved::new(
64 {
65 let mut with_properties = source.with_properties.clone();
66 let _removed = with_properties.remove(DEBUG_SPLITS_KEY);
67
68 #[cfg(not(debug_assertions))]
69 {
70 if _removed.is_some() {
71 return Err(ConnectorError::from(anyhow::anyhow!(
72 "`debug_splits` is not allowed in release mode"
73 )));
74 }
75 }
76
77 with_properties
78 },
79 source.secret_refs.clone(),
80 );
81 let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
82 properties.init_from_pb_source(source);
83 Ok(properties)
84}
85
86pub async fn create_source_worker(
90 source: &Source,
91 metrics: Arc<MetaMetrics>,
92) -> MetaResult<ConnectorSourceWorkerHandle> {
93 tracing::info!("spawning new watcher for source {}", source.id);
94
95 let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
96 let current_splits_ref = splits.clone();
97
98 let connector_properties = extract_prop_from_new_source(source)?;
99 let enable_scale_in = connector_properties.enable_drop_split();
100 let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
101 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
102 let sync_call_timeout = source
103 .with_properties
104 .get_sync_call_timeout()
105 .unwrap_or(DEFAULT_SOURCE_TICK_TIMEOUT);
106 let handle = {
107 let mut worker = ConnectorSourceWorker::create(
108 source,
109 connector_properties,
110 DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
111 current_splits_ref.clone(),
112 metrics,
113 )
114 .await?;
115
116 tokio::time::timeout(sync_call_timeout, worker.tick())
118 .await
119 .with_context(|| {
120 format!(
121 "failed to fetch meta info for source {}, timeout {:?}",
122 source.id, DEFAULT_SOURCE_TICK_TIMEOUT
123 )
124 })??;
125
126 tokio::spawn(async move { worker.run(command_rx).await })
127 };
128 Ok(ConnectorSourceWorkerHandle {
129 handle,
130 command_tx,
131 splits,
132 enable_drop_split: enable_scale_in,
133 enable_adaptive_splits,
134 })
135}
136
137pub fn create_source_worker_async(
139 source: Source,
140 managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
141 metrics: Arc<MetaMetrics>,
142) -> MetaResult<()> {
143 tracing::info!("spawning new watcher for source {}", source.id);
144
145 let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
146 let current_splits_ref = splits.clone();
147 let source_id = source.id;
148
149 let connector_properties = extract_prop_from_existing_source(&source)?;
150
151 let enable_drop_split = connector_properties.enable_drop_split();
152 let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
153 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
154 let handle = tokio::spawn(async move {
155 let mut ticker = time::interval(DEFAULT_SOURCE_WORKER_TICK_INTERVAL);
156 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
157
158 let mut worker = loop {
159 ticker.tick().await;
160
161 match ConnectorSourceWorker::create(
162 &source,
163 connector_properties.clone(),
164 DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
165 current_splits_ref.clone(),
166 metrics.clone(),
167 )
168 .await
169 {
170 Ok(worker) => {
171 break worker;
172 }
173 Err(e) => {
174 tracing::warn!(error = %e.as_report(), "failed to create source worker");
175 }
176 }
177 };
178
179 worker.run(command_rx).await
180 });
181
182 managed_sources.insert(
183 source_id as SourceId,
184 ConnectorSourceWorkerHandle {
185 handle,
186 command_tx,
187 splits,
188 enable_drop_split,
189 enable_adaptive_splits,
190 },
191 );
192 Ok(())
193}
194
/// Period of a source worker's periodic ticks. `create_source_worker_async` also uses it as
/// the retry interval while worker creation keeps failing.
const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);
196
197impl ConnectorSourceWorker {
198 async fn refresh(&mut self) -> MetaResult<()> {
200 let enumerator = self
201 .connector_properties
202 .clone()
203 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
204 metrics: self.metrics.source_enumerator_metrics.clone(),
205 info: SourceEnumeratorInfo {
206 source_id: self.source_id as u32,
207 },
208 }))
209 .await
210 .context("failed to create SplitEnumerator")?;
211 self.enumerator = enumerator;
212 self.fail_cnt = 0;
213 tracing::info!("refreshed source enumerator: {}", self.source_name);
214 Ok(())
215 }
216
217 pub async fn create(
220 source: &Source,
221 connector_properties: ConnectorProperties,
222 period: Duration,
223 splits: Arc<Mutex<SharedSplitMap>>,
224 metrics: Arc<MetaMetrics>,
225 ) -> MetaResult<Self> {
226 let enumerator = connector_properties
227 .clone()
228 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
229 metrics: metrics.source_enumerator_metrics.clone(),
230 info: SourceEnumeratorInfo {
231 source_id: source.id,
232 },
233 }))
234 .await
235 .context("failed to create SplitEnumerator")?;
236
237 let source_is_up = metrics
238 .source_is_up
239 .with_guarded_label_values(&[source.id.to_string().as_str(), &source.name]);
240
241 Ok(Self {
242 source_id: source.id as SourceId,
243 source_name: source.name.clone(),
244 current_splits: splits,
245 enumerator,
246 period,
247 metrics,
248 connector_properties,
249 fail_cnt: 0,
250 source_is_up,
251 debug_splits: {
252 let debug_splits = source.with_properties.get(DEBUG_SPLITS_KEY);
253 #[cfg(not(debug_assertions))]
254 {
255 if debug_splits.is_some() {
256 return Err(ConnectorError::from(anyhow::anyhow!(
257 "`debug_splits` is not allowed in release mode"
258 ))
259 .into());
260 }
261 None
262 }
263
264 #[cfg(debug_assertions)]
265 {
266 use risingwave_common::types::JsonbVal;
267 if let Some(debug_splits) = debug_splits {
268 let mut splits = Vec::new();
269 let debug_splits_value =
270 jsonbb::serde_json::from_str::<serde_json::Value>(debug_splits)
271 .context("failed to parse split impl")?;
272 for split_impl_value in debug_splits_value.as_array().unwrap() {
273 splits.push(SplitImpl::restore_from_json(JsonbVal::from(
274 split_impl_value.clone(),
275 ))?);
276 }
277 Some(splits)
278 } else {
279 None
280 }
281 }
282 },
283 })
284 }
285
286 pub async fn run(&mut self, mut command_rx: UnboundedReceiver<SourceWorkerCommand>) {
287 let mut interval = time::interval(self.period);
288 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
289 loop {
290 select! {
291 biased;
292 cmd = command_rx.borrow_mut().recv() => {
293 if let Some(cmd) = cmd {
294 match cmd {
295 SourceWorkerCommand::Tick(tx) => {
296 let _ = tx.send(self.tick().await);
297 }
298 SourceWorkerCommand::DropFragments(fragment_ids) => {
299 if let Err(e) = self.drop_fragments(fragment_ids).await {
300 tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
302 }
303 }
304 SourceWorkerCommand::FinishBackfill(fragment_ids) => {
305 if let Err(e) = self.finish_backfill(fragment_ids).await {
306 tracing::warn!(error = %e.as_report(), "error happened when finish backfill");
308 }
309 }
310 SourceWorkerCommand::UpdateProps(new_props) => {
311 self.connector_properties = new_props;
312 if let Err(e) = self.refresh().await {
313 tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
314 }
315 tracing::debug!("source {} worker properties updated", self.source_name);
316 }
317 SourceWorkerCommand::Terminate => {
318 return;
319 }
320 }
321 }
322 }
323 _ = interval.tick() => {
324 if self.fail_cnt > MAX_FAIL_CNT
325 && let Err(e) = self.refresh().await {
326 tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
327 }
328 if let Err(e) = self.tick().await {
329 tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker");
330 }
331 }
332 }
333 }
334 }
335
336 async fn tick(&mut self) -> MetaResult<()> {
338 let source_is_up = |res: i64| {
339 self.source_is_up.set(res);
340 };
341
342 let splits = {
343 if let Some(debug_splits) = &self.debug_splits {
344 debug_splits.clone()
345 } else {
346 self.enumerator.list_splits().await.inspect_err(|_| {
347 source_is_up(0);
348 self.fail_cnt += 1;
349 })?
350 }
351 };
352
353 source_is_up(1);
354 self.fail_cnt = 0;
355 let mut current_splits = self.current_splits.lock().await;
356 current_splits.splits.replace(
357 splits
358 .into_iter()
359 .map(|split| (split.id(), split))
360 .collect(),
361 );
362 if let Err(e) = self.enumerator.on_tick().await {
364 tracing::error!(
365 "Failed to execute enumerator `on_tick` for source {}: {}",
366 self.source_id,
367 e.as_report()
368 );
369 }
370
371 Ok(())
372 }
373
374 async fn drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
375 self.enumerator.on_drop_fragments(fragment_ids).await?;
376 Ok(())
377 }
378
379 async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
380 self.enumerator.on_finish_backfill(fragment_ids).await?;
381 Ok(())
382 }
383}
384
/// Handle to a spawned [`ConnectorSourceWorker`]: exposes its discovered splits and sends it
/// commands.
pub struct ConnectorSourceWorkerHandle {
    /// Join handle of the spawned worker task; stored but never read.
    #[expect(dead_code)]
    handle: JoinHandle<()>,
    /// Sending side of the worker's command channel.
    command_tx: UnboundedSender<SourceWorkerCommand>,
    /// Split snapshot shared with the worker.
    pub splits: SharedSplitMapRef,
    /// Value of `ConnectorProperties::enable_drop_split` for this source.
    pub enable_drop_split: bool,
    /// Value of `ConnectorProperties::enable_adaptive_splits` for this source.
    pub enable_adaptive_splits: bool,
}
394
395impl ConnectorSourceWorkerHandle {
396 pub fn get_enable_adaptive_splits(&self) -> bool {
397 self.enable_adaptive_splits
398 }
399
400 pub async fn discovered_splits(
401 &self,
402 source_id: SourceId,
403 actors: &HashSet<ActorId>,
404 ) -> MetaResult<BTreeMap<Arc<str>, SplitImpl>> {
405 let Some(mut discovered_splits) = self.splits.lock().await.splits.clone() else {
407 tracing::info!(
408 "The discover loop for source {} is not ready yet; we'll wait for the next run",
409 source_id
410 );
411 return Ok(BTreeMap::new());
412 };
413 if discovered_splits.is_empty() {
414 tracing::warn!("No splits discovered for source {}", source_id);
415 }
416
417 if self.enable_adaptive_splits {
418 debug_assert!(self.enable_drop_split);
423 debug_assert!(discovered_splits.len() == 1);
424 discovered_splits =
425 fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?;
426 }
427
428 Ok(discovered_splits)
429 }
430
431 fn send_command(&self, command: SourceWorkerCommand) -> MetaResult<()> {
432 let cmd_str = format!("{:?}", command);
433 self.command_tx
434 .send(command)
435 .with_context(|| format!("failed to send {cmd_str} command to source worker"))?;
436 Ok(())
437 }
438
439 pub async fn force_tick(&self) -> MetaResult<()> {
441 let (tx, rx) = oneshot::channel();
442 self.send_command(SourceWorkerCommand::Tick(tx))?;
443 rx.await
444 .context("failed to receive tick command response from source worker")?
445 .context("source worker tick failed")?;
446 Ok(())
447 }
448
449 pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
450 tracing::debug!("drop_fragments: {:?}", fragment_ids);
451 if let Err(e) = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids)) {
452 tracing::warn!(error = %e.as_report(), "failed to drop fragments");
454 }
455 }
456
457 pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
458 tracing::debug!("finish_backfill: {:?}", fragment_ids);
459 if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
460 tracing::warn!(error = %e.as_report(), "failed to finish backfill");
462 }
463 }
464
465 pub fn update_props(&self, new_props: ConnectorProperties) {
466 if let Err(e) = self.send_command(SourceWorkerCommand::UpdateProps(new_props)) {
467 tracing::warn!(error = %e.as_report(), "failed to update source worker properties");
469 }
470 }
471
472 pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
473 tracing::debug!("terminate: {:?}", dropped_fragments);
474 if let Some(dropped_fragments) = dropped_fragments {
475 self.drop_fragments(dropped_fragments.into_iter().collect());
476 }
477 if let Err(e) = self.send_command(SourceWorkerCommand::Terminate) {
478 tracing::warn!(error = %e.as_report(), "failed to terminate source worker");
480 }
481 }
482}
483
/// Commands sent from a [`ConnectorSourceWorkerHandle`] to its worker's `run` loop.
#[derive(educe::Educe)]
#[educe(Debug)]
pub enum SourceWorkerCommand {
    /// Run a tick now and send its result back through the sender (omitted from `Debug`).
    Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
    /// Forward dropped fragment ids to the enumerator's `on_drop_fragments` hook.
    DropFragments(Vec<FragmentId>),
    /// Forward fragment ids whose backfill finished to the enumerator's `on_finish_backfill`.
    FinishBackfill(Vec<FragmentId>),
    /// Stop the worker's `run` loop.
    Terminate,
    /// Replace the worker's connector properties and rebuild its enumerator.
    UpdateProps(ConnectorProperties),
}