risingwave_meta/stream/
source_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::panic_if_debug;
28use risingwave_connector::WithOptionsSecResolved;
29use risingwave_connector::error::ConnectorResult;
30use risingwave_connector::source::{
31    ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
32    SplitMetaData, fill_adaptive_split,
33};
34use risingwave_meta_model::SourceId;
35use risingwave_pb::catalog::Source;
36use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
37use thiserror_ext::AsReport;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
39use tokio::sync::{Mutex, MutexGuard, oneshot};
40use tokio::task::JoinHandle;
41use tokio::time::MissedTickBehavior;
42use tokio::{select, time};
43pub use worker::create_source_worker;
44use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
45
46use crate::MetaResult;
47use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
48use crate::manager::MetadataManager;
49use crate::model::{ActorId, FragmentId, StreamJobFragments};
50use crate::rpc::metrics::MetaMetrics;
51
/// Shared, reference-counted handle to a [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// Splits keyed first by fragment, then by actor within that fragment.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
/// An optional `u32` per actor, keyed first by fragment, then by actor.
/// NOTE(review): presumably the rate limit to apply, with `None` meaning "no limit" — confirm
/// against the callers.
pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;
/// `ALTER CONNECTOR` parameters, specifying the new parameters to be set for each `connector_id`.
pub type ConnectorPropsChange = HashMap<u32, HashMap<String, String>>;
57
/// `SourceManager` keeps fetching the latest split metadata from the external source services ([`worker::ConnectorSourceWorker::tick`]),
/// and sends a split assignment command if split changes detected ([`Self::tick`]).
pub struct SourceManager {
    /// Lock acquired by [`Self::run`] before every tick. Holding it (see [`Self::pause_tick`])
    /// prevents the tick loop from making progress.
    pub paused: Mutex<()>,
    /// Used by [`Self::tick`] to submit `Command::SourceChangeSplit` commands.
    barrier_scheduler: BarrierScheduler,
    /// Mutable bookkeeping state, guarded by an async mutex.
    core: Mutex<SourceManagerCore>,
    /// Metrics handle; a clone is passed to each source worker that gets created.
    pub metrics: Arc<MetaMetrics>,
}
/// Mutable bookkeeping owned by [`SourceManager`] and guarded by its `core` mutex.
pub struct SourceManagerCore {
    /// The metadata manager this core was constructed with.
    metadata_manager: MetadataManager,

