risingwave_meta/stream/
source_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::panic_if_debug;
28use risingwave_connector::WithOptionsSecResolved;
29use risingwave_connector::error::ConnectorResult;
30use risingwave_connector::source::{
31    ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
32    SplitMetaData, fill_adaptive_split,
33};
34use risingwave_meta_model::SourceId;
35use risingwave_pb::catalog::Source;
36use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
37use thiserror_ext::AsReport;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
39use tokio::sync::{Mutex, MutexGuard, oneshot};
40use tokio::task::JoinHandle;
41use tokio::time::MissedTickBehavior;
42use tokio::{select, time};
43pub use worker::create_source_worker;
44use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
45
46use crate::MetaResult;
47use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
48use crate::manager::MetadataManager;
49use crate::model::{ActorId, FragmentId, StreamJobFragments};
50use crate::rpc::metrics::MetaMetrics;
51
52pub type SourceManagerRef = Arc<SourceManager>;
53pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
54pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;
55
56/// `SourceManager` keeps fetching the latest split metadata from the external source services ([`worker::ConnectorSourceWorker::tick`]),
57/// and sends a split assignment command if split changes detected ([`Self::tick`]).
58pub struct SourceManager {
59    pub paused: Mutex<()>,
60    barrier_scheduler: BarrierScheduler,
61    core: Mutex<SourceManagerCore>,
62    pub metrics: Arc<MetaMetrics>,
63}
64pub struct SourceManagerCore {
65    metadata_manager: MetadataManager,
66
67    /// Managed source loops
68    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
69    /// Fragments associated with each source
70    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
71    /// `source_id` -> `(fragment_id, upstream_fragment_id)`
72    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
73
74    /// Splits assigned per actor,
75    /// incl. both `Source` and `SourceBackfill`.
76    actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
77}
78
79pub struct SourceManagerRunningInfo {
80    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
81    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
82    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
83}
84
85impl SourceManagerCore {
86    fn new(
87        metadata_manager: MetadataManager,
88        managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
89        source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
90        backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
91        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
92    ) -> Self {
93        Self {
94            metadata_manager,
95            managed_sources,
96            source_fragments,
97            backfill_fragments,
98            actor_splits,
99        }
100    }
101
102    /// Updates states after all kinds of source change.
103    pub fn apply_source_change(&mut self, source_change: SourceChange) {
104        let mut added_source_fragments = Default::default();
105        let mut added_backfill_fragments = Default::default();
106        let mut finished_backfill_fragments = Default::default();
107        let mut split_assignment = Default::default();
108        let mut dropped_actors = Default::default();
109        let mut fragment_replacements = Default::default();
110        let mut dropped_source_fragments = Default::default();
111        let mut dropped_source_ids = Default::default();
112
113        match source_change {
114            SourceChange::CreateJob {
115                added_source_fragments: added_source_fragments_,
116                added_backfill_fragments: added_backfill_fragments_,
117                split_assignment: split_assignment_,
118            } => {
119                added_source_fragments = added_source_fragments_;
120                added_backfill_fragments = added_backfill_fragments_;
121                split_assignment = split_assignment_;
122            }
123            SourceChange::CreateJobFinished {
124                finished_backfill_fragments: finished_backfill_fragments_,
125            } => {
126                finished_backfill_fragments = finished_backfill_fragments_;
127            }
128            SourceChange::SplitChange(split_assignment_) => {
129                split_assignment = split_assignment_;
130            }
131            SourceChange::DropMv {
132                dropped_source_fragments: dropped_source_fragments_,
133                dropped_actors: dropped_actors_,
134            } => {
135                dropped_source_fragments = dropped_source_fragments_;
136                dropped_actors = dropped_actors_;
137            }
138            SourceChange::ReplaceJob {
139                dropped_source_fragments: dropped_source_fragments_,
140                dropped_actors: dropped_actors_,
141                added_source_fragments: added_source_fragments_,
142                split_assignment: split_assignment_,
143                fragment_replacements: fragment_replacements_,
144            } => {
145                dropped_source_fragments = dropped_source_fragments_;
146                dropped_actors = dropped_actors_;
147                added_source_fragments = added_source_fragments_;
148                split_assignment = split_assignment_;
149                fragment_replacements = fragment_replacements_;
150            }
151            SourceChange::DropSource {
152                dropped_source_ids: dropped_source_ids_,
153            } => {
154                dropped_source_ids = dropped_source_ids_;
155            }
156            SourceChange::Reschedule {
157                split_assignment: split_assignment_,
158                dropped_actors: dropped_actors_,
159            } => {
160                split_assignment = split_assignment_;
161                dropped_actors = dropped_actors_;
162            }
163        }
164
165        for source_id in dropped_source_ids {
166            let dropped_fragments = self.source_fragments.remove(&source_id);
167
168            if let Some(handle) = self.managed_sources.remove(&source_id) {
169                handle.terminate(dropped_fragments);
170            }
171            if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
172                // TODO: enable this assertion after we implemented cleanup for backfill fragments
173                // debug_assert!(
174                //     fragments.is_empty(),
175                //     "when dropping source, there should be no backfill fragments, got: {:?}",
176                //     fragments
177                // );
178            }
179        }
180
181        for (source_id, fragments) in added_source_fragments {
182            self.source_fragments
183                .entry(source_id)
184                .or_default()
185                .extend(fragments);
186        }
187
188        for (source_id, fragments) in added_backfill_fragments {
189            self.backfill_fragments
190                .entry(source_id)
191                .or_default()
192                .extend(fragments);
193        }
194
195        for (source_id, fragments) in finished_backfill_fragments {
196            let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
197                panic!(
198                    "source {} not found when adding backfill fragments {:?}",
199                    source_id, fragments
200                );
201            });
202            handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
203        }
204
205        for (_, actor_splits) in split_assignment {
206            for (actor_id, splits) in actor_splits {
207                // override previous splits info
208                self.actor_splits.insert(actor_id, splits);
209            }
210        }
211
212        for actor_id in dropped_actors {
213            self.actor_splits.remove(&actor_id);
214        }
215
216        for (source_id, fragment_ids) in dropped_source_fragments {
217            self.drop_source_fragments(Some(source_id), fragment_ids);
218        }
219
220        for (old_fragment_id, new_fragment_id) in fragment_replacements {
221            // TODO: add source_id to the fragment_replacements to avoid iterating all sources
222            self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
223
224            for fragment_ids in self.backfill_fragments.values_mut() {
225                let mut new_backfill_fragment_ids = fragment_ids.clone();
226                for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
227                    assert_ne!(
228                        fragment_id, upstream_fragment_id,
229                        "backfill fragment should not be replaced"
230                    );
231                    if *upstream_fragment_id == old_fragment_id {
232                        new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
233                        new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
234                    }
235                }
236                *fragment_ids = new_backfill_fragment_ids;
237            }
238        }
239    }
240
241    fn drop_source_fragments(
242        &mut self,
243        source_id: Option<SourceId>,
244        dropped_fragment_ids: BTreeSet<FragmentId>,
245    ) {
246        if let Some(source_id) = source_id {
247            if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
248                let mut dropped_ids = vec![];
249                let managed_fragment_ids = entry.get_mut();
250                for fragment_id in &dropped_fragment_ids {
251                    managed_fragment_ids.remove(fragment_id);
252                    dropped_ids.push(*fragment_id);
253                }
254                if let Some(handle) = self.managed_sources.get(&source_id) {
255                    handle.drop_fragments(dropped_ids);
256                } else {
257                    panic_if_debug!(
258                        "source {source_id} not found when dropping fragment {dropped_ids:?}",
259                    );
260                }
261                if managed_fragment_ids.is_empty() {
262                    entry.remove();
263                }
264            }
265        } else {
266            for (source_id, fragment_ids) in &mut self.source_fragments {
267                let mut dropped_ids = vec![];
268                for fragment_id in &dropped_fragment_ids {
269                    if fragment_ids.remove(fragment_id) {
270                        dropped_ids.push(*fragment_id);
271                    }
272                }
273                if !dropped_ids.is_empty() {
274                    if let Some(handle) = self.managed_sources.get(source_id) {
275                        handle.drop_fragments(dropped_ids);
276                    } else {
277                        panic_if_debug!(
278                            "source {source_id} not found when dropping fragment {dropped_ids:?}",
279                        );
280                    }
281                }
282            }
283        }
284    }
285}
286
287impl SourceManager {
288    const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
289
290    pub async fn new(
291        barrier_scheduler: BarrierScheduler,
292        metadata_manager: MetadataManager,
293        metrics: Arc<MetaMetrics>,
294    ) -> MetaResult<Self> {
295        let mut managed_sources = HashMap::new();
296        {
297            let sources = metadata_manager.list_sources().await?;
298            for source in sources {
299                create_source_worker_async(source, &mut managed_sources, metrics.clone())?
300            }
301        }
302
303        let source_fragments = metadata_manager
304            .catalog_controller
305            .load_source_fragment_ids()
306            .await?
307            .into_iter()
308            .map(|(source_id, fragment_ids)| {
309                (
310                    source_id as SourceId,
311                    fragment_ids.into_iter().map(|id| id as _).collect(),
312                )
313            })
314            .collect();
315        let backfill_fragments = metadata_manager
316            .catalog_controller
317            .load_backfill_fragment_ids()
318            .await?
319            .into_iter()
320            .map(|(source_id, fragment_ids)| {
321                (
322                    source_id as SourceId,
323                    fragment_ids
324                        .into_iter()
325                        .map(|(id, up_id)| (id as _, up_id as _))
326                        .collect(),
327                )
328            })
329            .collect();
330        let actor_splits = metadata_manager
331            .catalog_controller
332            .load_actor_splits()
333            .await?
334            .into_iter()
335            .map(|(actor_id, splits)| {
336                (
337                    actor_id as ActorId,
338                    splits
339                        .to_protobuf()
340                        .splits
341                        .iter()
342                        .map(|split| SplitImpl::try_from(split).unwrap())
343                        .collect(),
344                )
345            })
346            .collect();
347
348        let core = Mutex::new(SourceManagerCore::new(
349            metadata_manager,
350            managed_sources,
351            source_fragments,
352            backfill_fragments,
353            actor_splits,
354        ));
355
356        Ok(Self {
357            barrier_scheduler,
358            core,
359            paused: Mutex::new(()),
360            metrics,
361        })
362    }
363
364    /// For replacing job (alter table/source, create sink into table).
365    pub async fn handle_replace_job(
366        &self,
367        dropped_job_fragments: &StreamJobFragments,
368        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
369        split_assignment: SplitAssignment,
370        replace_plan: &ReplaceStreamJobPlan,
371    ) {
372        // Extract the fragments that include source operators.
373        let dropped_source_fragments = dropped_job_fragments.stream_source_fragments().clone();
374
375        let fragments = &dropped_job_fragments.fragments;
376
377        let dropped_actors = dropped_source_fragments
378            .values()
379            .flatten()
380            .flat_map(|fragment_id| fragments.get(fragment_id).unwrap().actors.iter())
381            .map(|actor| actor.actor_id)
382            .collect::<HashSet<_>>();
383
384        self.apply_source_change(SourceChange::ReplaceJob {
385            dropped_source_fragments,
386            dropped_actors,
387            added_source_fragments,
388            split_assignment,
389            fragment_replacements: replace_plan.fragment_replacements(),
390        })
391        .await;
392    }
393
394    /// Updates states after all kinds of source change.
395    /// e.g., split change (`post_collect` barrier) or scaling (`post_apply_reschedule`).
396    pub async fn apply_source_change(&self, source_change: SourceChange) {
397        let mut core = self.core.lock().await;
398        core.apply_source_change(source_change);
399    }
400
401    /// create and register connector worker for source.
402    pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
403        tracing::debug!("register_source: {}", source.get_id());
404        let mut core = self.core.lock().await;
405        if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) {
406            let handle = create_source_worker(source, self.metrics.clone())
407                .await
408                .context("failed to create source worker")?;
409            e.insert(handle);
410        } else {
411            tracing::warn!("source {} already registered", source.get_id());
412        }
413        Ok(())
414    }
415
416    /// register connector worker for source.
417    pub async fn register_source_with_handle(
418        &self,
419        source_id: SourceId,
420        handle: ConnectorSourceWorkerHandle,
421    ) {
422        let mut core = self.core.lock().await;
423        if let Entry::Vacant(e) = core.managed_sources.entry(source_id) {
424            e.insert(handle);
425        } else {
426            tracing::warn!("source {} already registered", source_id);
427        }
428    }
429
430    pub async fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
431        let core = self.core.lock().await;
432        core.actor_splits.clone()
433    }
434
435    pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
436        let core = self.core.lock().await;
437        SourceManagerRunningInfo {
438            source_fragments: core.source_fragments.clone(),
439            backfill_fragments: core.backfill_fragments.clone(),
440            actor_splits: core.actor_splits.clone(),
441        }
442    }
443
444    /// Checks whether the external source metadata has changed, and sends a split assignment command
445    /// if it has.
446    ///
447    /// This is also how a newly created `SourceExecutor` is initialized.
448    /// (force `tick` in `Self::create_source_worker`)
449    ///
450    /// The command will first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change`
451    /// to update states in `SourceManager`.
452    async fn tick(&self) -> MetaResult<()> {
453        let split_assignment = {
454            let core_guard = self.core.lock().await;
455            core_guard.reassign_splits().await?
456        };
457
458        for (database_id, split_assignment) in split_assignment {
459            if !split_assignment.is_empty() {
460                let command = Command::SourceChangeSplit(split_assignment);
461                tracing::info!(command = ?command, "pushing down split assignment command");
462                self.barrier_scheduler
463                    .run_command(database_id, command)
464                    .await?;
465            }
466        }
467
468        Ok(())
469    }
470
471    pub async fn run(&self) -> MetaResult<()> {
472        let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
473        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
474        loop {
475            ticker.tick().await;
476            let _pause_guard = self.paused.lock().await;
477            if let Err(e) = self.tick().await {
478                tracing::error!(
479                    error = %e.as_report(),
480                    "error happened while running source manager tick",
481                );
482            }
483        }
484    }
485
486    /// Pause the tick loop in source manager until the returned guard is dropped.
487    pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
488        tracing::debug!("pausing tick lock in source manager");
489        self.paused.lock().await
490    }
491}
492
493pub enum SourceChange {
494    /// `CREATE SOURCE` (shared), or `CREATE MV`.
495    /// This is applied after the job is successfully created (`post_collect` barrier).
496    CreateJob {
497        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
498        /// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
499        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
500        split_assignment: SplitAssignment,
501    },
502    /// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
503    /// This is applied after `wait_streaming_job_finished`.
504    /// XXX: Should we merge `CreateJob` into this?
505    CreateJobFinished {
506        /// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
507        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
508    },
509    SplitChange(SplitAssignment),
510    /// `DROP SOURCE` or `DROP MV`
511    DropSource {
512        dropped_source_ids: Vec<SourceId>,
513    },
514    DropMv {
515        // FIXME: we should consider source backfill fragments here for MV on shared source.
516        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
517        dropped_actors: HashSet<ActorId>,
518    },
519    ReplaceJob {
520        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
521        dropped_actors: HashSet<ActorId>,
522
523        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
524        split_assignment: SplitAssignment,
525        fragment_replacements: HashMap<FragmentId, FragmentId>,
526    },
527    Reschedule {
528        split_assignment: SplitAssignment,
529        dropped_actors: HashSet<ActorId>,
530    },
531}
532
533pub fn build_actor_connector_splits(
534    splits: &HashMap<ActorId, Vec<SplitImpl>>,
535) -> HashMap<u32, ConnectorSplits> {
536    splits
537        .iter()
538        .map(|(&actor_id, splits)| {
539            (
540                actor_id,
541                ConnectorSplits {
542                    splits: splits.iter().map(ConnectorSplit::from).collect(),
543                },
544            )
545        })
546        .collect()
547}
548
549pub fn build_actor_split_impls(
550    actor_splits: &HashMap<u32, ConnectorSplits>,
551) -> HashMap<ActorId, Vec<SplitImpl>> {
552    actor_splits
553        .iter()
554        .map(|(actor_id, ConnectorSplits { splits })| {
555            (
556                *actor_id,
557                splits
558                    .iter()
559                    .map(|split| SplitImpl::try_from(split).unwrap())
560                    .collect(),
561            )
562        })
563        .collect()
564}