risingwave_meta/stream/source_manager/
worker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_connector::WithPropertiesExt;
16#[cfg(not(debug_assertions))]
17use risingwave_connector::error::ConnectorError;
18use risingwave_connector::source::AnySplitEnumerator;
19
20use super::*;
21
/// Failure threshold for the periodic loop in `ConnectorSourceWorker::run`: once the worker's
/// `fail_cnt` exceeds this value, the enumerator is recreated via `refresh` before the next tick.
const MAX_FAIL_CNT: u32 = 10;
/// Fallback timeout for the initial tick in `create_source_worker`, used when the source's
/// properties do not provide a sync call timeout.
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);

// The key used to load `SplitImpl` directly from source properties.
// When this key is present, the worker only reports the given splits
// instead of fetching them from the external source.
// Only valid in debug builds - an error is returned in release builds.
const DEBUG_SPLITS_KEY: &str = "debug_splits";
30
/// Split metadata shared between a `ConnectorSourceWorker` (writer) and its
/// `ConnectorSourceWorkerHandle` (reader).
pub struct SharedSplitMap {
    /// Latest discovered splits keyed by split id.
    ///
    /// Starts as `None` and is replaced with a fresh map on every successful worker tick.
    pub splits: Option<BTreeMap<SplitId, SplitImpl>>,
}
34
/// Shared, mutex-protected reference to a [`SharedSplitMap`].
type SharedSplitMapRef = Arc<Mutex<SharedSplitMap>>;
36
/// `ConnectorSourceWorker` keeps fetching the latest split metadata from the external source service ([`Self::tick`]),
/// and maintains it in `current_splits`.
pub struct ConnectorSourceWorker {
    /// Id of the source this worker serves.
    source_id: SourceId,
    /// Name of the source; used in log messages and as a metrics label.
    source_name: String,
    /// Split map shared with the worker's handle; overwritten on each successful tick.
    current_splits: SharedSplitMapRef,
    // XXX: box or arc?
    /// Enumerator used to list splits; replaced wholesale by `refresh`.
    enumerator: Box<dyn AnySplitEnumerator>,
    /// Interval between periodic ticks in `run`.
    period: Duration,
    metrics: Arc<MetaMetrics>,
    /// Kept so that `refresh` can build a new enumerator from the same properties.
    connector_properties: ConnectorProperties,
    /// Number of failed `list_splits` calls since the last successful tick or refresh.
    fail_cnt: u32,
    /// Gauge set to 1 after a successful tick and to 0 when listing splits fails.
    source_is_up: LabelGuardedIntGauge<2>,

