risingwave_meta/stream/source_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod split_assignment;
mod worker;
use std::borrow::BorrowMut;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::panic_if_debug;
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
    SplitMetaData, fill_adaptive_split,
};
use risingwave_meta_model::SourceId;
use risingwave_pb::catalog::Source;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{Mutex, MutexGuard, oneshot};
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tokio::{select, time};
pub use worker::create_source_worker;
use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};

use crate::MetaResult;
use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
use crate::manager::MetadataManager;
use crate::model::{ActorId, FragmentId, StreamJobFragments};
use crate::rpc::metrics::MetaMetrics;

pub type SourceManagerRef = Arc<SourceManager>;
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;
// ALTER CONNECTOR parameters, specifying the new parameters to be set for each job_id (source_id/sink_id)
pub type ConnectorPropsChange = HashMap<u32, HashMap<String, String>>;
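// Illustrative sketch (not part of the build): a `SplitAssignment` is a two-level map,
// e.g. fragment 1 assigning splits `s0`/`s1` to actor 10 and `s2` to actor 11:
//
//     let assignment: SplitAssignment =
//         HashMap::from([(1, HashMap::from([(10, vec![s0, s1]), (11, vec![s2])]))]);
//
// where `s0`..`s2` are hypothetical `SplitImpl` values obtained from the connector.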

const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);

/// `SourceManager` keeps fetching the latest split metadata from the external source services ([`worker::ConnectorSourceWorker::tick`]),
/// and sends a split assignment command if split changes are detected ([`Self::tick`]).
pub struct SourceManager {
    pub paused: Mutex<()>,
    barrier_scheduler: BarrierScheduler,
    core: Mutex<SourceManagerCore>,
    pub metrics: Arc<MetaMetrics>,
}
pub struct SourceManagerCore {
    metadata_manager: MetadataManager,

    /// Managed source loops
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Fragments associated with each source
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `source_id` -> `(fragment_id, upstream_fragment_id)`
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    /// Splits assigned per actor,
    /// incl. both `Source` and `SourceBackfill`.
    actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
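// Sketch of how the maps above relate (hypothetical ids): a shared source 1 running in
// source fragment 2, an MV backfilling from it in fragment 3, and actors 10 and 11 in
// those fragments could be tracked as
//
//     source_fragments:   {1 => {2}}
//     backfill_fragments: {1 => {(3, 2)}}   // (backfill fragment, upstream source fragment)
//     actor_splits:       {10 => [s0], 11 => [s0]}
//
// where `s0` stands for some connector-specific `SplitImpl`.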

pub struct SourceManagerRunningInfo {
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

impl SourceManagerCore {
    fn new(
        metadata_manager: MetadataManager,
        managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
        source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    ) -> Self {
        Self {
            metadata_manager,
            managed_sources,
            source_fragments,
            backfill_fragments,
            actor_splits,
        }
    }

    /// Updates states after all kinds of source changes.
    pub fn apply_source_change(&mut self, source_change: SourceChange) {
        let mut added_source_fragments = Default::default();
        let mut added_backfill_fragments = Default::default();
        let mut finished_backfill_fragments = Default::default();
        let mut split_assignment = Default::default();
        let mut dropped_actors = Default::default();
        let mut fragment_replacements = Default::default();
        let mut dropped_source_fragments = Default::default();
        let mut dropped_source_ids = Default::default();
        let mut recreate_source_id_map_new_props: Vec<(u32, HashMap<String, String>)> =
            Default::default();

        match source_change {
            SourceChange::CreateJob {
                added_source_fragments: added_source_fragments_,
                added_backfill_fragments: added_backfill_fragments_,
                split_assignment: split_assignment_,
            } => {
                added_source_fragments = added_source_fragments_;
                added_backfill_fragments = added_backfill_fragments_;
                split_assignment = split_assignment_;
            }
            SourceChange::CreateJobFinished {
                finished_backfill_fragments: finished_backfill_fragments_,
            } => {
                finished_backfill_fragments = finished_backfill_fragments_;
            }
            SourceChange::SplitChange(split_assignment_) => {
                split_assignment = split_assignment_;
            }
            SourceChange::DropMv {
                dropped_source_fragments: dropped_source_fragments_,
                dropped_actors: dropped_actors_,
            } => {
                dropped_source_fragments = dropped_source_fragments_;
                dropped_actors = dropped_actors_;
            }
            SourceChange::ReplaceJob {
                dropped_source_fragments: dropped_source_fragments_,
                dropped_actors: dropped_actors_,
                added_source_fragments: added_source_fragments_,
                split_assignment: split_assignment_,
                fragment_replacements: fragment_replacements_,
            } => {
                dropped_source_fragments = dropped_source_fragments_;
                dropped_actors = dropped_actors_;
                added_source_fragments = added_source_fragments_;
                split_assignment = split_assignment_;
                fragment_replacements = fragment_replacements_;
            }
            SourceChange::DropSource {
                dropped_source_ids: dropped_source_ids_,
            } => {
                dropped_source_ids = dropped_source_ids_;
            }
            SourceChange::Reschedule {
                split_assignment: split_assignment_,
                dropped_actors: dropped_actors_,
            } => {
                split_assignment = split_assignment_;
                dropped_actors = dropped_actors_;
            }
            SourceChange::UpdateSourceProps {
                source_id_map_new_props,
            } => {
                for (source_id, new_props) in source_id_map_new_props {
                    recreate_source_id_map_new_props.push((source_id, new_props));
                }
            }
        }

        for source_id in dropped_source_ids {
            let dropped_fragments = self.source_fragments.remove(&source_id);

            if let Some(handle) = self.managed_sources.remove(&source_id) {
                handle.terminate(dropped_fragments);
            }
            if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
                // TODO: enable this assertion after cleanup for backfill fragments is implemented
                // debug_assert!(
                //     fragments.is_empty(),
                //     "when dropping source, there should be no backfill fragments, got: {:?}",
                //     fragments
                // );
            }
        }

        for (source_id, fragments) in added_source_fragments {
            self.source_fragments
                .entry(source_id)
                .or_default()
                .extend(fragments);
        }

        for (source_id, fragments) in added_backfill_fragments {
            self.backfill_fragments
                .entry(source_id)
                .or_default()
                .extend(fragments);
        }

        for (source_id, fragments) in finished_backfill_fragments {
            let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
                panic!(
                    "source {} not found when adding backfill fragments {:?}",
                    source_id, fragments
                );
            });
            handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
        }

        for (_, actor_splits) in split_assignment {
            for (actor_id, splits) in actor_splits {
                // override previous splits info
                self.actor_splits.insert(actor_id, splits);
            }
        }

        for actor_id in dropped_actors {
            self.actor_splits.remove(&actor_id);
        }

        for (source_id, fragment_ids) in dropped_source_fragments {
            self.drop_source_fragments(Some(source_id), fragment_ids);
        }

        for (old_fragment_id, new_fragment_id) in fragment_replacements {
            // TODO: add source_id to the fragment_replacements to avoid iterating all sources
            self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));

            for fragment_ids in self.backfill_fragments.values_mut() {
                let mut new_backfill_fragment_ids = fragment_ids.clone();
                for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
                    assert_ne!(
                        fragment_id, upstream_fragment_id,
                        "backfill fragment should not be replaced"
                    );
                    if *upstream_fragment_id == old_fragment_id {
                        new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
                        new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
                    }
                }
                *fragment_ids = new_backfill_fragment_ids;
            }
        }

        for (source_id, new_props) in recreate_source_id_map_new_props {
            tracing::info!("recreate source {source_id} in source manager");
            if let Some(handle) = self.managed_sources.get_mut(&(source_id as _)) {
                // The update here should not involve fragment or split changes;
                // otherwise we would need to drop and recreate the source worker instead of updating it in place.
                let props_wrapper =
                    WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
                let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); // already checked when sending barrier
                handle.update_props(props);
            }
        }
    }

    fn drop_source_fragments(
        &mut self,
        source_id: Option<SourceId>,
        dropped_fragment_ids: BTreeSet<FragmentId>,
    ) {
        if let Some(source_id) = source_id {
            if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
                let mut dropped_ids = vec![];
                let managed_fragment_ids = entry.get_mut();
                for fragment_id in &dropped_fragment_ids {
                    managed_fragment_ids.remove(fragment_id);
                    dropped_ids.push(*fragment_id);
                }
                if let Some(handle) = self.managed_sources.get(&source_id) {
                    handle.drop_fragments(dropped_ids);
                } else {
                    panic_if_debug!(
                        "source {source_id} not found when dropping fragment {dropped_ids:?}",
                    );
                }
                if managed_fragment_ids.is_empty() {
                    entry.remove();
                }
            }
        } else {
            for (source_id, fragment_ids) in &mut self.source_fragments {
                let mut dropped_ids = vec![];
                for fragment_id in &dropped_fragment_ids {
                    if fragment_ids.remove(fragment_id) {
                        dropped_ids.push(*fragment_id);
                    }
                }
                if !dropped_ids.is_empty() {
                    if let Some(handle) = self.managed_sources.get(source_id) {
                        handle.drop_fragments(dropped_ids);
                    } else {
                        panic_if_debug!(
                            "source {source_id} not found when dropping fragment {dropped_ids:?}",
                        );
                    }
                }
            }
        }
    }
}
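// A minimal sketch (hypothetical values) of how the core is driven: `SourceManager::tick`
// discovers split changes, schedules a `Command::SourceChangeSplit` barrier, and once the
// barrier is collected the new assignment is applied to the in-memory state:
//
//     core.apply_source_change(SourceChange::SplitChange(HashMap::from([(
//         fragment_id,
//         HashMap::from([(actor_id, vec![new_split])]),
//     )])));
//
// `fragment_id`, `actor_id` and `new_split` are placeholders, not real values.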

impl SourceManager {
    const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);

    pub async fn new(
        barrier_scheduler: BarrierScheduler,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
    ) -> MetaResult<Self> {
        let mut managed_sources = HashMap::new();
        {
            let sources = metadata_manager.list_sources().await?;
            for source in sources {
                create_source_worker_async(source, &mut managed_sources, metrics.clone())?
            }
        }

        let source_fragments = metadata_manager
            .catalog_controller
            .load_source_fragment_ids()
            .await?
            .into_iter()
            .map(|(source_id, fragment_ids)| {
                (
                    source_id as SourceId,
                    fragment_ids.into_iter().map(|id| id as _).collect(),
                )
            })
            .collect();
        let backfill_fragments = metadata_manager
            .catalog_controller
            .load_backfill_fragment_ids()
            .await?
            .into_iter()
            .map(|(source_id, fragment_ids)| {
                (
                    source_id as SourceId,
                    fragment_ids
                        .into_iter()
                        .map(|(id, up_id)| (id as _, up_id as _))
                        .collect(),
                )
            })
            .collect();
        let actor_splits = metadata_manager
            .catalog_controller
            .load_actor_splits()
            .await?
            .into_iter()
            .map(|(actor_id, splits)| {
                (
                    actor_id as ActorId,
                    splits
                        .to_protobuf()
                        .splits
                        .iter()
                        .map(|split| SplitImpl::try_from(split).unwrap())
                        .collect(),
                )
            })
            .collect();

        let core = Mutex::new(SourceManagerCore::new(
            metadata_manager,
            managed_sources,
            source_fragments,
            backfill_fragments,
            actor_splits,
        ));

        Ok(Self {
            barrier_scheduler,
            core,
            paused: Mutex::new(()),
            metrics,
        })
    }

    pub async fn validate_source_once(
        &self,
        source_id: u32,
        new_source_props: WithOptionsSecResolved,
    ) -> MetaResult<()> {
        let props = ConnectorProperties::extract(new_source_props, false).unwrap();

        {
            let mut enumerator = props
                .create_split_enumerator(Arc::new(SourceEnumeratorContext {
                    metrics: self.metrics.source_enumerator_metrics.clone(),
                    info: SourceEnumeratorInfo { source_id },
                }))
                .await
                .context("failed to create SplitEnumerator")?;

            let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
                .await
                .context("failed to list splits")??;
        }
        Ok(())
    }
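    // Usage sketch (hypothetical caller): validate new connector properties before they
    // are applied, e.g. when altering connector parameters, with `new_props` being the
    // resolved option map of the statement:
    //
    //     let props = WithOptionsSecResolved::without_secrets(new_props);
    //     source_manager.validate_source_once(source_id, props).await?;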

    /// For replacing a job (alter table/source, create sink into table).
    #[await_tree::instrument]
    pub async fn handle_replace_job(
        &self,
        dropped_job_fragments: &StreamJobFragments,
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        split_assignment: SplitAssignment,
        replace_plan: &ReplaceStreamJobPlan,
    ) {
        // Extract the fragments that include source operators.
        let dropped_source_fragments = dropped_job_fragments.stream_source_fragments().clone();

        let fragments = &dropped_job_fragments.fragments;

        let dropped_actors = dropped_source_fragments
            .values()
            .flatten()
            .flat_map(|fragment_id| fragments.get(fragment_id).unwrap().actors.iter())
            .map(|actor| actor.actor_id)
            .collect::<HashSet<_>>();

        self.apply_source_change(SourceChange::ReplaceJob {
            dropped_source_fragments,
            dropped_actors,
            added_source_fragments,
            split_assignment,
            fragment_replacements: replace_plan.fragment_replacements(),
        })
        .await;
    }

    /// Updates states after all kinds of source changes,
    /// e.g. a split change (`post_collect` barrier) or scaling (`post_apply_reschedule`).
    #[await_tree::instrument("apply_source_change({source_change})")]
    pub async fn apply_source_change(&self, source_change: SourceChange) {
        let mut core = self.core.lock().await;
        core.apply_source_change(source_change);
    }

    /// Create and register a connector worker for the source.
    #[await_tree::instrument("register_source({})", source.name)]
    pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
        tracing::debug!("register_source: {}", source.get_id());
        let mut core = self.core.lock().await;
        if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) {
            let handle = create_source_worker(source, self.metrics.clone())
                .await
                .context("failed to create source worker")?;
            e.insert(handle);
        } else {
            tracing::warn!("source {} already registered", source.get_id());
        }
        Ok(())
    }

    /// Register an already-created connector worker handle for the source.
    pub async fn register_source_with_handle(
        &self,
        source_id: SourceId,
        handle: ConnectorSourceWorkerHandle,
    ) {
        let mut core = self.core.lock().await;
        if let Entry::Vacant(e) = core.managed_sources.entry(source_id) {
            e.insert(handle);
        } else {
            tracing::warn!("source {} already registered", source_id);
        }
    }

    pub async fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
        let core = self.core.lock().await;
        core.actor_splits.clone()
    }

    pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
        let core = self.core.lock().await;
        SourceManagerRunningInfo {
            source_fragments: core.source_fragments.clone(),
            backfill_fragments: core.backfill_fragments.clone(),
            actor_splits: core.actor_splits.clone(),
        }
    }

    /// Checks whether the external source metadata has changed, and sends a split assignment command
    /// if it has.
    ///
    /// This is also how a newly created `SourceExecutor` is initialized.
    /// (force `tick` in `Self::create_source_worker`)
    ///
    /// The command first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change`
    /// to update states in `SourceManager`.
    async fn tick(&self) -> MetaResult<()> {
        let split_assignment = {
            let core_guard = self.core.lock().await;
            core_guard.reassign_splits().await?
        };

        for (database_id, split_assignment) in split_assignment {
            if !split_assignment.is_empty() {
                let command = Command::SourceChangeSplit(split_assignment);
                tracing::info!(command = ?command, "pushing down split assignment command");
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }

    pub async fn run(&self) -> MetaResult<()> {
        let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;
            let _pause_guard = self.paused.lock().await;
            if let Err(e) = self.tick().await {
                tracing::error!(
                    error = %e.as_report(),
                    "error happened while running source manager tick",
                );
            }
        }
    }

    /// Pause the tick loop in source manager until the returned guard is dropped.
    pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
        tracing::debug!("pausing tick lock in source manager");
        self.paused.lock().await
    }
}
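// Wiring sketch (simplified; the actual meta-node startup differs): construct the manager
// once, then keep the periodic tick loop running in the background.
//
//     let source_manager = Arc::new(
//         SourceManager::new(barrier_scheduler, metadata_manager, metrics).await?,
//     );
//     let mgr = source_manager.clone();
//     tokio::spawn(async move { mgr.run().await });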

#[derive(strum::Display)]
pub enum SourceChange {
    /// `CREATE SOURCE` (shared), or `CREATE MV`.
    /// This is applied after the job is successfully created (`post_collect` barrier).
    CreateJob {
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// `source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`)
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
        split_assignment: SplitAssignment,
    },
    UpdateSourceProps {
        // The new properties to be set for each `source_id`.
        // These props should not affect split assignment or fragments.
        source_id_map_new_props: HashMap<u32, HashMap<String, String>>,
    },
    /// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
    /// This is applied after `wait_streaming_job_finished`.
    /// XXX: Should we merge `CreateJob` into this?
    CreateJobFinished {
        /// `source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`)
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    SplitChange(SplitAssignment),
    /// `DROP SOURCE` or `DROP MV`
    DropSource {
        dropped_source_ids: Vec<SourceId>,
    },
    DropMv {
        // FIXME: we should consider source backfill fragments here for MV on shared source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        dropped_actors: HashSet<ActorId>,
    },
    ReplaceJob {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        dropped_actors: HashSet<ActorId>,

        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        split_assignment: SplitAssignment,
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
    Reschedule {
        split_assignment: SplitAssignment,
        dropped_actors: HashSet<ActorId>,
    },
}
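// Example sketch (hypothetical ids): dropping a standalone source only needs the source ids;
// `apply_source_change` then terminates the corresponding connector workers:
//
//     let change = SourceChange::DropSource { dropped_source_ids: vec![source_id] };
//     source_manager.apply_source_change(change).await;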

pub fn build_actor_connector_splits(
    splits: &HashMap<ActorId, Vec<SplitImpl>>,
) -> HashMap<u32, ConnectorSplits> {
    splits
        .iter()
        .map(|(&actor_id, splits)| {
            (
                actor_id,
                ConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                },
            )
        })
        .collect()
}

pub fn build_actor_split_impls(
    actor_splits: &HashMap<u32, ConnectorSplits>,
) -> HashMap<ActorId, Vec<SplitImpl>> {
    actor_splits
        .iter()
        .map(|(actor_id, ConnectorSplits { splits })| {
            (
                *actor_id,
                splits
                    .iter()
                    .map(|split| SplitImpl::try_from(split).unwrap())
                    .collect(),
            )
        })
        .collect()
}
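
// A minimal sanity check for the conversion helpers above. Constructing concrete
// `SplitImpl` values is connector-specific, so this sketch only covers the empty-map
// case rather than a full round-trip.
#[cfg(test)]
mod conversion_sketch_tests {
    use super::*;

    #[test]
    fn empty_assignment_round_trips() {
        let actor_splits: HashMap<ActorId, Vec<SplitImpl>> = HashMap::new();
        // Converting an empty assignment to protobuf and back yields empty maps.
        let pb = build_actor_connector_splits(&actor_splits);
        assert!(pb.is_empty());
        let restored = build_actor_split_impls(&pb);
        assert!(restored.is_empty());
    }
}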