    /// Managed source loops
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Fragments associated with each source
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `source_id` -> `(fragment_id, upstream_fragment_id)`
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    /// Splits assigned per actor,
    /// incl. both `Source` and `SourceBackfill`.
    actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
80
/// Cloned snapshot of the fragment and split bookkeeping, as produced by
/// [`SourceManager::get_running_info`].
pub struct SourceManagerRunningInfo {
    /// Fragments associated with each source.
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `source_id` -> `(fragment_id, upstream_fragment_id)`
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    /// Splits assigned per actor.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
86
87impl SourceManagerCore {
88    fn new(
89        metadata_manager: MetadataManager,
90        managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
91        source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
92        backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
93        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
94    ) -> Self {
95        Self {
96            metadata_manager,
97            managed_sources,
98            source_fragments,
99            backfill_fragments,
100            actor_splits,
101        }
102    }
103
104    /// Updates states after all kinds of source change.
105    pub fn apply_source_change(&mut self, source_change: SourceChange) {
106        let mut added_source_fragments = Default::default();
107        let mut added_backfill_fragments = Default::default();
108        let mut finished_backfill_fragments = Default::default();
109        let mut split_assignment = Default::default();
110        let mut dropped_actors = Default::default();
111        let mut fragment_replacements = Default::default();
112        let mut dropped_source_fragments = Default::default();
113        let mut dropped_source_ids = Default::default();
114
115        match source_change {
116            SourceChange::CreateJob {
117                added_source_fragments: added_source_fragments_,
118                added_backfill_fragments: added_backfill_fragments_,
119                split_assignment: split_assignment_,
120            } => {
121                added_source_fragments = added_source_fragments_;
122                added_backfill_fragments = added_backfill_fragments_;
123                split_assignment = split_assignment_;
124            }
125            SourceChange::CreateJobFinished {
126                finished_backfill_fragments: finished_backfill_fragments_,
127            } => {
128                finished_backfill_fragments = finished_backfill_fragments_;
129            }
130            SourceChange::SplitChange(split_assignment_) => {
131                split_assignment = split_assignment_;
132            }
133            SourceChange::DropMv {
134                dropped_source_fragments: dropped_source_fragments_,
135                dropped_actors: dropped_actors_,
136            } => {
137                dropped_source_fragments = dropped_source_fragments_;
138                dropped_actors = dropped_actors_;
139            }
140            SourceChange::ReplaceJob {
141                dropped_source_fragments: dropped_source_fragments_,
142                dropped_actors: dropped_actors_,
143                added_source_fragments: added_source_fragments_,
144                split_assignment: split_assignment_,
145                fragment_replacements: fragment_replacements_,
146            } => {
147                dropped_source_fragments = dropped_source_fragments_;
148                dropped_actors = dropped_actors_;
149                added_source_fragments = added_source_fragments_;
150                split_assignment = split_assignment_;
151                fragment_replacements = fragment_replacements_;
152            }
153            SourceChange::DropSource {
154                dropped_source_ids: dropped_source_ids_,
155            } => {
156                dropped_source_ids = dropped_source_ids_;
157            }
158            SourceChange::Reschedule {
159                split_assignment: split_assignment_,
160                dropped_actors: dropped_actors_,
161            } => {
162                split_assignment = split_assignment_;
163                dropped_actors = dropped_actors_;
164            }
165        }
166
167        for source_id in dropped_source_ids {
168            let dropped_fragments = self.source_fragments.remove(&source_id);
169
170            if let Some(handle) = self.managed_sources.remove(&source_id) {
171                handle.terminate(dropped_fragments);
172            }
173            if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
174                // TODO: enable this assertion after we implemented cleanup for backfill fragments
175                // debug_assert!(
176                //     fragments.is_empty(),
177                //     "when dropping source, there should be no backfill fragments, got: {:?}",
178                //     fragments
179                // );
180            }
181        }
182
183        for (source_id, fragments) in added_source_fragments {
184            self.source_fragments
185                .entry(source_id)
186                .or_default()
187                .extend(fragments);
188        }
189
190        for (source_id, fragments) in added_backfill_fragments {
191            self.backfill_fragments
192                .entry(source_id)
193                .or_default()
194                .extend(fragments);
195        }
196
197        for (source_id, fragments) in finished_backfill_fragments {
198            let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
199                panic!(
200                    "source {} not found when adding backfill fragments {:?}",
201                    source_id, fragments
202                );
203            });
204            handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
205        }
206
207        for (_, actor_splits) in split_assignment {
208            for (actor_id, splits) in actor_splits {
209                // override previous splits info
210                self.actor_splits.insert(actor_id, splits);
211            }
212        }
213
214        for actor_id in dropped_actors {
215            self.actor_splits.remove(&actor_id);
216        }
217
218        for (source_id, fragment_ids) in dropped_source_fragments {
219            self.drop_source_fragments(Some(source_id), fragment_ids);
220        }
221
222        for (old_fragment_id, new_fragment_id) in fragment_replacements {
223            // TODO: add source_id to the fragment_replacements to avoid iterating all sources
224            self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
225
226            for fragment_ids in self.backfill_fragments.values_mut() {
227                let mut new_backfill_fragment_ids = fragment_ids.clone();
228                for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
229                    assert_ne!(
230                        fragment_id, upstream_fragment_id,
231                        "backfill fragment should not be replaced"
232                    );
233                    if *upstream_fragment_id == old_fragment_id {
234                        new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
235                        new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
236                    }
237                }
238                *fragment_ids = new_backfill_fragment_ids;
239            }
240        }
241    }
242
243    fn drop_source_fragments(
244        &mut self,
245        source_id: Option<SourceId>,
246        dropped_fragment_ids: BTreeSet<FragmentId>,
247    ) {
248        if let Some(source_id) = source_id {
249            if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
250                let mut dropped_ids = vec![];
251                let managed_fragment_ids = entry.get_mut();
252                for fragment_id in &dropped_fragment_ids {
253                    managed_fragment_ids.remove(fragment_id);
254                    dropped_ids.push(*fragment_id);
255                }
256                if let Some(handle) = self.managed_sources.get(&source_id) {
257                    handle.drop_fragments(dropped_ids);
258                } else {
259                    panic_if_debug!(
260                        "source {source_id} not found when dropping fragment {dropped_ids:?}",
261                    );
262                }
263                if managed_fragment_ids.is_empty() {
264                    entry.remove();
265                }
266            }
267        } else {
268            for (source_id, fragment_ids) in &mut self.source_fragments {
269                let mut dropped_ids = vec![];
270                for fragment_id in &dropped_fragment_ids {
271                    if fragment_ids.remove(fragment_id) {
272                        dropped_ids.push(*fragment_id);
273                    }
274                }
275                if !dropped_ids.is_empty() {
276                    if let Some(handle) = self.managed_sources.get(source_id) {
277                        handle.drop_fragments(dropped_ids);
278                    } else {
279                        panic_if_debug!(
280                            "source {source_id} not found when dropping fragment {dropped_ids:?}",
281                        );
282                    }
283                }
284            }
285        }
286    }
287}
288
289impl SourceManager {
    /// Period of the ticker that drives [`Self::run`] (10 seconds).
    const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
291
292    pub async fn new(
293        barrier_scheduler: BarrierScheduler,
294        metadata_manager: MetadataManager,
295        metrics: Arc<MetaMetrics>,
296    ) -> MetaResult<Self> {
297        let mut managed_sources = HashMap::new();
298        {
299            let sources = metadata_manager.list_sources().await?;
300            for source in sources {
301                create_source_worker_async(source, &mut managed_sources, metrics.clone())?
302            }
303        }
304
305        let source_fragments = metadata_manager
306            .catalog_controller
307            .load_source_fragment_ids()
308            .await?
309            .into_iter()
310            .map(|(source_id, fragment_ids)| {
311                (
312                    source_id as SourceId,
313                    fragment_ids.into_iter().map(|id| id as _).collect(),
314                )
315            })
316            .collect();
317        let backfill_fragments = metadata_manager
318            .catalog_controller
319            .load_backfill_fragment_ids()
320            .await?
321            .into_iter()
322            .map(|(source_id, fragment_ids)| {
323                (
324                    source_id as SourceId,
325                    fragment_ids
326                        .into_iter()
327                        .map(|(id, up_id)| (id as _, up_id as _))
328                        .collect(),
329                )
330            })
331            .collect();
332        let actor_splits = metadata_manager
333            .catalog_controller
334            .load_actor_splits()
335            .await?
336            .into_iter()
337            .map(|(actor_id, splits)| {
338                (
339                    actor_id as ActorId,
340                    splits
341                        .to_protobuf()
342                        .splits
343                        .iter()
344                        .map(|split| SplitImpl::try_from(split).unwrap())
345                        .collect(),
346                )
347            })
348            .collect();
349
350        let core = Mutex::new(SourceManagerCore::new(
351            metadata_manager,
352            managed_sources,
353            source_fragments,
354            backfill_fragments,
355            actor_splits,
356        ));
357
358        Ok(Self {
359            barrier_scheduler,
360            core,
361            paused: Mutex::new(()),
362            metrics,
363        })
364    }
365
366    /// For replacing job (alter table/source, create sink into table).
367    #[await_tree::instrument]
368    pub async fn handle_replace_job(
369        &self,
370        dropped_job_fragments: &StreamJobFragments,
371        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
372        split_assignment: SplitAssignment,
373        replace_plan: &ReplaceStreamJobPlan,
374    ) {
375        // Extract the fragments that include source operators.
376        let dropped_source_fragments = dropped_job_fragments.stream_source_fragments().clone();
377
378        let fragments = &dropped_job_fragments.fragments;
379
380        let dropped_actors = dropped_source_fragments
381            .values()
382            .flatten()
383            .flat_map(|fragment_id| fragments.get(fragment_id).unwrap().actors.iter())
384            .map(|actor| actor.actor_id)
385            .collect::<HashSet<_>>();
386
387        self.apply_source_change(SourceChange::ReplaceJob {
388            dropped_source_fragments,
389            dropped_actors,
390            added_source_fragments,
391            split_assignment,
392            fragment_replacements: replace_plan.fragment_replacements(),
393        })
394        .await;
395    }
396
397    /// Updates states after all kinds of source change.
398    /// e.g., split change (`post_collect` barrier) or scaling (`post_apply_reschedule`).
399    #[await_tree::instrument("apply_source_change({source_change})")]
400    pub async fn apply_source_change(&self, source_change: SourceChange) {
401        let mut core = self.core.lock().await;
402        core.apply_source_change(source_change);
403    }
404
405    /// create and register connector worker for source.
406    #[await_tree::instrument("register_source({})", source.name)]
407    pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
408        tracing::debug!("register_source: {}", source.get_id());
409        let mut core = self.core.lock().await;
410        if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) {
411            let handle = create_source_worker(source, self.metrics.clone())
412                .await
413                .context("failed to create source worker")?;
414            e.insert(handle);
415        } else {
416            tracing::warn!("source {} already registered", source.get_id());
417        }
418        Ok(())
419    }
420
421    /// register connector worker for source.
422    pub async fn register_source_with_handle(
423        &self,
424        source_id: SourceId,
425        handle: ConnectorSourceWorkerHandle,
426    ) {
427        let mut core = self.core.lock().await;
428        if let Entry::Vacant(e) = core.managed_sources.entry(source_id) {
429            e.insert(handle);
430        } else {
431            tracing::warn!("source {} already registered", source_id);
432        }
433    }
434
435    pub async fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
436        let core = self.core.lock().await;
437        core.actor_splits.clone()
438    }
439
440    pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
441        let core = self.core.lock().await;
442        SourceManagerRunningInfo {
443            source_fragments: core.source_fragments.clone(),
444            backfill_fragments: core.backfill_fragments.clone(),
445            actor_splits: core.actor_splits.clone(),
446        }
447    }
448
449    /// Checks whether the external source metadata has changed, and sends a split assignment command
450    /// if it has.
451    ///
452    /// This is also how a newly created `SourceExecutor` is initialized.
453    /// (force `tick` in `Self::create_source_worker`)
454    ///
455    /// The command will first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change`
456    /// to update states in `SourceManager`.
457    async fn tick(&self) -> MetaResult<()> {
458        let split_assignment = {
459            let core_guard = self.core.lock().await;
460            core_guard.reassign_splits().await?
461        };
462
463        for (database_id, split_assignment) in split_assignment {
464            if !split_assignment.is_empty() {
465                let command = Command::SourceChangeSplit(split_assignment);
466                tracing::info!(command = ?command, "pushing down split assignment command");
467                self.barrier_scheduler
468                    .run_command(database_id, command)
469                    .await?;
470            }
471        }
472
473        Ok(())
474    }
475
476    pub async fn run(&self) -> MetaResult<()> {
477        let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
478        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
479        loop {
480            ticker.tick().await;
481            let _pause_guard = self.paused.lock().await;
482            if let Err(e) = self.tick().await {
483                tracing::error!(
484                    error = %e.as_report(),
485                    "error happened while running source manager tick",
486                );
487            }
488        }
489    }
490
    /// Pause the tick loop in source manager until the returned guard is dropped.
    ///
    /// This acquires the same `paused` lock that [`Self::run`] takes before each tick, so the
    /// call itself waits for a tick that is currently in progress to finish.
    pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
        tracing::debug!("pausing tick lock in source manager");
        self.paused.lock().await
    }
