1mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::id::ObjectId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_common::panic_if_debug;
29use risingwave_connector::WithOptionsSecResolved;
30use risingwave_connector::error::ConnectorResult;
31use risingwave_connector::source::{
32 AnySplitEnumerator, ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo,
33 SplitId, SplitImpl, SplitMetaData,
34};
35use risingwave_meta_model::SourceId;
36use risingwave_pb::catalog::Source;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38pub use split_assignment::{SplitDiffOptions, SplitState, align_splits, reassign_splits};
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{Mutex, MutexGuard, oneshot};
42use tokio::task::JoinHandle;
43use tokio::time::MissedTickBehavior;
44use tokio::{select, time};
45pub use worker::create_source_worker;
46use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
47
48use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
49use crate::manager::{MetaSrvEnv, MetadataManager};
50use crate::model::{ActorId, FragmentId, StreamJobFragments};
51use crate::rpc::metrics::MetaMetrics;
52use crate::{MetaError, MetaResult};
53
/// Shared, reference-counted handle to the [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// Splits grouped first by fragment, then by actor within that fragment.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;

/// Discovered splits for each source, keyed by source id.
pub type SourceSplitAssignment = HashMap<SourceId, DiscoveredSplits>;
64
/// Split-discovery result for a single source (the value type of [`SourceSplitAssignment`]).
#[derive(Debug, Clone)]
pub enum DiscoveredSplits {
    /// A concrete set of splits. The key type (`Arc<str>`) matches `SplitId`, so it is
    /// presumably the split id — TODO confirm.
    Fixed(BTreeMap<Arc<str>, SplitImpl>),
    /// A single split value. NOTE(review): by the name, this is for adaptive sources
    /// whose per-actor splits are derived from this one — confirm against `split_assignment`.
    Adaptive(SplitImpl),
}
77
/// How the splits of a replaced streaming job's source fragments are decided.
#[derive(Debug, Clone)]
pub enum ReplaceJobSplitPlan {
    /// Use the given per-source discovered splits.
    Discovered(SourceSplitAssignment),
    /// No splits are supplied. Presumably the new fragments reuse or align with the
    /// previous fragments' assignment (cf. `align_splits`) — TODO confirm.
    AlignFromPrevious,
}
96
/// New connector properties, as raw string key/value pairs, for each affected object id.
pub type ConnectorPropsChange = HashMap<ObjectId, HashMap<String, String>>;
99
/// Upper bound on a single enumerator call. In this file it bounds `list_splits` in
/// `validate_enumerator_once`. Child modules (e.g. `worker`) can also reach this
/// private const — NOTE(review): check whether they use it.
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
101
/// Meta-side manager for connector sources.
///
/// It keeps a worker handle per source plus the source/backfill fragment bookkeeping
/// (in [`SourceManagerCore`]). It periodically pushes split-assignment changes to the
/// cluster as barrier commands.
pub struct SourceManager {
    /// Locked by [`SourceManager::run`] for the duration of every tick. Holding the guard
    /// returned by [`SourceManager::pause_tick`] therefore prevents ticks from running.
    pub paused: Mutex<()>,
    /// Used to run `Command::SourceChangeSplit` commands produced by a tick.
    barrier_scheduler: BarrierScheduler,
    /// All mutable bookkeeping, guarded by an async mutex.
    core: Mutex<SourceManagerCore>,
    pub metrics: Arc<MetaMetrics>,
}
/// Mutable state of the [`SourceManager`], accessed through its `core` mutex.
pub struct SourceManagerCore {
    metadata_manager: MetadataManager,

