risingwave_meta/stream/source_manager/
worker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_connector::WithPropertiesExt;
16#[cfg(not(debug_assertions))]
17use risingwave_connector::error::ConnectorError;
18use risingwave_connector::source::AnySplitEnumerator;
19
20use super::*;
21
/// Consecutive tick failures the worker tolerates. Once `fail_cnt` exceeds this value, the next
/// periodic tick re-creates the split enumerator before listing splits again.
const MAX_FAIL_CNT: u32 = 10;

/// Source property key used to load `SplitImpl`s directly from the source properties.
///
/// When this key is present, the worker reports only the given splits instead of fetching them
/// from the external source. Only valid in debug builds; release builds reject it with an error.
const DEBUG_SPLITS_KEY: &str = "debug_splits";
29
30pub struct SharedSplitMap {
31    pub splits: Option<BTreeMap<SplitId, SplitImpl>>,
32}
33
34type SharedSplitMapRef = Arc<Mutex<SharedSplitMap>>;
35
36/// `ConnectorSourceWorker` keeps fetching the latest split metadata from the external source service ([`Self::tick`]),
37/// and maintains it in `current_splits`.
38pub struct ConnectorSourceWorker {
39    source_id: SourceId,
40    source_name: String,
41    current_splits: SharedSplitMapRef,
42    // XXX: box or arc?
43    enumerator: Box<dyn AnySplitEnumerator>,
44    period: Duration,
45    metrics: Arc<MetaMetrics>,
46    connector_properties: ConnectorProperties,
47    fail_cnt: u32,
48    source_is_up: LabelGuardedIntGauge,
49
50    debug_splits: Option<Vec<SplitImpl>>,
51}
52
53fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
54    let options_with_secret =
55        WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
56    let mut properties = ConnectorProperties::extract(options_with_secret, false)?;
57    properties.init_from_pb_source(source);
58    Ok(properties)
59}
60fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
61    let options_with_secret = WithOptionsSecResolved::new(
62        {
63            let mut with_properties = source.with_properties.clone();
64            let _removed = with_properties.remove(DEBUG_SPLITS_KEY);
65
66            #[cfg(not(debug_assertions))]
67            {
68                if _removed.is_some() {
69                    return Err(ConnectorError::from(anyhow::anyhow!(
70                        "`debug_splits` is not allowed in release mode"
71                    )));
72                }
73            }
74
75            with_properties
76        },
77        source.secret_refs.clone(),
78    );
79    let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
80    properties.init_from_pb_source(source);
81    Ok(properties)
82}
83
84/// Used to create a new [`ConnectorSourceWorkerHandle`] for a new source.
85///
86/// It will call [`ConnectorSourceWorker::tick()`] to fetch split metadata once before returning.
87pub async fn create_source_worker(
88    source: &Source,
89    metrics: Arc<MetaMetrics>,
90) -> MetaResult<ConnectorSourceWorkerHandle> {
91    tracing::info!("spawning new watcher for source {}", source.id);
92
93    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
94    let current_splits_ref = splits.clone();
95
96    let connector_properties = extract_prop_from_new_source(source)?;
97    let enable_scale_in = connector_properties.enable_drop_split();
98    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
99    let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
100    let sync_call_timeout = source
101        .with_properties
102        .get_sync_call_timeout()
103        .unwrap_or(DEFAULT_SOURCE_TICK_TIMEOUT);
104    let handle = {
105        let mut worker = ConnectorSourceWorker::create(
106            source,
107            connector_properties,
108            DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
109            current_splits_ref.clone(),
110            metrics,
111        )
112        .await?;
113
114        // if fail to fetch meta info, will refuse to create source
115        tokio::time::timeout(sync_call_timeout, worker.tick())
116            .await
117            .with_context(|| {
118                format!(
119                    "failed to fetch meta info for source {}, timeout {:?}",
120                    source.id, DEFAULT_SOURCE_TICK_TIMEOUT
121                )
122            })??;
123
124        tokio::spawn(async move { worker.run(command_rx).await })
125    };
126    Ok(ConnectorSourceWorkerHandle {
127        handle,
128        command_tx,
129        splits,
130        enable_drop_split: enable_scale_in,
131        enable_adaptive_splits,
132    })
133}
134
135/// Used on startup ([`SourceManager::new`]). Failed sources will not block meta startup.
136pub fn create_source_worker_async(
137    source: Source,
138    managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
139    metrics: Arc<MetaMetrics>,
140) -> MetaResult<()> {
141    tracing::info!("spawning new watcher for source {}", source.id);
142
143    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
144    let current_splits_ref = splits.clone();
145    let source_id = source.id;
146
147    let connector_properties = extract_prop_from_existing_source(&source)?;
148
149    let enable_drop_split = connector_properties.enable_drop_split();
150    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
151    let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
152    let handle = tokio::spawn(async move {
153        let mut ticker = time::interval(DEFAULT_SOURCE_WORKER_TICK_INTERVAL);
154        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
155
156        let mut worker = loop {
157            ticker.tick().await;
158
159            match ConnectorSourceWorker::create(
160                &source,
161                connector_properties.clone(),
162                DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
163                current_splits_ref.clone(),
164                metrics.clone(),
165            )
166            .await
167            {
168                Ok(worker) => {
169                    break worker;
170                }
171                Err(e) => {
172                    tracing::warn!(error = %e.as_report(), "failed to create source worker");
173                }
174            }
175        };
176
177        worker.run(command_rx).await
178    });
179
180    managed_sources.insert(
181        source_id as SourceId,
182        ConnectorSourceWorkerHandle {
183            handle,
184            command_tx,
185            splits,
186            enable_drop_split,
187            enable_adaptive_splits,
188        },
189    );
190    Ok(())
191}
192
/// Period of the worker's split-discovery tick. It is also the retry interval used when creating
/// a worker for an existing source on startup.
const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);
194
195impl ConnectorSourceWorker {
196    /// Recreate the `SplitEnumerator` to establish a new connection to the external source service.
197    async fn refresh(&mut self) -> MetaResult<()> {
198        let enumerator = self
199            .connector_properties
200            .clone()
201            .create_split_enumerator(Arc::new(SourceEnumeratorContext {
202                metrics: self.metrics.source_enumerator_metrics.clone(),
203                info: SourceEnumeratorInfo {
204                    source_id: self.source_id as u32,
205                },
206            }))
207            .await
208            .context("failed to create SplitEnumerator")?;
209        self.enumerator = enumerator;
210        self.fail_cnt = 0;
211        tracing::info!("refreshed source enumerator: {}", self.source_name);
212        Ok(())
213    }
214
215    /// On creation, connection to the external source service will be established, but `splits`
216    /// will not be updated until `tick` is called.
217    pub async fn create(
218        source: &Source,
219        connector_properties: ConnectorProperties,
220        period: Duration,
221        splits: Arc<Mutex<SharedSplitMap>>,
222        metrics: Arc<MetaMetrics>,
223    ) -> MetaResult<Self> {
224        let enumerator = connector_properties
225            .clone()
226            .create_split_enumerator(Arc::new(SourceEnumeratorContext {
227                metrics: metrics.source_enumerator_metrics.clone(),
228                info: SourceEnumeratorInfo {
229                    source_id: source.id,
230                },
231            }))
232            .await
233            .context("failed to create SplitEnumerator")?;
234
235        let source_is_up = metrics
236            .source_is_up
237            .with_guarded_label_values(&[source.id.to_string().as_str(), &source.name]);
238
239        Ok(Self {
240            source_id: source.id as SourceId,
241            source_name: source.name.clone(),
242            current_splits: splits,
243            enumerator,
244            period,
245            metrics,
246            connector_properties,
247            fail_cnt: 0,
248            source_is_up,
249            debug_splits: {
250                let debug_splits = source.with_properties.get(DEBUG_SPLITS_KEY);
251                #[cfg(not(debug_assertions))]
252                {
253                    if debug_splits.is_some() {
254                        return Err(ConnectorError::from(anyhow::anyhow!(
255                            "`debug_splits` is not allowed in release mode"
256                        ))
257                        .into());
258                    }
259                    None
260                }
261
262                #[cfg(debug_assertions)]
263                {
264                    use risingwave_common::types::JsonbVal;
265                    if let Some(debug_splits) = debug_splits {
266                        let mut splits = Vec::new();
267                        let debug_splits_value =
268                            jsonbb::serde_json::from_str::<serde_json::Value>(debug_splits)
269                                .context("failed to parse split impl")?;
270                        for split_impl_value in debug_splits_value.as_array().unwrap() {
271                            splits.push(SplitImpl::restore_from_json(JsonbVal::from(
272                                split_impl_value.clone(),
273                            ))?);
274                        }
275                        Some(splits)
276                    } else {
277                        None
278                    }
279                }
280            },
281        })
282    }
283
284    pub async fn run(&mut self, mut command_rx: UnboundedReceiver<SourceWorkerCommand>) {
285        let mut interval = time::interval(self.period);
286        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
287        loop {
288            select! {
289                biased;
290                cmd = command_rx.borrow_mut().recv() => {
291                    if let Some(cmd) = cmd {
292                        match cmd {
293                            SourceWorkerCommand::Tick(tx) => {
294                                let _ = tx.send(self.tick().await);
295                            }
296                            SourceWorkerCommand::DropFragments(fragment_ids) => {
297                                if let Err(e) = self.drop_fragments(fragment_ids).await {
298                                    // when error happens, we just log it and ignore
299                                    tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
300                                }
301                            }
302                            SourceWorkerCommand::FinishBackfill(fragment_ids) => {
303                                if let Err(e) = self.finish_backfill(fragment_ids).await {
304                                    // when error happens, we just log it and ignore
305                                    tracing::warn!(error = %e.as_report(), "error happened when finish backfill");
306                                }
307                            }
308                            SourceWorkerCommand::UpdateProps(new_props) => {
309                                self.connector_properties = new_props;
310                                if let Err(e) = self.refresh().await {
311                                    tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
312                                }
313                                tracing::debug!("source {} worker properties updated", self.source_name);
314                            }
315                            SourceWorkerCommand::Terminate => {
316                                return;
317                            }
318                        }
319                    }
320                }
321                _ = interval.tick() => {
322                    if self.fail_cnt > MAX_FAIL_CNT
323                        && let Err(e) = self.refresh().await {
324                            tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
325                        }
326                    if let Err(e) = self.tick().await {
327                        tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker");
328                    }
329                }
330            }
331        }
332    }
333
334    /// Uses [`risingwave_connector::source::SplitEnumerator`] to fetch the latest split metadata from the external source service.
335    async fn tick(&mut self) -> MetaResult<()> {
336        let source_is_up = |res: i64| {
337            self.source_is_up.set(res);
338        };
339
340        let splits = {
341            if let Some(debug_splits) = &self.debug_splits {
342                debug_splits.clone()
343            } else {
344                self.enumerator.list_splits().await.inspect_err(|_| {
345                    source_is_up(0);
346                    self.fail_cnt += 1;
347                })?
348            }
349        };
350
351        source_is_up(1);
352        self.fail_cnt = 0;
353        let mut current_splits = self.current_splits.lock().await;
354        current_splits.splits.replace(
355            splits
356                .into_iter()
357                .map(|split| (split.id(), split))
358                .collect(),
359        );
360
361        Ok(())
362    }
363
364    async fn drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
365        self.enumerator.on_drop_fragments(fragment_ids).await?;
366        Ok(())
367    }
368
369    async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
370        self.enumerator.on_finish_backfill(fragment_ids).await?;
371        Ok(())
372    }
373}
374
375/// Handle for a running [`ConnectorSourceWorker`].
376pub struct ConnectorSourceWorkerHandle {
377    #[expect(dead_code)]
378    handle: JoinHandle<()>,
379    command_tx: UnboundedSender<SourceWorkerCommand>,
380    pub splits: SharedSplitMapRef,
381    pub enable_drop_split: bool,
382    pub enable_adaptive_splits: bool,
383}
384
385impl ConnectorSourceWorkerHandle {
386    pub fn get_enable_adaptive_splits(&self) -> bool {
387        self.enable_adaptive_splits
388    }
389
390    pub async fn discovered_splits(
391        &self,
392        source_id: SourceId,
393        actors: &HashSet<ActorId>,
394    ) -> MetaResult<BTreeMap<Arc<str>, SplitImpl>> {
395        // XXX: when is this None? Can we remove the Option?
396        let Some(mut discovered_splits) = self.splits.lock().await.splits.clone() else {
397            tracing::info!(
398                "The discover loop for source {} is not ready yet; we'll wait for the next run",
399                source_id
400            );
401            return Ok(BTreeMap::new());
402        };
403        if discovered_splits.is_empty() {
404            tracing::warn!("No splits discovered for source {}", source_id);
405        }
406
407        if self.enable_adaptive_splits {
408            // Connector supporting adaptive splits returns just one split, and we need to make the number of splits equal to the number of actors in this fragment.
409            // Because we Risingwave consume the splits statelessly and we do not need to keep the id internally, we always use actor_id as split_id.
410            // And prev splits record should be dropped via CN.
411
412            debug_assert!(self.enable_drop_split);
413            debug_assert!(discovered_splits.len() == 1);
414            discovered_splits =
415                fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?;
416        }
417
418        Ok(discovered_splits)
419    }
420
421    fn send_command(&self, command: SourceWorkerCommand) -> MetaResult<()> {
422        let cmd_str = format!("{:?}", command);
423        self.command_tx
424            .send(command)
425            .with_context(|| format!("failed to send {cmd_str} command to source worker"))?;
426        Ok(())
427    }
428
429    /// Force [`ConnectorSourceWorker::tick()`] to be called.
430    pub async fn force_tick(&self) -> MetaResult<()> {
431        let (tx, rx) = oneshot::channel();
432        self.send_command(SourceWorkerCommand::Tick(tx))?;
433        rx.await
434            .context("failed to receive tick command response from source worker")?
435            .context("source worker tick failed")?;
436        Ok(())
437    }
438
439    pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
440        tracing::debug!("drop_fragments: {:?}", fragment_ids);
441        if let Err(e) = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids)) {
442            // ignore drop fragment error, just log it
443            tracing::warn!(error = %e.as_report(), "failed to drop fragments");
444        }
445    }
446
447    pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
448        tracing::debug!("finish_backfill: {:?}", fragment_ids);
449        if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
450            // ignore error, just log it
451            tracing::warn!(error = %e.as_report(), "failed to finish backfill");
452        }
453    }
454
455    pub fn update_props(&self, new_props: ConnectorProperties) {
456        if let Err(e) = self.send_command(SourceWorkerCommand::UpdateProps(new_props)) {
457            // ignore update props error, just log it
458            tracing::warn!(error = %e.as_report(), "failed to update source worker properties");
459        }
460    }
461
462    pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
463        tracing::debug!("terminate: {:?}", dropped_fragments);
464        if let Some(dropped_fragments) = dropped_fragments {
465            self.drop_fragments(dropped_fragments.into_iter().collect());
466        }
467        if let Err(e) = self.send_command(SourceWorkerCommand::Terminate) {
468            // ignore terminate error, just log it
469            tracing::warn!(error = %e.as_report(), "failed to terminate source worker");
470        }
471    }
472}
473
474#[derive(educe::Educe)]
475#[educe(Debug)]
476pub enum SourceWorkerCommand {
477    /// Sync command to force [`ConnectorSourceWorker::tick()`] to be called.
478    Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
479    /// Async command to drop a fragment.
480    DropFragments(Vec<FragmentId>),
481    /// Async command to finish backfill.
482    FinishBackfill(Vec<FragmentId>),
483    /// Terminate the worker task.
484    Terminate,
485    /// Update the properties of the source worker.
486    UpdateProps(ConnectorProperties),
487}