risingwave_meta/stream/source_manager/
worker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use risingwave_connector::WithPropertiesExt;
17#[cfg(not(debug_assertions))]
18use risingwave_connector::error::ConnectorError;
19use risingwave_connector::source::AnySplitEnumerator;
20use risingwave_connector::source::base::ConnectorProperties;
21
22use super::*;
23
/// Failure threshold for split listing: once a worker's `fail_cnt` exceeds this value,
/// the next periodic tick in `ConnectorSourceWorker::run` recreates the enumerator
/// (via `refresh`) before listing splits again.
24const MAX_FAIL_CNT: u32 = 10;
25
26// The key used to load `SplitImpl` directly from source properties.
27// When this key is present, the worker serves the given splits on every tick
28// instead of listing them from the external source.
// The value is parsed as a JSON array whose elements are restored with
// `SplitImpl::restore_from_json` (see `ConnectorSourceWorker::create`).
29// Only valid in debug builds - will return an error in release builds.
30const DEBUG_SPLITS_KEY: &str = "debug_splits";
31
/// Split metadata shared between a `ConnectorSourceWorker`, which writes it in `tick`,
/// and its `ConnectorSourceWorkerHandle`, which reads it in `discovered_splits`.
32pub struct SharedSplitMap {
    /// Splits from the most recent successful tick, keyed by split id.
    /// Starts as `None` and stays so until a tick has succeeded.
33    pub splits: Option<BTreeMap<SplitId, SplitImpl>>,
34}
35
/// Reference-counted, mutex-protected [`SharedSplitMap`] shared by a worker and its handle.
36type SharedSplitMapRef = Arc<Mutex<SharedSplitMap>>;
37
38/// `ConnectorSourceWorker` keeps fetching the latest split metadata from the external source service ([`Self::tick`]),
39/// and maintains it in `current_splits`.
40pub struct ConnectorSourceWorker {
    /// Id of the source this worker serves (taken from `Source::id` in `create`).
41    source_id: SourceId,
    /// Name of the source; used for log messages and as a metric label.
42    source_name: String,
    /// Shared split map that `tick` overwrites with the freshly listed splits.
43    current_splits: SharedSplitMapRef,
44    // XXX: box or arc?
    /// Enumerator used to list splits; replaced with a new instance by `refresh`.
45    enumerator: Box<dyn AnySplitEnumerator>,
    /// Interval between two periodic ticks in `run`.
46    period: Duration,
    /// Meta metrics; its `source_enumerator_metrics` is passed to every enumerator created.
47    metrics: Arc<MetaMetrics>,
    /// Properties used to (re)create the enumerator; replaced by the `UpdateProps` command.
48    connector_properties: ConnectorProperties,
    /// Incremented on each failed `list_splits` call; reset to 0 by a successful tick or refresh.
49    fail_cnt: u32,
    /// Gauge set to 1 after a successful split listing and to 0 when listing fails.
50    source_is_up: LabelGuardedIntGauge,
51
    /// When `Some`, `tick` returns a copy of these splits instead of calling the enumerator.
    /// Populated from the `debug_splits` property in debug builds only.
52    debug_splits: Option<Vec<SplitImpl>>,
53}
54
55fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
56    let options_with_secret =
57        WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
58    let mut properties = ConnectorProperties::extract(options_with_secret, false)?;
59    properties.init_from_pb_source(source);
60    Ok(properties)
61}
62fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
63    let options_with_secret = WithOptionsSecResolved::new(
64        {
65            let mut with_properties = source.with_properties.clone();
66            let _removed = with_properties.remove(DEBUG_SPLITS_KEY);
67
68            #[cfg(not(debug_assertions))]
69            {
70                if _removed.is_some() {
71                    return Err(ConnectorError::from(anyhow::anyhow!(
72                        "`debug_splits` is not allowed in release mode"
73                    )));
74                }
75            }
76
77            with_properties
78        },
79        source.secret_refs.clone(),
80    );
81    let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
82    properties.init_from_pb_source(source);
83    Ok(properties)
84}
85
86/// Used to create a new [`ConnectorSourceWorkerHandle`] for a new source.
87///
88/// It will call [`ConnectorSourceWorker::tick()`] to fetch split metadata once before returning.
89pub async fn create_source_worker(
90    source: &Source,
91    metrics: Arc<MetaMetrics>,
92) -> MetaResult<ConnectorSourceWorkerHandle> {
93    tracing::info!("spawning new watcher for source {}", source.id);
94
95    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
96    let current_splits_ref = splits.clone();
97
98    let connector_properties = extract_prop_from_new_source(source)?;
99    let enable_scale_in = connector_properties.enable_drop_split();
100    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
101    let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
102    let sync_call_timeout = source
103        .with_properties
104        .get_sync_call_timeout()
105        .unwrap_or(DEFAULT_SOURCE_TICK_TIMEOUT);
106    let handle = {
107        let mut worker = ConnectorSourceWorker::create(
108            source,
109            connector_properties,
110            DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
111            current_splits_ref.clone(),
112            metrics,
113        )
114        .await?;
115
116        // if fail to fetch meta info, will refuse to create source
117        tokio::time::timeout(sync_call_timeout, worker.tick())
118            .await
119            .with_context(|| {
120                format!(
121                    "failed to fetch meta info for source {}, timeout {:?}",
122                    source.id, DEFAULT_SOURCE_TICK_TIMEOUT
123                )
124            })??;
125
126        tokio::spawn(async move { worker.run(command_rx).await })
127    };
128    Ok(ConnectorSourceWorkerHandle {
129        handle,
130        command_tx,
131        splits,
132        enable_drop_split: enable_scale_in,
133        enable_adaptive_splits,
134    })
135}
136
137/// Used on startup ([`SourceManager::new`]). Failed sources will not block meta startup.
138pub fn create_source_worker_async(
139    source: Source,
140    managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
141    metrics: Arc<MetaMetrics>,
142) -> MetaResult<()> {
143    tracing::info!("spawning new watcher for source {}", source.id);
144
145    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
146    let current_splits_ref = splits.clone();
147    let source_id = source.id;
148
149    let connector_properties = extract_prop_from_existing_source(&source)?;
150
151    let enable_drop_split = connector_properties.enable_drop_split();
152    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
153    let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
154    let handle = tokio::spawn(async move {
155        let mut ticker = time::interval(DEFAULT_SOURCE_WORKER_TICK_INTERVAL);
156        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
157
158        let mut worker = loop {
159            ticker.tick().await;
160
161            match ConnectorSourceWorker::create(
162                &source,
163                connector_properties.clone(),
164                DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
165                current_splits_ref.clone(),
166                metrics.clone(),
167            )
168            .await
169            {
170                Ok(worker) => {
171                    break worker;
172                }
173                Err(e) => {
174                    tracing::warn!(error = %e.as_report(), "failed to create source worker");
175                }
176            }
177        };
178
179        worker.run(command_rx).await
180    });
181
182    managed_sources.insert(
183        source_id as SourceId,
184        ConnectorSourceWorkerHandle {
185            handle,
186            command_tx,
187            splits,
188            enable_drop_split,
189            enable_adaptive_splits,
190        },
191    );
192    Ok(())
193}
194
/// Default period between two periodic ticks of a source worker. Also used as the retry
/// interval while `create_source_worker_async` is still trying to create the worker.
195const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);
196
197impl ConnectorSourceWorker {
198    /// Recreate the `SplitEnumerator` to establish a new connection to the external source service.
199    async fn refresh(&mut self) -> MetaResult<()> {
200        let enumerator = self
201            .connector_properties
202            .clone()
203            .create_split_enumerator(Arc::new(SourceEnumeratorContext {
204                metrics: self.metrics.source_enumerator_metrics.clone(),
205                info: SourceEnumeratorInfo {
206                    source_id: self.source_id as u32,
207                },
208            }))
209            .await
210            .context("failed to create SplitEnumerator")?;
211        self.enumerator = enumerator;
212        self.fail_cnt = 0;
213        tracing::info!("refreshed source enumerator: {}", self.source_name);
214        Ok(())
215    }
216
217    /// On creation, connection to the external source service will be established, but `splits`
218    /// will not be updated until `tick` is called.
219    pub async fn create(
220        source: &Source,
221        connector_properties: ConnectorProperties,
222        period: Duration,
223        splits: Arc<Mutex<SharedSplitMap>>,
224        metrics: Arc<MetaMetrics>,
225    ) -> MetaResult<Self> {
226        let enumerator = connector_properties
227            .clone()
228            .create_split_enumerator(Arc::new(SourceEnumeratorContext {
229                metrics: metrics.source_enumerator_metrics.clone(),
230                info: SourceEnumeratorInfo {
231                    source_id: source.id,
232                },
233            }))
234            .await
235            .context("failed to create SplitEnumerator")?;
236
237        let source_is_up = metrics
238            .source_is_up
239            .with_guarded_label_values(&[source.id.to_string().as_str(), &source.name]);
240
241        Ok(Self {
242            source_id: source.id as SourceId,
243            source_name: source.name.clone(),
244            current_splits: splits,
245            enumerator,
246            period,
247            metrics,
248            connector_properties,
249            fail_cnt: 0,
250            source_is_up,
251            debug_splits: {
252                let debug_splits = source.with_properties.get(DEBUG_SPLITS_KEY);
253                #[cfg(not(debug_assertions))]
254                {
255                    if debug_splits.is_some() {
256                        return Err(ConnectorError::from(anyhow::anyhow!(
257                            "`debug_splits` is not allowed in release mode"
258                        ))
259                        .into());
260                    }
261                    None
262                }
263
264                #[cfg(debug_assertions)]
265                {
266                    use risingwave_common::types::JsonbVal;
267                    if let Some(debug_splits) = debug_splits {
268                        let mut splits = Vec::new();
269                        let debug_splits_value =
270                            jsonbb::serde_json::from_str::<serde_json::Value>(debug_splits)
271                                .context("failed to parse split impl")?;
272                        for split_impl_value in debug_splits_value.as_array().unwrap() {
273                            splits.push(SplitImpl::restore_from_json(JsonbVal::from(
274                                split_impl_value.clone(),
275                            ))?);
276                        }
277                        Some(splits)
278                    } else {
279                        None
280                    }
281                }
282            },
283        })
284    }
285
286    pub async fn run(&mut self, mut command_rx: UnboundedReceiver<SourceWorkerCommand>) {
287        let mut interval = time::interval(self.period);
288        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
289        loop {
290            select! {
291                biased;
292                cmd = command_rx.borrow_mut().recv() => {
293                    if let Some(cmd) = cmd {
294                        match cmd {
295                            SourceWorkerCommand::Tick(tx) => {
296                                let _ = tx.send(self.tick().await);
297                            }
298                            SourceWorkerCommand::DropFragments(fragment_ids) => {
299                                if let Err(e) = self.drop_fragments(fragment_ids).await {
300                                    // when error happens, we just log it and ignore
301                                    tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
302                                }
303                            }
304                            SourceWorkerCommand::FinishBackfill(fragment_ids) => {
305                                if let Err(e) = self.finish_backfill(fragment_ids).await {
306                                    // when error happens, we just log it and ignore
307                                    tracing::warn!(error = %e.as_report(), "error happened when finish backfill");
308                                }
309                            }
310                            SourceWorkerCommand::UpdateProps(new_props) => {
311                                self.connector_properties = new_props;
312                                if let Err(e) = self.refresh().await {
313                                    tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
314                                }
315                                tracing::debug!("source {} worker properties updated", self.source_name);
316                            }
317                            SourceWorkerCommand::Terminate => {
318                                return;
319                            }
320                        }
321                    }
322                }
323                _ = interval.tick() => {
324                    if self.fail_cnt > MAX_FAIL_CNT
325                        && let Err(e) = self.refresh().await {
326                            tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
327                        }
328                    if let Err(e) = self.tick().await {
329                        tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker");
330                    }
331                }
332            }
333        }
334    }
335
336    /// Uses [`risingwave_connector::source::SplitEnumerator`] to fetch the latest split metadata from the external source service.
337    async fn tick(&mut self) -> MetaResult<()> {
338        let source_is_up = |res: i64| {
339            self.source_is_up.set(res);
340        };
341
342        let splits = {
343            if let Some(debug_splits) = &self.debug_splits {
344                debug_splits.clone()
345            } else {
346                self.enumerator.list_splits().await.inspect_err(|_| {
347                    source_is_up(0);
348                    self.fail_cnt += 1;
349                })?
350            }
351        };
352
353        source_is_up(1);
354        self.fail_cnt = 0;
355        let mut current_splits = self.current_splits.lock().await;
356        current_splits.splits.replace(
357            splits
358                .into_iter()
359                .map(|split| (split.id(), split))
360                .collect(),
361        );
362        // Call enumerator's `on_tick` method for monitoring tasks
363        if let Err(e) = self.enumerator.on_tick().await {
364            tracing::error!(
365                "Failed to execute enumerator `on_tick` for source {}: {}",
366                self.source_id,
367                e.as_report()
368            );
369        }
370
371        Ok(())
372    }
373
374    async fn drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
375        self.enumerator.on_drop_fragments(fragment_ids).await?;
376        Ok(())
377    }
378
379    async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
380        self.enumerator.on_finish_backfill(fragment_ids).await?;
381        Ok(())
382    }
383}
384
385/// Handle for a running [`ConnectorSourceWorker`].
///
/// Commands are delivered to the worker through `command_tx`; the discovered splits are read
/// from the shared `splits` map that the worker updates on each successful tick.
386pub struct ConnectorSourceWorkerHandle {
    /// Join handle of the spawned worker task. It is stored but never read, hence the
    /// `dead_code` expectation.
387    #[expect(dead_code)]
388    handle: JoinHandle<()>,
    /// Sending side of the worker's command channel.
389    command_tx: UnboundedSender<SourceWorkerCommand>,
    /// Split map shared with the worker.
390    pub splits: SharedSplitMapRef,
    /// Value of `ConnectorProperties::enable_drop_split()` captured when the handle was created.
391    pub enable_drop_split: bool,
    /// Value of `ConnectorProperties::enable_adaptive_splits()` captured when the handle was
    /// created. When set, `discovered_splits` expands the single discovered split per actor.
392    pub enable_adaptive_splits: bool,
393}
394
395impl ConnectorSourceWorkerHandle {
396    pub fn get_enable_adaptive_splits(&self) -> bool {
397        self.enable_adaptive_splits
398    }
399
400    pub async fn discovered_splits(
401        &self,
402        source_id: SourceId,
403        actors: &HashSet<ActorId>,
404    ) -> MetaResult<BTreeMap<Arc<str>, SplitImpl>> {
405        // XXX: when is this None? Can we remove the Option?
406        let Some(mut discovered_splits) = self.splits.lock().await.splits.clone() else {
407            tracing::info!(
408                "The discover loop for source {} is not ready yet; we'll wait for the next run",
409                source_id
410            );
411            return Ok(BTreeMap::new());
412        };
413        if discovered_splits.is_empty() {
414            tracing::warn!("No splits discovered for source {}", source_id);
415        }
416
417        if self.enable_adaptive_splits {
418            // Connector supporting adaptive splits returns just one split, and we need to make the number of splits equal to the number of actors in this fragment.
419            // Because we Risingwave consume the splits statelessly and we do not need to keep the id internally, we always use actor_id as split_id.
420            // And prev splits record should be dropped via CN.
421
422            debug_assert!(self.enable_drop_split);
423            debug_assert!(discovered_splits.len() == 1);
424            discovered_splits =
425                fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?;
426        }
427
428        Ok(discovered_splits)
429    }
430
431    fn send_command(&self, command: SourceWorkerCommand) -> MetaResult<()> {
432        let cmd_str = format!("{:?}", command);
433        self.command_tx
434            .send(command)
435            .with_context(|| format!("failed to send {cmd_str} command to source worker"))?;
436        Ok(())
437    }
438
439    /// Force [`ConnectorSourceWorker::tick()`] to be called.
440    pub async fn force_tick(&self) -> MetaResult<()> {
441        let (tx, rx) = oneshot::channel();
442        self.send_command(SourceWorkerCommand::Tick(tx))?;
443        rx.await
444            .context("failed to receive tick command response from source worker")?
445            .context("source worker tick failed")?;
446        Ok(())
447    }
448
449    pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
450        tracing::debug!("drop_fragments: {:?}", fragment_ids);
451        if let Err(e) = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids)) {
452            // ignore drop fragment error, just log it
453            tracing::warn!(error = %e.as_report(), "failed to drop fragments");
454        }
455    }
456
457    pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
458        tracing::debug!("finish_backfill: {:?}", fragment_ids);
459        if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
460            // ignore error, just log it
461            tracing::warn!(error = %e.as_report(), "failed to finish backfill");
462        }
463    }
464
465    pub fn update_props(&self, new_props: ConnectorProperties) {
466        if let Err(e) = self.send_command(SourceWorkerCommand::UpdateProps(new_props)) {
467            // ignore update props error, just log it
468            tracing::warn!(error = %e.as_report(), "failed to update source worker properties");
469        }
470    }
471
472    pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
473        tracing::debug!("terminate: {:?}", dropped_fragments);
474        if let Some(dropped_fragments) = dropped_fragments {
475            self.drop_fragments(dropped_fragments.into_iter().collect());
476        }
477        if let Err(e) = self.send_command(SourceWorkerCommand::Terminate) {
478            // ignore terminate error, just log it
479            tracing::warn!(error = %e.as_report(), "failed to terminate source worker");
480        }
481    }
482}
483
/// Commands sent from a [`ConnectorSourceWorkerHandle`] to its worker task and handled in
/// [`ConnectorSourceWorker::run`].
484#[derive(educe::Educe)]
485#[educe(Debug)]
486pub enum SourceWorkerCommand {
487    /// Sync command to force [`ConnectorSourceWorker::tick()`] to be called.
    /// The tick result is sent back through the carried oneshot sender, which is excluded
    /// from the `Debug` output.
488    Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
489    /// Async command to drop the given fragments.
490    DropFragments(Vec<FragmentId>),
491    /// Async command to finish backfill for the given fragments.
492    FinishBackfill(Vec<FragmentId>),
493    /// Terminate the worker task.
494    Terminate,
495    /// Update the properties of the source worker; the worker then recreates its enumerator.
496    UpdateProps(ConnectorProperties),
497}