    /// Worker handle of every registered source.
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Source fragments of each source.
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// Backfill fragments of each source, stored as `(fragment_id, upstream_fragment_id)`.
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    env: MetaSrvEnv,
}
122
/// Point-in-time copy of the fragment bookkeeping, returned by
/// [`SourceManager::get_running_info`].
pub struct SourceManagerRunningInfo {
    /// Source fragments of each source.
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `(fragment_id, upstream_fragment_id)` backfill pairs of each source.
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
}
127
128impl SourceManagerCore {
129 fn new(
130 metadata_manager: MetadataManager,
131 managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
132 source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
133 backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
134 env: MetaSrvEnv,
135 ) -> Self {
136 Self {
137 metadata_manager,
138 managed_sources,
139 source_fragments,
140 backfill_fragments,
141 env,
142 }
143 }
144
145 pub fn apply_source_change(&mut self, source_change: SourceChange) {
147 let mut added_source_fragments = Default::default();
148 let mut added_backfill_fragments = Default::default();
149 let mut finished_backfill_fragments = Default::default();
150 let mut fragment_replacements = Default::default();
151 let mut dropped_source_fragments = Default::default();
152 let mut dropped_source_ids = Default::default();
153 let mut recreate_source_id_map_new_props: Vec<(SourceId, HashMap<String, String>)> =
154 Default::default();
155
156 match source_change {
157 SourceChange::CreateJob {
158 added_source_fragments: added_source_fragments_,
159 added_backfill_fragments: added_backfill_fragments_,
160 } => {
161 added_source_fragments = added_source_fragments_;
162 added_backfill_fragments = added_backfill_fragments_;
163 }
164 SourceChange::CreateJobFinished {
165 finished_backfill_fragments: finished_backfill_fragments_,
166 } => {
167 finished_backfill_fragments = finished_backfill_fragments_;
168 }
169
170 SourceChange::DropMv {
171 dropped_source_fragments: dropped_source_fragments_,
172 } => {
173 dropped_source_fragments = dropped_source_fragments_;
174 }
175 SourceChange::ReplaceJob {
176 dropped_source_fragments: dropped_source_fragments_,
177 added_source_fragments: added_source_fragments_,
178 fragment_replacements: fragment_replacements_,
179 } => {
180 dropped_source_fragments = dropped_source_fragments_;
181 added_source_fragments = added_source_fragments_;
182 fragment_replacements = fragment_replacements_;
183 }
184 SourceChange::DropSource {
185 dropped_source_ids: dropped_source_ids_,
186 } => {
187 dropped_source_ids = dropped_source_ids_;
188 }
189
190 SourceChange::UpdateSourceProps {
191 source_id_map_new_props,
192 } => {
193 for (source_id, new_props) in source_id_map_new_props {
194 recreate_source_id_map_new_props.push((source_id, new_props));
195 }
196 }
197 }
198
199 for source_id in dropped_source_ids {
200 let dropped_fragments = self.source_fragments.remove(&source_id);
201
202 if let Some(handle) = self.managed_sources.remove(&source_id) {
203 handle.terminate(dropped_fragments);
204 }
205 if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
206 }
213 }
214
215 for (source_id, fragments) in added_source_fragments {
216 self.source_fragments
217 .entry(source_id)
218 .or_default()
219 .extend(fragments);
220 }
221
222 for (source_id, fragments) in added_backfill_fragments {
223 self.backfill_fragments
224 .entry(source_id)
225 .or_default()
226 .extend(fragments);
227 }
228
229 for (source_id, fragments) in finished_backfill_fragments {
230 let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
231 panic!(
232 "source {} not found when adding backfill fragments {:?}",
233 source_id, fragments
234 );
235 });
236 handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
237 }
238
239 for (source_id, fragment_ids) in dropped_source_fragments {
240 self.drop_source_fragments(Some(source_id), fragment_ids);
241 }
242
243 for (old_fragment_id, new_fragment_id) in fragment_replacements {
244 self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
246
247 for fragment_ids in self.backfill_fragments.values_mut() {
248 let mut new_backfill_fragment_ids = fragment_ids.clone();
249 for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
250 assert_ne!(
251 fragment_id, upstream_fragment_id,
252 "backfill fragment should not be replaced"
253 );
254 if *upstream_fragment_id == old_fragment_id {
255 new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
256 new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
257 }
258 }
259 *fragment_ids = new_backfill_fragment_ids;
260 }
261 }
262
263 for (source_id, new_props) in recreate_source_id_map_new_props {
264 if let Some(handle) = self.managed_sources.get_mut(&source_id) {
265 let props_wrapper =
268 WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
269 let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); handle.update_props(props);
271 tracing::info!("update source {source_id} properties in source manager");
272 } else {
273 tracing::info!("job id {source_id} is not registered in source manager");
274 }
275 }
276 }
277
278 fn drop_source_fragments(
279 &mut self,
280 source_id: Option<SourceId>,
281 dropped_fragment_ids: BTreeSet<FragmentId>,
282 ) {
283 if let Some(source_id) = source_id {
284 if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
285 let mut dropped_ids = vec![];
286 let managed_fragment_ids = entry.get_mut();
287 for fragment_id in &dropped_fragment_ids {
288 managed_fragment_ids.remove(fragment_id);
289 dropped_ids.push(*fragment_id);
290 }
291 if let Some(handle) = self.managed_sources.get(&source_id) {
292 handle.drop_fragments(dropped_ids);
293 } else {
294 panic_if_debug!(
295 "source {source_id} not found when dropping fragment {dropped_ids:?}",
296 );
297 }
298 if managed_fragment_ids.is_empty() {
299 entry.remove();
300 }
301 }
302 } else {
303 for (source_id, fragment_ids) in &mut self.source_fragments {
304 let mut dropped_ids = vec![];
305 for fragment_id in &dropped_fragment_ids {
306 if fragment_ids.remove(fragment_id) {
307 dropped_ids.push(*fragment_id);
308 }
309 }
310 if !dropped_ids.is_empty() {
311 if let Some(handle) = self.managed_sources.get(source_id) {
312 handle.drop_fragments(dropped_ids);
313 } else {
314 panic_if_debug!(
315 "source {source_id} not found when dropping fragment {dropped_ids:?}",
316 );
317 }
318 }
319 }
320 }
321 }
322}
323
324impl SourceManager {
325 const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
326
327 pub async fn new(
328 barrier_scheduler: BarrierScheduler,
329 metadata_manager: MetadataManager,
330 metrics: Arc<MetaMetrics>,
331 env: MetaSrvEnv,
332 ) -> MetaResult<Self> {
333 let mut managed_sources = HashMap::new();
334 {
335 let sources = metadata_manager.list_sources().await?;
336 for source in sources {
337 create_source_worker_async(source, &mut managed_sources, metrics.clone())?
338 }
339 }
340
341 let source_fragments = metadata_manager
342 .catalog_controller
343 .load_source_fragment_ids()
344 .await?
345 .into_iter()
346 .map(|(source_id, fragment_ids)| {
347 (
348 source_id as SourceId,
349 fragment_ids.into_iter().map(|id| id as _).collect(),
350 )
351 })
352 .collect();
353 let backfill_fragments = metadata_manager
354 .catalog_controller
355 .load_backfill_fragment_ids()
356 .await?;
357
358 let core = Mutex::new(SourceManagerCore::new(
359 metadata_manager,
360 managed_sources,
361 source_fragments,
362 backfill_fragments,
363 env,
364 ));
365
366 Ok(Self {
367 barrier_scheduler,
368 core,
369 paused: Mutex::new(()),
370 metrics,
371 })
372 }
373
374 pub async fn validate_source_once(
375 &self,
376 source_id: SourceId,
377 new_source_props: WithOptionsSecResolved,
378 ) -> MetaResult<()> {
379 let props = ConnectorProperties::extract(new_source_props, false).unwrap();
380
381 {
382 let mut enumerator = props
383 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
384 metrics: self.metrics.source_enumerator_metrics.clone(),
385 info: SourceEnumeratorInfo { source_id },
386 }))
387 .await
388 .context("failed to create SplitEnumerator")?;
389
390 validate_enumerator_once(&mut *enumerator).await?;
391 }
392 Ok(())
393 }
394
395 #[await_tree::instrument]
397 pub async fn handle_replace_job(
398 &self,
399 dropped_job_fragments: &StreamJobFragments,
400 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
401 replace_plan: &ReplaceStreamJobPlan,
402 ) {
403 let dropped_source_fragments = dropped_job_fragments.stream_source_fragments();
405
406 self.apply_source_change(SourceChange::ReplaceJob {
407 dropped_source_fragments,
408 added_source_fragments,
409 fragment_replacements: replace_plan.fragment_replacements(),
410 })
411 .await;
412 }
413
414 #[await_tree::instrument("apply_source_change({source_change})")]
417 pub async fn apply_source_change(&self, source_change: SourceChange) {
418 let need_force_tick = matches!(source_change, SourceChange::UpdateSourceProps { .. });
419 let updated_source_ids = if let SourceChange::UpdateSourceProps {
420 ref source_id_map_new_props,
421 } = source_change
422 {
423 source_id_map_new_props.keys().cloned().collect::<Vec<_>>()
424 } else {
425 Vec::new()
426 };
427
428 {
429 let mut core = self.core.lock().await;
430 core.apply_source_change(source_change);
431 }
432
433 if need_force_tick {
435 self.force_tick_updated_sources(updated_source_ids).await;
436 }
437 }
438
439 #[await_tree::instrument("register_source({})", source.name)]
441 pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
442 tracing::debug!("register_source: {}", source.get_id());
443 let mut core = self.core.lock().await;
444 let source_id = source.get_id();
445 if core.managed_sources.contains_key(&source_id) {
446 tracing::warn!("source {} already registered", source_id);
447 return Ok(());
448 }
449
450 let handle = create_source_worker(source, self.metrics.clone())
451 .await
452 .context("failed to create source worker")?;
453
454 core.managed_sources.insert(source_id, handle);
455
456 Ok(())
457 }
458
459 pub async fn register_source_with_handle(
461 &self,
462 source_id: SourceId,
463 handle: ConnectorSourceWorkerHandle,
464 ) {
465 let mut core = self.core.lock().await;
466 if core.managed_sources.contains_key(&source_id) {
467 tracing::warn!("source {} already registered", source_id);
468 return;
469 }
470
471 core.managed_sources.insert(source_id, handle);
472 }
473
474 pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
475 let core = self.core.lock().await;
476
477 SourceManagerRunningInfo {
478 source_fragments: core.source_fragments.clone(),
479 backfill_fragments: core.backfill_fragments.clone(),
480 }
481 }
482
483 async fn tick(&self) -> MetaResult<()> {
492 let split_states = {
493 let core_guard = self.core.lock().await;
494 core_guard.reassign_splits().await?
495 };
496
497 for (database_id, split_state) in split_states {
498 if !split_state.split_assignment.is_empty() {
499 let command = Command::SourceChangeSplit(split_state);
500 tracing::info!(command = ?command, "pushing down split assignment command");
501 self.barrier_scheduler
502 .run_command(database_id, command)
503 .await?;
504 }
505 }
506
507 Ok(())
508 }
509
510 pub async fn run(&self) -> MetaResult<()> {
511 let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
512 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
513 loop {
514 ticker.tick().await;
515 let _pause_guard = self.paused.lock().await;
516 if let Err(e) = self.tick().await {
517 tracing::error!(
518 error = %e.as_report(),
519 "error happened while running source manager tick",
520 );
521 }
522 }
523 }
524
525 pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
527 tracing::debug!("pausing tick lock in source manager");
528 self.paused.lock().await
529 }
530
531 async fn force_tick_updated_sources(&self, updated_source_ids: Vec<SourceId>) {
533 let core = self.core.lock().await;
534 for source_id in updated_source_ids {
535 if let Some(handle) = core.managed_sources.get(&source_id) {
536 tracing::info!("forcing tick for updated source {}", source_id);
537 if let Err(e) = handle.force_tick().await {
538 tracing::warn!(
539 error = %e.as_report(),
540 "failed to force tick for source {} after properties update",
541 source_id
542 );
543 }
544 } else {
545 tracing::warn!(
546 "source {} not found when trying to force tick after update",
547 source_id
548 );
549 }
550 }
551 }
552
553 pub async fn reset_source_splits(&self, source_id: SourceId) -> MetaResult<()> {
557 tracing::warn!(
558 %source_id,
559 "UNSAFE: Resetting source splits - clearing cached state and triggering re-discovery"
560 );
561
562 let core = self.core.lock().await;
563 if let Some(handle) = core.managed_sources.get(&source_id) {
564 {
566 let mut splits_guard = handle.splits.lock().await;
567 tracing::info!(
568 %source_id,
569 prev_splits = ?splits_guard.splits.as_ref().map(|s| s.len()),
570 "Clearing cached splits"
571 );
572 splits_guard.splits = None;
573 }
574
575 tracing::info!(
577 %source_id,
578 "Triggering split re-discovery via force_tick"
579 );
580 handle.force_tick().await.with_context(|| {
581 format!(
582 "failed to force tick for source {} after split reset",
583 source_id
584 )
585 })?;
586
587 tracing::info!(
588 %source_id,
589 "Split reset completed - new splits will be assigned on next tick"
590 );
591 Ok(())
592 } else {
593 Err(anyhow::anyhow!("source {} not found in source manager", source_id).into())
594 }
595 }
596
597 pub async fn validate_inject_source_offsets(
604 &self,
605 source_id: SourceId,
606 split_offsets: &HashMap<String, String>,
607 ) -> MetaResult<Vec<String>> {
608 let (fragment_ids, env) = {
609 let core = self.core.lock().await;
610
611 let _ = core.managed_sources.get(&source_id).ok_or_else(|| {
613 MetaError::invalid_parameter(format!(
614 "source {} not found in source manager",
615 source_id
616 ))
617 })?;
618
619 let mut ids = Vec::new();
620 if let Some(src_frags) = core.source_fragments.get(&source_id) {
621 ids.extend(src_frags.iter().copied());
622 }
623 if let Some(backfill_frags) = core.backfill_fragments.get(&source_id) {
624 ids.extend(
625 backfill_frags
626 .iter()
627 .flat_map(|(id, upstream)| [*id, *upstream]),
628 );
629 }
630 (ids, core.env.clone())
631 };
632
633 if fragment_ids.is_empty() {
634 return Err(MetaError::invalid_parameter(format!(
635 "source {} has no running fragments",
636 source_id
637 )));
638 }
639
640 let guard = env.shared_actor_infos().read_guard();
641 let mut assigned_split_ids = HashSet::new();
642 for fragment_id in fragment_ids {
643 if let Some(fragment) = guard.get_fragment(fragment_id) {
644 for actor in fragment.actors.values() {
645 for split in &actor.splits {
646 assigned_split_ids.insert(split.id().to_string());
647 }
648 }
649 }
650 }
651
652 let mut invalid_splits = Vec::new();
654 for split_id in split_offsets.keys() {
655 if !assigned_split_ids.contains(split_id) {
656 invalid_splits.push(split_id.clone());
657 }
658 }
659
660 if !invalid_splits.is_empty() {
661 return Err(MetaError::invalid_parameter(format!(
662 "invalid split IDs for source {}: {:?}. Valid splits are: {:?}",
663 source_id,
664 invalid_splits,
665 assigned_split_ids.iter().collect::<Vec<_>>()
666 )));
667 }
668
669 tracing::info!(
670 source_id = %source_id,
671 num_splits = split_offsets.len(),
672 "Validated inject source offsets request"
673 );
674
675 Ok(split_offsets.keys().cloned().collect())
676 }
677}
678
679async fn validate_enumerator_once(enumerator: &mut dyn AnySplitEnumerator) -> MetaResult<()> {
680 let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
681 .await
682 .context("failed to list splits")??;
683 Ok(())
684}
685
/// A change to sources or their fragments, applied by `SourceManagerCore::apply_source_change`.
#[derive(strum::Display, Debug)]
pub enum SourceChange {
    /// A streaming job was created. Its source fragments and its backfill fragments
    /// (`(fragment_id, upstream_fragment_id)` pairs) are registered.
    CreateJob {
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Connector properties of existing sources changed. Their workers are updated in
    /// place and then force-ticked.
    UpdateSourceProps {
        source_id_map_new_props: HashMap<SourceId, HashMap<String, String>>,
    },
    /// Backfill of a job finished. Each source's worker is told the backfill fragment ids
    /// via `finish_backfill`.
    CreateJobFinished {
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Sources were dropped. Their workers are terminated and their bookkeeping removed.
    DropSource { dropped_source_ids: Vec<SourceId> },
    /// A job (by the name, a materialized view) was dropped. Its source fragments are
    /// removed.
    DropMv {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    },
    /// A streaming job was replaced. Old source fragments are dropped and new ones added.
    /// Backfill upstreams are remapped through `fragment_replacements` (old -> new).
    ReplaceJob {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
}
719
720pub fn build_actor_connector_splits(
721 splits: &HashMap<ActorId, Vec<SplitImpl>>,
722) -> HashMap<ActorId, ConnectorSplits> {
723 splits
724 .iter()
725 .map(|(&actor_id, splits)| {
726 (
727 actor_id,
728 ConnectorSplits {
729 splits: splits.iter().map(ConnectorSplit::from).collect(),
730 },
731 )
732 })
733 .collect()
734}
735
736pub fn build_actor_split_impls(
737 actor_splits: &HashMap<ActorId, ConnectorSplits>,
738) -> HashMap<ActorId, Vec<SplitImpl>> {
739 actor_splits
740 .iter()
741 .map(|(actor_id, ConnectorSplits { splits })| {
742 (
743 *actor_id,
744 splits
745 .iter()
746 .map(|split| SplitImpl::try_from(split).unwrap())
747 .collect(),
748 )
749 })
750 .collect()
751}