    /// Splits parsed from the `debug_splits` property (debug builds only). When present,
    /// `tick` reports these instead of querying the enumerator.
    debug_splits: Option<Vec<SplitImpl>>,
}
53
54fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
55    let options_with_secret =
56        WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
57    let mut properties = ConnectorProperties::extract(options_with_secret, false)?;
58    properties.init_from_pb_source(source);
59    Ok(properties)
60}
61fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
62    let options_with_secret = WithOptionsSecResolved::new(
63        {
64            let mut with_properties = source.with_properties.clone();
65            let _removed = with_properties.remove(DEBUG_SPLITS_KEY);
66
67            #[cfg(not(debug_assertions))]
68            {
69                if _removed.is_some() {
70                    return Err(ConnectorError::from(anyhow::anyhow!(
71                        "`debug_splits` is not allowed in release mode"
72                    )));
73                }
74            }
75
76            with_properties
77        },
78        source.secret_refs.clone(),
79    );
80    let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
81    properties.init_from_pb_source(source);
82    Ok(properties)
83}
84
85/// Used to create a new [`ConnectorSourceWorkerHandle`] for a new source.
86///
87/// It will call [`ConnectorSourceWorker::tick()`] to fetch split metadata once before returning.
88pub async fn create_source_worker(
89    source: &Source,
90    metrics: Arc<MetaMetrics>,
91) -> MetaResult<ConnectorSourceWorkerHandle> {
92    tracing::info!("spawning new watcher for source {}", source.id);
93
94    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
95    let current_splits_ref = splits.clone();
96
97    let connector_properties = extract_prop_from_new_source(source)?;
98    let enable_scale_in = connector_properties.enable_drop_split();
99    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
100    let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
101    let sync_call_timeout = source
102        .with_properties
103        .get_sync_call_timeout()
104        .unwrap_or(DEFAULT_SOURCE_TICK_TIMEOUT);
105    let handle = {
106        let mut worker = ConnectorSourceWorker::create(
107            source,
108            connector_properties,
109            DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
110            current_splits_ref.clone(),
111            metrics,
112        )
113        .await?;
114
115        // if fail to fetch meta info, will refuse to create source
116        tokio::time::timeout(sync_call_timeout, worker.tick())
117            .await
118            .with_context(|| {
119                format!(
120                    "failed to fetch meta info for source {}, timeout {:?}",
121                    source.id, DEFAULT_SOURCE_TICK_TIMEOUT
122                )
123            })??;
124
125        tokio::spawn(async move { worker.run(command_rx).await })
126    };
127    Ok(ConnectorSourceWorkerHandle {
128        handle,
129        command_tx,
130        splits,
131        enable_drop_split: enable_scale_in,
132        enable_adaptive_splits,
133    })
134}
135
136/// Used on startup ([`SourceManager::new`]). Failed sources will not block meta startup.
137pub fn create_source_worker_async(
138    source: Source,
139    managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
140    metrics: Arc<MetaMetrics>,
141) -> MetaResult<()> {
142    tracing::info!("spawning new watcher for source {}", source.id);
143
144    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
145    let current_splits_ref = splits.clone();
146    let source_id = source.id;
147
148    let connector_properties = extract_prop_from_existing_source(&source)?;
149
150    let enable_drop_split = connector_properties.enable_drop_split();
151    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
152    let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
153    let handle = tokio::spawn(async move {
154        let mut ticker = time::interval(DEFAULT_SOURCE_WORKER_TICK_INTERVAL);
155        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
156
157        let mut worker = loop {
158            ticker.tick().await;
159
160            match ConnectorSourceWorker::create(
161                &source,
162                connector_properties.clone(),
163                DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
164                current_splits_ref.clone(),
165                metrics.clone(),
166            )
167            .await
168            {
169                Ok(worker) => {
170                    break worker;
171                }
172                Err(e) => {
173                    tracing::warn!(error = %e.as_report(), "failed to create source worker");
174                }
175            }
176        };
177
178        worker.run(command_rx).await
179    });
180
181    managed_sources.insert(
182        source_id as SourceId,
183        ConnectorSourceWorkerHandle {
184            handle,
185            command_tx,
186            splits,
187            enable_drop_split,
188            enable_adaptive_splits,
189        },
190    );
191    Ok(())
192}
193
/// Period handed to every `ConnectorSourceWorker` for its periodic tick, and the retry
/// interval used by `create_source_worker_async` while the worker cannot be created.
const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);
195
196impl ConnectorSourceWorker {
197    /// Recreate the `SplitEnumerator` to establish a new connection to the external source service.
198    async fn refresh(&mut self) -> MetaResult<()> {
199        let enumerator = self
200            .connector_properties
201            .clone()
202            .create_split_enumerator(Arc::new(SourceEnumeratorContext {
203                metrics: self.metrics.source_enumerator_metrics.clone(),
204                info: SourceEnumeratorInfo {
205                    source_id: self.source_id as u32,
206                },
207            }))
208            .await
209            .context("failed to create SplitEnumerator")?;
210        self.enumerator = enumerator;
211        self.fail_cnt = 0;
212        tracing::info!("refreshed source enumerator: {}", self.source_name);
213        Ok(())
214    }
215
216    /// On creation, connection to the external source service will be established, but `splits`
217    /// will not be updated until `tick` is called.
218    pub async fn create(
219        source: &Source,
220        connector_properties: ConnectorProperties,
221        period: Duration,
222        splits: Arc<Mutex<SharedSplitMap>>,
223        metrics: Arc<MetaMetrics>,
224    ) -> MetaResult<Self> {
225        let enumerator = connector_properties
226            .clone()
227            .create_split_enumerator(Arc::new(SourceEnumeratorContext {
228                metrics: metrics.source_enumerator_metrics.clone(),
229                info: SourceEnumeratorInfo {
230                    source_id: source.id,
231                },
232            }))
233            .await
234            .context("failed to create SplitEnumerator")?;
235
236        let source_is_up = metrics
237            .source_is_up
238            .with_guarded_label_values(&[source.id.to_string().as_str(), &source.name]);
239
240        Ok(Self {
241            source_id: source.id as SourceId,
242            source_name: source.name.clone(),
243            current_splits: splits,
244            enumerator,
245            period,
246            metrics,
247            connector_properties,
248            fail_cnt: 0,
249            source_is_up,
250            debug_splits: {
251                let debug_splits = source.with_properties.get(DEBUG_SPLITS_KEY);
252                #[cfg(not(debug_assertions))]
253                {
254                    if debug_splits.is_some() {
255                        return Err(ConnectorError::from(anyhow::anyhow!(
256                            "`debug_splits` is not allowed in release mode"
257                        ))
258                        .into());
259                    }
260                    None
261                }
262
263                #[cfg(debug_assertions)]
264                {
265                    use risingwave_common::types::JsonbVal;
266                    if let Some(debug_splits) = debug_splits {
267                        let mut splits = Vec::new();
268                        let debug_splits_value =
269                            jsonbb::serde_json::from_str::<serde_json::Value>(debug_splits)
270                                .context("failed to parse split impl")?;
271                        for split_impl_value in debug_splits_value.as_array().unwrap() {
272                            splits.push(SplitImpl::restore_from_json(JsonbVal::from(
273                                split_impl_value.clone(),
274                            ))?);
275                        }
276                        Some(splits)
277                    } else {
278                        None
279                    }
280                }
281            },
282        })
283    }
284
285    pub async fn run(&mut self, mut command_rx: UnboundedReceiver<SourceWorkerCommand>) {
286        let mut interval = time::interval(self.period);
287        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
288        loop {
289            select! {
290                biased;
291                cmd = command_rx.borrow_mut().recv() => {
292                    if let Some(cmd) = cmd {
293                        match cmd {
294                            SourceWorkerCommand::Tick(tx) => {
295                                let _ = tx.send(self.tick().await);
296                            }
297                            SourceWorkerCommand::DropFragments(fragment_ids) => {
298                                if let Err(e) = self.drop_fragments(fragment_ids).await {
299                                    // when error happens, we just log it and ignore
300                                    tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
301                                }
302                            }
303                            SourceWorkerCommand::FinishBackfill(fragment_ids) => {
304                                if let Err(e) = self.finish_backfill(fragment_ids).await {
305                                    // when error happens, we just log it and ignore
306                                    tracing::warn!(error = %e.as_report(), "error happened when finish backfill");
307                                }
308                            }
309                            SourceWorkerCommand::Terminate => {
310                                return;
311                            }
312                        }
313                    }
314                }
315                _ = interval.tick() => {
316                    if self.fail_cnt > MAX_FAIL_CNT {
317                        if let Err(e) = self.refresh().await {
318                            tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
319                        }
320                    }
321                    if let Err(e) = self.tick().await {
322                        tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker");
323                    }
324                }
325            }
326        }
327    }
328
329    /// Uses [`risingwave_connector::source::SplitEnumerator`] to fetch the latest split metadata from the external source service.
330    async fn tick(&mut self) -> MetaResult<()> {
331        let source_is_up = |res: i64| {
332            self.source_is_up.set(res);
333        };
334
335        let splits = {
336            if let Some(debug_splits) = &self.debug_splits {
337                debug_splits.clone()
338            } else {
339                self.enumerator.list_splits().await.inspect_err(|_| {
340                    source_is_up(0);
341                    self.fail_cnt += 1;
342                })?
343            }
344        };
345
346        source_is_up(1);
347        self.fail_cnt = 0;
348        let mut current_splits = self.current_splits.lock().await;
349        current_splits.splits.replace(
350            splits
351                .into_iter()
352                .map(|split| (split.id(), split))
353                .collect(),
354        );
355
356        Ok(())
357    }
358
359    async fn drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
360        self.enumerator.on_drop_fragments(fragment_ids).await?;
361        Ok(())
362    }
363
364    async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
365        self.enumerator.on_finish_backfill(fragment_ids).await?;
366        Ok(())
367    }
368}
369
/// Handle for a running [`ConnectorSourceWorker`].
pub struct ConnectorSourceWorkerHandle {
    /// Join handle of the spawned worker task; stored but never read.
    #[expect(dead_code)]
    handle: JoinHandle<()>,
    /// Sending side of the worker's command channel.
    command_tx: UnboundedSender<SourceWorkerCommand>,
    /// Split map shared with the worker; holds the most recent successfully fetched splits.
    pub splits: SharedSplitMapRef,
    /// Value of `ConnectorProperties::enable_drop_split()` for this source.
    pub enable_drop_split: bool,
    /// Value of `ConnectorProperties::enable_adaptive_splits()` for this source.
    pub enable_adaptive_splits: bool,
}
379
380impl ConnectorSourceWorkerHandle {
381    pub fn get_enable_adaptive_splits(&self) -> bool {
382        self.enable_adaptive_splits
383    }
384
385    pub async fn discovered_splits(
386        &self,
387        source_id: SourceId,
388        actors: &HashSet<ActorId>,
389    ) -> MetaResult<BTreeMap<Arc<str>, SplitImpl>> {
390        // XXX: when is this None? Can we remove the Option?
391        let Some(mut discovered_splits) = self.splits.lock().await.splits.clone() else {
392            tracing::info!(
393                "The discover loop for source {} is not ready yet; we'll wait for the next run",
394                source_id
395            );
396            return Ok(BTreeMap::new());
397        };
398        if discovered_splits.is_empty() {
399            tracing::warn!("No splits discovered for source {}", source_id);
400        }
401
402        if self.enable_adaptive_splits {
403            // Connector supporting adaptive splits returns just one split, and we need to make the number of splits equal to the number of actors in this fragment.
404            // Because we Risingwave consume the splits statelessly and we do not need to keep the id internally, we always use actor_id as split_id.
405            // And prev splits record should be dropped via CN.
406
407            debug_assert!(self.enable_drop_split);
408            debug_assert!(discovered_splits.len() == 1);
409            discovered_splits =
410                fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?;
411        }
412
413        Ok(discovered_splits)
414    }
415
416    fn send_command(&self, command: SourceWorkerCommand) -> MetaResult<()> {
417        let cmd_str = format!("{:?}", command);
418        self.command_tx
419            .send(command)
420            .with_context(|| format!("failed to send {cmd_str} command to source worker"))?;
421        Ok(())
422    }
423
424    /// Force [`ConnectorSourceWorker::tick()`] to be called.
425    pub async fn force_tick(&self) -> MetaResult<()> {
426        let (tx, rx) = oneshot::channel();
427        self.send_command(SourceWorkerCommand::Tick(tx))?;
428        rx.await
429            .context("failed to receive tick command response from source worker")?
430            .context("source worker tick failed")?;
431        Ok(())
432    }
433
434    pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
435        tracing::debug!("drop_fragments: {:?}", fragment_ids);
436        if let Err(e) = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids)) {
437            // ignore drop fragment error, just log it
438            tracing::warn!(error = %e.as_report(), "failed to drop fragments");
439        }
440    }
441
442    pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
443        tracing::debug!("finish_backfill: {:?}", fragment_ids);
444        if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
445            // ignore error, just log it
446            tracing::warn!(error = %e.as_report(), "failed to finish backfill");
447        }
448    }
449
450    pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
451        tracing::debug!("terminate: {:?}", dropped_fragments);
452        if let Some(dropped_fragments) = dropped_fragments {
453            self.drop_fragments(dropped_fragments.into_iter().collect());
454        }
455        if let Err(e) = self.send_command(SourceWorkerCommand::Terminate) {
456            // ignore terminate error, just log it
457            tracing::warn!(error = %e.as_report(), "failed to terminate source worker");
458        }
459    }
460}
461
/// Commands a [`ConnectorSourceWorkerHandle`] sends to its [`ConnectorSourceWorker`] task.
#[derive(educe::Educe)]
#[educe(Debug)]
pub enum SourceWorkerCommand {
    /// Sync command to force [`ConnectorSourceWorker::tick()`] to be called.
    /// The tick result is sent back through the enclosed oneshot sender.
    Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
    /// Async command to notify the enumerator that the given fragments were dropped.
    DropFragments(Vec<FragmentId>),
    /// Async command to notify the enumerator that the given fragments finished backfill.
    FinishBackfill(Vec<FragmentId>),
    /// Terminate the worker task.
    Terminate,
}