risingwave_meta/stream/
source_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use itertools::Itertools;
26use risingwave_common::catalog::DatabaseId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_common::panic_if_debug;
29use risingwave_connector::WithOptionsSecResolved;
30use risingwave_connector::error::ConnectorResult;
31use risingwave_connector::source::{
32    ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
33    SplitMetaData, fill_adaptive_split,
34};
35use risingwave_meta_model::SourceId;
36use risingwave_pb::catalog::Source;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38pub use split_assignment::SplitState;
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{Mutex, MutexGuard, oneshot};
42use tokio::task::JoinHandle;
43use tokio::time::MissedTickBehavior;
44use tokio::{select, time};
45pub use worker::create_source_worker;
46use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
47
48use crate::MetaResult;
49use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
50use crate::manager::MetadataManager;
51use crate::model::{ActorId, FragmentId, StreamJobFragments};
52use crate::rpc::metrics::MetaMetrics;
53
/// Shared, reference-counted handle to a [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// `fragment_id` -> `actor_id` -> splits assigned to that actor.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
/// `source_id` -> splits of that source.
pub type DiscoveredSourceSplits = HashMap<SourceId, Vec<SplitImpl>>;
/// `fragment_id` -> `actor_id` -> optional `u32` setting.
/// NOTE(review): presumably a rate limit where `None` disables throttling — confirm against the users of this alias.
pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;
// ALTER CONNECTOR parameters, specifying the new parameters to be set for each job_id (source_id/sink_id)
pub type ConnectorPropsChange = HashMap<u32, HashMap<String, String>>;

/// Upper bound on how long the `list_splits` call in [`SourceManager::validate_source_once`] may take.
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
62
/// `SourceManager` keeps fetching the latest split metadata from the external source services ([`worker::ConnectorSourceWorker::tick`]),
/// and sends a split assignment command if split changes detected ([`Self::tick`]).
pub struct SourceManager {
    /// Gate for the periodic tick: [`Self::run`] acquires it before every tick, and
    /// [`Self::pause_tick`] hands its guard to callers that need the loop to stand still.
    pub paused: Mutex<()>,
    /// Used by [`Self::tick`] to submit `Command::SourceChangeSplit` commands.
    barrier_scheduler: BarrierScheduler,
    /// In-memory state of the manager, guarded by an async mutex.
    core: Mutex<SourceManagerCore>,
    /// Meta metrics; handed to newly created source workers and split enumerators.
    pub metrics: Arc<MetaMetrics>,
}
/// State kept behind the `core` mutex of [`SourceManager`].
pub struct SourceManagerCore {
    /// Used to persist the splits discovered for a source (see `update_source_splits`).
    metadata_manager: MetadataManager,