496}
497
/// A change to be applied to the source manager's bookkeeping via
/// `SourceManager::apply_source_change`.
#[derive(strum::Display)]
pub enum SourceChange {
    /// `CREATE SOURCE` (shared), or `CREATE MV`.
    /// This is applied after the job is successfully created (`post_collect` barrier).
    CreateJob {
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// `source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`)
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
        split_assignment: SplitAssignment,
    },
    /// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
    /// This is applied after `wait_streaming_job_finished`.
    /// XXX: Should we merge `CreateJob` into this?
    CreateJobFinished {
        /// `source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`)
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// A new split assignment; overrides the splits recorded for the listed actors.
    SplitChange(SplitAssignment),
    /// `DROP SOURCE`: the listed sources are removed as a whole (their workers are terminated
    /// and their tracked fragments forgotten).
    DropSource {
        dropped_source_ids: Vec<SourceId>,
    },
    /// `DROP MV`: individual source fragments and actors are removed.
    DropMv {
        // FIXME: we should consider source backfill fragments here for MV on shared source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        dropped_actors: HashSet<ActorId>,
    },
    /// Replacing a job: drops the old source fragments and actors, adds the new fragments and
    /// splits, and re-points tracked fragments according to `fragment_replacements`.
    ReplaceJob {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        dropped_actors: HashSet<ActorId>,

        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        split_assignment: SplitAssignment,
        /// old fragment id -> new fragment id
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
    /// Rescheduling: records the new split assignment and forgets the dropped actors.
    Reschedule {
        split_assignment: SplitAssignment,
        dropped_actors: HashSet<ActorId>,
    },
}
538
539pub fn build_actor_connector_splits(
540    splits: &HashMap<ActorId, Vec<SplitImpl>>,
541) -> HashMap<u32, ConnectorSplits> {
542    splits
543        .iter()
544        .map(|(&actor_id, splits)| {
545            (
546                actor_id,
547                ConnectorSplits {
548                    splits: splits.iter().map(ConnectorSplit::from).collect(),
549                },
550            )
551        })
552        .collect()
553}
554
555pub fn build_actor_split_impls(
556    actor_splits: &HashMap<u32, ConnectorSplits>,
557) -> HashMap<ActorId, Vec<SplitImpl>> {
558    actor_splits
559        .iter()
560        .map(|(actor_id, ConnectorSplits { splits })| {
561            (
562                *actor_id,
563                splits
564                    .iter()
565                    .map(|split| SplitImpl::try_from(split).unwrap())
566                    .collect(),
567            )
568        })
569        .collect()
570}