    /// Managed source loops
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Fragments associated with each source
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `source_id` -> `(fragment_id, upstream_fragment_id)`
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    /// Splits assigned per actor,
    /// incl. both `Source` and `SourceBackfill`.
    actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
85
/// Copy of the bookkeeping held by the source manager, as returned by
/// [`SourceManager::get_running_info`].
pub struct SourceManagerRunningInfo {
    /// `source_id` -> fragments associated with that source.
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `source_id` -> `(fragment_id, upstream_fragment_id)`.
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    /// `actor_id` -> splits currently recorded for that actor.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
91
92impl SourceManagerCore {
93    fn new(
94        metadata_manager: MetadataManager,
95        managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
96        source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
97        backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
98        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
99    ) -> Self {
100        Self {
101            metadata_manager,
102            managed_sources,
103            source_fragments,
104            backfill_fragments,
105            actor_splits,
106        }
107    }
108
109    /// Updates states after all kinds of source change.
110    pub fn apply_source_change(&mut self, source_change: SourceChange) {
111        let mut added_source_fragments = Default::default();
112        let mut added_backfill_fragments = Default::default();
113        let mut finished_backfill_fragments = Default::default();
114        let mut split_assignment = Default::default();
115        let mut dropped_actors = Default::default();
116        let mut fragment_replacements = Default::default();
117        let mut dropped_source_fragments = Default::default();
118        let mut dropped_source_ids = Default::default();
119        let mut recreate_source_id_map_new_props: Vec<(u32, HashMap<String, String>)> =
120            Default::default();
121
122        match source_change {
123            SourceChange::CreateJob {
124                added_source_fragments: added_source_fragments_,
125                added_backfill_fragments: added_backfill_fragments_,
126                split_assignment: split_assignment_,
127            } => {
128                added_source_fragments = added_source_fragments_;
129                added_backfill_fragments = added_backfill_fragments_;
130                split_assignment = split_assignment_;
131            }
132            SourceChange::CreateJobFinished {
133                finished_backfill_fragments: finished_backfill_fragments_,
134            } => {
135                finished_backfill_fragments = finished_backfill_fragments_;
136            }
137            SourceChange::SplitChange(split_assignment_) => {
138                split_assignment = split_assignment_;
139            }
140            SourceChange::DropMv {
141                dropped_source_fragments: dropped_source_fragments_,
142                dropped_actors: dropped_actors_,
143            } => {
144                dropped_source_fragments = dropped_source_fragments_;
145                dropped_actors = dropped_actors_;
146            }
147            SourceChange::ReplaceJob {
148                dropped_source_fragments: dropped_source_fragments_,
149                dropped_actors: dropped_actors_,
150                added_source_fragments: added_source_fragments_,
151                split_assignment: split_assignment_,
152                fragment_replacements: fragment_replacements_,
153            } => {
154                dropped_source_fragments = dropped_source_fragments_;
155                dropped_actors = dropped_actors_;
156                added_source_fragments = added_source_fragments_;
157                split_assignment = split_assignment_;
158                fragment_replacements = fragment_replacements_;
159            }
160            SourceChange::DropSource {
161                dropped_source_ids: dropped_source_ids_,
162            } => {
163                dropped_source_ids = dropped_source_ids_;
164            }
165            SourceChange::Reschedule {
166                split_assignment: split_assignment_,
167                dropped_actors: dropped_actors_,
168            } => {
169                split_assignment = split_assignment_;
170                dropped_actors = dropped_actors_;
171            }
172            SourceChange::UpdateSourceProps {
173                source_id_map_new_props,
174            } => {
175                for (source_id, new_props) in source_id_map_new_props {
176                    recreate_source_id_map_new_props.push((source_id, new_props));
177                }
178            }
179        }
180
181        for source_id in dropped_source_ids {
182            let dropped_fragments = self.source_fragments.remove(&source_id);
183
184            if let Some(handle) = self.managed_sources.remove(&source_id) {
185                handle.terminate(dropped_fragments);
186            }
187            if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
188                // TODO: enable this assertion after we implemented cleanup for backfill fragments
189                // debug_assert!(
190                //     fragments.is_empty(),
191                //     "when dropping source, there should be no backfill fragments, got: {:?}",
192                //     fragments
193                // );
194            }
195        }
196
197        for (source_id, fragments) in added_source_fragments {
198            self.source_fragments
199                .entry(source_id)
200                .or_default()
201                .extend(fragments);
202        }
203
204        for (source_id, fragments) in added_backfill_fragments {
205            self.backfill_fragments
206                .entry(source_id)
207                .or_default()
208                .extend(fragments);
209        }
210
211        for (source_id, fragments) in finished_backfill_fragments {
212            let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
213                panic!(
214                    "source {} not found when adding backfill fragments {:?}",
215                    source_id, fragments
216                );
217            });
218            handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
219        }
220
221        for (_, actor_splits) in split_assignment {
222            for (actor_id, splits) in actor_splits {
223                // override previous splits info
224                self.actor_splits.insert(actor_id, splits);
225            }
226        }
227
228        for actor_id in dropped_actors {
229            self.actor_splits.remove(&actor_id);
230        }
231
232        for (source_id, fragment_ids) in dropped_source_fragments {
233            self.drop_source_fragments(Some(source_id), fragment_ids);
234        }
235
236        for (old_fragment_id, new_fragment_id) in fragment_replacements {
237            // TODO: add source_id to the fragment_replacements to avoid iterating all sources
238            self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
239
240            for fragment_ids in self.backfill_fragments.values_mut() {
241                let mut new_backfill_fragment_ids = fragment_ids.clone();
242                for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
243                    assert_ne!(
244                        fragment_id, upstream_fragment_id,
245                        "backfill fragment should not be replaced"
246                    );
247                    if *upstream_fragment_id == old_fragment_id {
248                        new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
249                        new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
250                    }
251                }
252                *fragment_ids = new_backfill_fragment_ids;
253            }
254        }
255
256        for (source_id, new_props) in recreate_source_id_map_new_props {
257            tracing::info!("recreate source {source_id} in source manager");
258            if let Some(handle) = self.managed_sources.get_mut(&(source_id as _)) {
259                // the update here should not involve fragments change and split change
260                // Or we need to drop and recreate the source worker instead of updating inplace
261                let props_wrapper =
262                    WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
263                let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); // already checked when sending barrier
264                handle.update_props(props);
265            }
266        }
267    }
268
269    fn drop_source_fragments(
270        &mut self,
271        source_id: Option<SourceId>,
272        dropped_fragment_ids: BTreeSet<FragmentId>,
273    ) {
274        if let Some(source_id) = source_id {
275            if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
276                let mut dropped_ids = vec![];
277                let managed_fragment_ids = entry.get_mut();
278                for fragment_id in &dropped_fragment_ids {
279                    managed_fragment_ids.remove(fragment_id);
280                    dropped_ids.push(*fragment_id);
281                }
282                if let Some(handle) = self.managed_sources.get(&source_id) {
283                    handle.drop_fragments(dropped_ids);
284                } else {
285                    panic_if_debug!(
286                        "source {source_id} not found when dropping fragment {dropped_ids:?}",
287                    );
288                }
289                if managed_fragment_ids.is_empty() {
290                    entry.remove();
291                }
292            }
293        } else {
294            for (source_id, fragment_ids) in &mut self.source_fragments {
295                let mut dropped_ids = vec![];
296                for fragment_id in &dropped_fragment_ids {
297                    if fragment_ids.remove(fragment_id) {
298                        dropped_ids.push(*fragment_id);
299                    }
300                }
301                if !dropped_ids.is_empty() {
302                    if let Some(handle) = self.managed_sources.get(source_id) {
303                        handle.drop_fragments(dropped_ids);
304                    } else {
305                        panic_if_debug!(
306                            "source {source_id} not found when dropping fragment {dropped_ids:?}",
307                        );
308                    }
309                }
310            }
311        }
312    }
313
314    async fn update_source_splits(&self, source_id: SourceId) -> MetaResult<()> {
315        let handle_ref = self.managed_sources.get(&source_id).unwrap();
316
317        let discovered_splits = handle_ref.splits.lock().await.splits.clone();
318
319        if let Some(splits) = discovered_splits {
320            let source_splits =
321                HashMap::from([(source_id as _, splits.into_values().collect_vec())]);
322            self.metadata_manager
323                .update_source_splits(&source_splits)
324                .await?;
325        }
326        Ok(())
327    }
328}
329
330impl SourceManager {
    /// Period of the ticker driving the loop in [`Self::run`].
    const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
332
333    pub async fn new(
334        barrier_scheduler: BarrierScheduler,
335        metadata_manager: MetadataManager,
336        metrics: Arc<MetaMetrics>,
337    ) -> MetaResult<Self> {
338        let mut managed_sources = HashMap::new();
339        {
340            let sources = metadata_manager.list_sources().await?;
341            for source in sources {
342                create_source_worker_async(source, &mut managed_sources, metrics.clone())?
343            }
344        }
345
346        let source_fragments = metadata_manager
347            .catalog_controller
348            .load_source_fragment_ids()
349            .await?
350            .into_iter()
351            .map(|(source_id, fragment_ids)| {
352                (
353                    source_id as SourceId,
354                    fragment_ids.into_iter().map(|id| id as _).collect(),
355                )
356            })
357            .collect();
358        let backfill_fragments = metadata_manager
359            .catalog_controller
360            .load_backfill_fragment_ids()
361            .await?
362            .into_iter()
363            .map(|(source_id, fragment_ids)| {
364                (
365                    source_id as SourceId,
366                    fragment_ids
367                        .into_iter()
368                        .map(|(id, up_id)| (id as _, up_id as _))
369                        .collect(),
370                )
371            })
372            .collect();
373        let actor_splits = metadata_manager
374            .catalog_controller
375            .load_actor_splits()
376            .await?
377            .into_iter()
378            .map(|(actor_id, splits)| {
379                (
380                    actor_id as ActorId,
381                    splits
382                        .to_protobuf()
383                        .splits
384                        .iter()
385                        .map(|split| SplitImpl::try_from(split).unwrap())
386                        .collect(),
387                )
388            })
389            .collect();
390
391        let core = Mutex::new(SourceManagerCore::new(
392            metadata_manager,
393            managed_sources,
394            source_fragments,
395            backfill_fragments,
396            actor_splits,
397        ));
398
399        Ok(Self {
400            barrier_scheduler,
401            core,
402            paused: Mutex::new(()),
403            metrics,
404        })
405    }
406
407    pub async fn validate_source_once(
408        &self,
409        source_id: u32,
410        new_source_props: WithOptionsSecResolved,
411    ) -> MetaResult<()> {
412        let props = ConnectorProperties::extract(new_source_props, false).unwrap();
413
414        {
415            let mut enumerator = props
416                .create_split_enumerator(Arc::new(SourceEnumeratorContext {
417                    metrics: self.metrics.source_enumerator_metrics.clone(),
418                    info: SourceEnumeratorInfo { source_id },
419                }))
420                .await
421                .context("failed to create SplitEnumerator")?;
422
423            let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
424                .await
425                .context("failed to list splits")??;
426        }
427        Ok(())
428    }
429
430    /// For replacing job (alter table/source, create sink into table).
431    #[await_tree::instrument]
432    pub async fn handle_replace_job(
433        &self,
434        dropped_job_fragments: &StreamJobFragments,
435        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
436        split_assignment: SplitAssignment,
437        replace_plan: &ReplaceStreamJobPlan,
438    ) {
439        // Extract the fragments that include source operators.
440        let dropped_source_fragments = dropped_job_fragments.stream_source_fragments().clone();
441
442        let fragments = &dropped_job_fragments.fragments;
443
444        let dropped_actors = dropped_source_fragments
445            .values()
446            .flatten()
447            .flat_map(|fragment_id| fragments.get(fragment_id).unwrap().actors.iter())
448            .map(|actor| actor.actor_id)
449            .collect::<HashSet<_>>();
450
451        self.apply_source_change(SourceChange::ReplaceJob {
452            dropped_source_fragments,
453            dropped_actors,
454            added_source_fragments,
455            split_assignment,
456            fragment_replacements: replace_plan.fragment_replacements(),
457        })
458        .await;
459    }
460
461    /// Updates states after all kinds of source change.
462    /// e.g., split change (`post_collect` barrier) or scaling (`post_apply_reschedule`).
463    #[await_tree::instrument("apply_source_change({source_change})")]
464    pub async fn apply_source_change(&self, source_change: SourceChange) {
465        let mut core = self.core.lock().await;
466        core.apply_source_change(source_change);
467    }
468
469    /// create and register connector worker for source.
470    #[await_tree::instrument("register_source({})", source.name)]
471    pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
472        tracing::debug!("register_source: {}", source.get_id());
473        let mut core = self.core.lock().await;
474        let source_id = source.get_id() as _;
475        if core.managed_sources.contains_key(&source_id) {
476            tracing::warn!("source {} already registered", source_id);
477            return Ok(());
478        }
479
480        let handle = create_source_worker(source, self.metrics.clone())
481            .await
482            .context("failed to create source worker")?;
483        core.managed_sources.insert(source_id, handle);
484        core.update_source_splits(source_id).await?;
485        Ok(())
486    }
487
488    /// register connector worker for source.
489    pub async fn register_source_with_handle(
490        &self,
491        source_id: SourceId,
492        handle: ConnectorSourceWorkerHandle,
493    ) -> MetaResult<()> {
494        let mut core = self.core.lock().await;
495        if core.managed_sources.contains_key(&source_id) {
496            tracing::warn!("source {} already registered", source_id);
497            return Ok(());
498        }
499        core.managed_sources.insert(source_id, handle);
500        core.update_source_splits(source_id).await?;
501
502        Ok(())
503    }
504
505    pub async fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
506        let core = self.core.lock().await;
507        core.actor_splits.clone()
508    }
509
510    pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
511        let core = self.core.lock().await;
512        SourceManagerRunningInfo {
513            source_fragments: core.source_fragments.clone(),
514            backfill_fragments: core.backfill_fragments.clone(),
515            actor_splits: core.actor_splits.clone(),
516        }
517    }
518
519    /// Checks whether the external source metadata has changed, and sends a split assignment command
520    /// if it has.
521    ///
522    /// This is also how a newly created `SourceExecutor` is initialized.
523    /// (force `tick` in `Self::create_source_worker`)
524    ///
525    /// The command will first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change`
526    /// to update states in `SourceManager`.
527    async fn tick(&self) -> MetaResult<()> {
528        let split_states = {
529            let core_guard = self.core.lock().await;
530            core_guard.reassign_splits().await?
531        };
532
533        for (database_id, split_state) in split_states {
534            if !split_state.split_assignment.is_empty() {
535                let command = Command::SourceChangeSplit(split_state);
536                tracing::info!(command = ?command, "pushing down split assignment command");
537                self.barrier_scheduler
538                    .run_command(database_id, command)
539                    .await?;
540            }
541        }
542
543        Ok(())
544    }
545
546    pub async fn run(&self) -> MetaResult<()> {
547        let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
548        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
549        loop {
550            ticker.tick().await;
551            let _pause_guard = self.paused.lock().await;
552            if let Err(e) = self.tick().await {
553                tracing::error!(
554                    error = %e.as_report(),
555                    "error happened while running source manager tick",
556                );
557            }
558        }
559    }
560
561    /// Pause the tick loop in source manager until the returned guard is dropped.
562    pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
563        tracing::debug!("pausing tick lock in source manager");
564        self.paused.lock().await
565    }
566}
567
/// A change to the state tracked by the source manager, consumed by
/// `SourceManagerCore::apply_source_change`.
///
/// The derived `Display` is used in the `await_tree` span of
/// `SourceManager::apply_source_change`.
#[derive(strum::Display)]
pub enum SourceChange {
    /// `CREATE SOURCE` (shared), or `CREATE MV`.
    /// This is applied after the job is successfully created (`post_collect` barrier).
    CreateJob {
        /// `source_id` -> source fragments to start tracking.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
        /// Splits to record for the actors of the new job.
        split_assignment: SplitAssignment,
    },
    /// Connector properties of existing sources are updated in place on their workers.
    UpdateSourceProps {
        // the new properties to be set for each source_id
        // and the props should not affect split assignment and fragments
        source_id_map_new_props: HashMap<u32, HashMap<String, String>>,
    },
    /// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
    /// This is applied after `wait_streaming_job_finished`.
    /// XXX: Should we merge `CreateJob` into this?
    CreateJobFinished {
        /// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// New split assignment; overrides the splits recorded for the listed actors.
    SplitChange(SplitAssignment),
    /// `DROP SOURCE`: the listed sources are forgotten and their workers terminated.
    /// (`DROP MV` is carried by [`Self::DropMv`].)
    DropSource {
        dropped_source_ids: Vec<SourceId>,
    },
    /// `DROP MV`: the listed source fragments and actors are no longer tracked.
    DropMv {
        // FIXME: we should consider source backfill fragments here for MV on shared source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        dropped_actors: HashSet<ActorId>,
    },
    /// A job is replaced (see `SourceManager::handle_replace_job`): old fragments and actors
    /// are dropped, new ones are added, and replaced fragments are re-pointed.
    ReplaceJob {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        dropped_actors: HashSet<ActorId>,

        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        split_assignment: SplitAssignment,
        /// old fragment id -> new fragment id.
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
    /// Splits are re-recorded for the assigned actors and removed for the dropped ones.
    Reschedule {
        split_assignment: SplitAssignment,
        dropped_actors: HashSet<ActorId>,
    },
}
613
614pub fn build_actor_connector_splits(
615    splits: &HashMap<ActorId, Vec<SplitImpl>>,
616) -> HashMap<u32, ConnectorSplits> {
617    splits
618        .iter()
619        .map(|(&actor_id, splits)| {
620            (
621                actor_id,
622                ConnectorSplits {
623                    splits: splits.iter().map(ConnectorSplit::from).collect(),
624                },
625            )
626        })
627        .collect()
628}
629
630pub fn build_actor_split_impls(
631    actor_splits: &HashMap<u32, ConnectorSplits>,
632) -> HashMap<ActorId, Vec<SplitImpl>> {
633    actor_splits
634        .iter()
635        .map(|(actor_id, ConnectorSplits { splits })| {
636            (
637                *actor_id,
638                splits
639                    .iter()
640                    .map(|split| SplitImpl::try_from(split).unwrap())
641                    .collect(),
642            )
643        })
644        .collect()
645}