1mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::id::ObjectId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_common::panic_if_debug;
29use risingwave_connector::WithOptionsSecResolved;
30use risingwave_connector::error::ConnectorResult;
31use risingwave_connector::source::{
32 ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
33 SplitMetaData,
34};
35use risingwave_meta_model::SourceId;
36use risingwave_pb::catalog::Source;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38pub use split_assignment::{SplitDiffOptions, SplitState, align_splits, reassign_splits};
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{Mutex, MutexGuard, oneshot};
42use tokio::task::JoinHandle;
43use tokio::time::MissedTickBehavior;
44use tokio::{select, time};
45pub use worker::create_source_worker;
46use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
47
48use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
49use crate::manager::{MetaSrvEnv, MetadataManager};
50use crate::model::{ActorId, FragmentId, StreamJobFragments};
51use crate::rpc::metrics::MetaMetrics;
52use crate::{MetaError, MetaResult};
53
/// Shared, reference-counted handle to the [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// Split assignment per fragment: fragment id -> actor id -> splits owned by that actor.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;

/// Splits discovered for each source by its enumerator worker.
pub type SourceSplitAssignment = HashMap<SourceId, DiscoveredSplits>;
64
/// Splits reported for a source by its split enumerator.
#[derive(Debug, Clone)]
pub enum DiscoveredSplits {
    /// A concrete set of splits keyed by split id.
    Fixed(BTreeMap<Arc<str>, SplitImpl>),
    /// A single split value; presumably expanded/adapted at assignment time —
    /// NOTE(review): confirm exact semantics in the `split_assignment` module.
    Adaptive(SplitImpl),
}
77
/// How splits are assigned to the source fragments of a replacement job.
#[derive(Debug, Clone)]
pub enum ReplaceJobSplitPlan {
    /// Assign the given freshly-discovered splits.
    Discovered(SourceSplitAssignment),
    /// Align the assignment from the previous job's fragments
    /// (presumably via `align_splits` re-exported from `split_assignment` — confirm at call sites).
    AlignFromPrevious,
}
96
/// Object id -> new connector properties (plain key/value strings).
pub type ConnectorPropsChange = HashMap<ObjectId, HashMap<String, String>>;

/// Timeout for one-shot enumerator operations (e.g. listing splits during validation).
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
101
/// Manages source enumerator workers on the meta node and periodically pushes
/// recomputed split assignments to source actors via barrier commands.
pub struct SourceManager {
    /// Held by `pause_tick` callers to suspend the periodic tick loop in `run`.
    pub paused: Mutex<()>,
    barrier_scheduler: BarrierScheduler,
    /// Mutable registries and worker handles, guarded by an async lock.
    core: Mutex<SourceManagerCore>,
    pub metrics: Arc<MetaMetrics>,
}
/// Mutable state behind [`SourceManager`]'s core lock.
pub struct SourceManagerCore {
    metadata_manager: MetadataManager,

    /// One enumerator worker handle per managed source.
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Source id -> fragments reading from that source.
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// Source id -> (backfill fragment id, upstream fragment id) pairs.
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    env: MetaSrvEnv,
}
122
/// Snapshot of the fragment registries, returned for introspection.
pub struct SourceManagerRunningInfo {
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
}
127
128impl SourceManagerCore {
129 fn new(
130 metadata_manager: MetadataManager,
131 managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
132 source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
133 backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
134 env: MetaSrvEnv,
135 ) -> Self {
136 Self {
137 metadata_manager,
138 managed_sources,
139 source_fragments,
140 backfill_fragments,
141 env,
142 }
143 }
144
145 pub fn apply_source_change(&mut self, source_change: SourceChange) {
147 let mut added_source_fragments = Default::default();
148 let mut added_backfill_fragments = Default::default();
149 let mut finished_backfill_fragments = Default::default();
150 let mut fragment_replacements = Default::default();
151 let mut dropped_source_fragments = Default::default();
152 let mut dropped_source_ids = Default::default();
153 let mut recreate_source_id_map_new_props: Vec<(SourceId, HashMap<String, String>)> =
154 Default::default();
155
156 match source_change {
157 SourceChange::CreateJob {
158 added_source_fragments: added_source_fragments_,
159 added_backfill_fragments: added_backfill_fragments_,
160 } => {
161 added_source_fragments = added_source_fragments_;
162 added_backfill_fragments = added_backfill_fragments_;
163 }
164 SourceChange::CreateJobFinished {
165 finished_backfill_fragments: finished_backfill_fragments_,
166 } => {
167 finished_backfill_fragments = finished_backfill_fragments_;
168 }
169
170 SourceChange::DropMv {
171 dropped_source_fragments: dropped_source_fragments_,
172 } => {
173 dropped_source_fragments = dropped_source_fragments_;
174 }
175 SourceChange::ReplaceJob {
176 dropped_source_fragments: dropped_source_fragments_,
177 added_source_fragments: added_source_fragments_,
178 fragment_replacements: fragment_replacements_,
179 } => {
180 dropped_source_fragments = dropped_source_fragments_;
181 added_source_fragments = added_source_fragments_;
182 fragment_replacements = fragment_replacements_;
183 }
184 SourceChange::DropSource {
185 dropped_source_ids: dropped_source_ids_,
186 } => {
187 dropped_source_ids = dropped_source_ids_;
188 }
189
190 SourceChange::UpdateSourceProps {
191 source_id_map_new_props,
192 } => {
193 for (source_id, new_props) in source_id_map_new_props {
194 recreate_source_id_map_new_props.push((source_id, new_props));
195 }
196 }
197 }
198
199 for source_id in dropped_source_ids {
200 let dropped_fragments = self.source_fragments.remove(&source_id);
201
202 if let Some(handle) = self.managed_sources.remove(&source_id) {
203 handle.terminate(dropped_fragments);
204 }
205 if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
206 }
213 }
214
215 for (source_id, fragments) in added_source_fragments {
216 self.source_fragments
217 .entry(source_id)
218 .or_default()
219 .extend(fragments);
220 }
221
222 for (source_id, fragments) in added_backfill_fragments {
223 self.backfill_fragments
224 .entry(source_id)
225 .or_default()
226 .extend(fragments);
227 }
228
229 for (source_id, fragments) in finished_backfill_fragments {
230 let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
231 panic!(
232 "source {} not found when adding backfill fragments {:?}",
233 source_id, fragments
234 );
235 });
236 handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
237 }
238
239 for (source_id, fragment_ids) in dropped_source_fragments {
240 self.drop_source_fragments(Some(source_id), fragment_ids);
241 }
242
243 for (old_fragment_id, new_fragment_id) in fragment_replacements {
244 self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
246
247 for fragment_ids in self.backfill_fragments.values_mut() {
248 let mut new_backfill_fragment_ids = fragment_ids.clone();
249 for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
250 assert_ne!(
251 fragment_id, upstream_fragment_id,
252 "backfill fragment should not be replaced"
253 );
254 if *upstream_fragment_id == old_fragment_id {
255 new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
256 new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
257 }
258 }
259 *fragment_ids = new_backfill_fragment_ids;
260 }
261 }
262
263 for (source_id, new_props) in recreate_source_id_map_new_props {
264 if let Some(handle) = self.managed_sources.get_mut(&source_id) {
265 let props_wrapper =
268 WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
269 let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); handle.update_props(props);
271 tracing::info!("update source {source_id} properties in source manager");
272 } else {
273 tracing::info!("job id {source_id} is not registered in source manager");
274 }
275 }
276 }
277
278 fn drop_source_fragments(
279 &mut self,
280 source_id: Option<SourceId>,
281 dropped_fragment_ids: BTreeSet<FragmentId>,
282 ) {
283 if let Some(source_id) = source_id {
284 if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
285 let mut dropped_ids = vec![];
286 let managed_fragment_ids = entry.get_mut();
287 for fragment_id in &dropped_fragment_ids {
288 managed_fragment_ids.remove(fragment_id);
289 dropped_ids.push(*fragment_id);
290 }
291 if let Some(handle) = self.managed_sources.get(&source_id) {
292 handle.drop_fragments(dropped_ids);
293 } else {
294 panic_if_debug!(
295 "source {source_id} not found when dropping fragment {dropped_ids:?}",
296 );
297 }
298 if managed_fragment_ids.is_empty() {
299 entry.remove();
300 }
301 }
302 } else {
303 for (source_id, fragment_ids) in &mut self.source_fragments {
304 let mut dropped_ids = vec![];
305 for fragment_id in &dropped_fragment_ids {
306 if fragment_ids.remove(fragment_id) {
307 dropped_ids.push(*fragment_id);
308 }
309 }
310 if !dropped_ids.is_empty() {
311 if let Some(handle) = self.managed_sources.get(source_id) {
312 handle.drop_fragments(dropped_ids);
313 } else {
314 panic_if_debug!(
315 "source {source_id} not found when dropping fragment {dropped_ids:?}",
316 );
317 }
318 }
319 }
320 }
321 }
322}
323
324impl SourceManager {
325 const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
326
327 pub async fn new(
328 barrier_scheduler: BarrierScheduler,
329 metadata_manager: MetadataManager,
330 metrics: Arc<MetaMetrics>,
331 env: MetaSrvEnv,
332 ) -> MetaResult<Self> {
333 let mut managed_sources = HashMap::new();
334 {
335 let sources = metadata_manager.list_sources().await?;
336 for source in sources {
337 create_source_worker_async(source, &mut managed_sources, metrics.clone())?
338 }
339 }
340
341 let source_fragments = metadata_manager
342 .catalog_controller
343 .load_source_fragment_ids()
344 .await?
345 .into_iter()
346 .map(|(source_id, fragment_ids)| {
347 (
348 source_id as SourceId,
349 fragment_ids.into_iter().map(|id| id as _).collect(),
350 )
351 })
352 .collect();
353 let backfill_fragments = metadata_manager
354 .catalog_controller
355 .load_backfill_fragment_ids()
356 .await?;
357
358 let core = Mutex::new(SourceManagerCore::new(
359 metadata_manager,
360 managed_sources,
361 source_fragments,
362 backfill_fragments,
363 env,
364 ));
365
366 Ok(Self {
367 barrier_scheduler,
368 core,
369 paused: Mutex::new(()),
370 metrics,
371 })
372 }
373
374 pub async fn validate_source_once(
375 &self,
376 source_id: SourceId,
377 new_source_props: WithOptionsSecResolved,
378 ) -> MetaResult<()> {
379 let props = ConnectorProperties::extract(new_source_props, false).unwrap();
380
381 {
382 let mut enumerator = props
383 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
384 metrics: self.metrics.source_enumerator_metrics.clone(),
385 info: SourceEnumeratorInfo { source_id },
386 }))
387 .await
388 .context("failed to create SplitEnumerator")?;
389
390 let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
391 .await
392 .context("failed to list splits")??;
393 }
394 Ok(())
395 }
396
397 #[await_tree::instrument]
399 pub async fn handle_replace_job(
400 &self,
401 dropped_job_fragments: &StreamJobFragments,
402 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
403 replace_plan: &ReplaceStreamJobPlan,
404 ) {
405 let dropped_source_fragments = dropped_job_fragments.stream_source_fragments();
407
408 self.apply_source_change(SourceChange::ReplaceJob {
409 dropped_source_fragments,
410 added_source_fragments,
411 fragment_replacements: replace_plan.fragment_replacements(),
412 })
413 .await;
414 }
415
416 #[await_tree::instrument("apply_source_change({source_change})")]
419 pub async fn apply_source_change(&self, source_change: SourceChange) {
420 let need_force_tick = matches!(source_change, SourceChange::UpdateSourceProps { .. });
421 let updated_source_ids = if let SourceChange::UpdateSourceProps {
422 ref source_id_map_new_props,
423 } = source_change
424 {
425 source_id_map_new_props.keys().cloned().collect::<Vec<_>>()
426 } else {
427 Vec::new()
428 };
429
430 {
431 let mut core = self.core.lock().await;
432 core.apply_source_change(source_change);
433 }
434
435 if need_force_tick {
437 self.force_tick_updated_sources(updated_source_ids).await;
438 }
439 }
440
441 #[await_tree::instrument("register_source({})", source.name)]
443 pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
444 tracing::debug!("register_source: {}", source.get_id());
445 let mut core = self.core.lock().await;
446 let source_id = source.get_id();
447 if core.managed_sources.contains_key(&source_id) {
448 tracing::warn!("source {} already registered", source_id);
449 return Ok(());
450 }
451
452 let handle = create_source_worker(source, self.metrics.clone())
453 .await
454 .context("failed to create source worker")?;
455
456 core.managed_sources.insert(source_id, handle);
457
458 Ok(())
459 }
460
461 pub async fn register_source_with_handle(
463 &self,
464 source_id: SourceId,
465 handle: ConnectorSourceWorkerHandle,
466 ) {
467 let mut core = self.core.lock().await;
468 if core.managed_sources.contains_key(&source_id) {
469 tracing::warn!("source {} already registered", source_id);
470 return;
471 }
472
473 core.managed_sources.insert(source_id, handle);
474 }
475
476 pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
477 let core = self.core.lock().await;
478
479 SourceManagerRunningInfo {
480 source_fragments: core.source_fragments.clone(),
481 backfill_fragments: core.backfill_fragments.clone(),
482 }
483 }
484
485 async fn tick(&self) -> MetaResult<()> {
494 let split_states = {
495 let core_guard = self.core.lock().await;
496 core_guard.reassign_splits().await?
497 };
498
499 for (database_id, split_state) in split_states {
500 if !split_state.split_assignment.is_empty() {
501 let command = Command::SourceChangeSplit(split_state);
502 tracing::info!(command = ?command, "pushing down split assignment command");
503 self.barrier_scheduler
504 .run_command(database_id, command)
505 .await?;
506 }
507 }
508
509 Ok(())
510 }
511
512 pub async fn run(&self) -> MetaResult<()> {
513 let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
514 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
515 loop {
516 ticker.tick().await;
517 let _pause_guard = self.paused.lock().await;
518 if let Err(e) = self.tick().await {
519 tracing::error!(
520 error = %e.as_report(),
521 "error happened while running source manager tick",
522 );
523 }
524 }
525 }
526
527 pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
529 tracing::debug!("pausing tick lock in source manager");
530 self.paused.lock().await
531 }
532
533 async fn force_tick_updated_sources(&self, updated_source_ids: Vec<SourceId>) {
535 let core = self.core.lock().await;
536 for source_id in updated_source_ids {
537 if let Some(handle) = core.managed_sources.get(&source_id) {
538 tracing::info!("forcing tick for updated source {}", source_id);
539 if let Err(e) = handle.force_tick().await {
540 tracing::warn!(
541 error = %e.as_report(),
542 "failed to force tick for source {} after properties update",
543 source_id
544 );
545 }
546 } else {
547 tracing::warn!(
548 "source {} not found when trying to force tick after update",
549 source_id
550 );
551 }
552 }
553 }
554
555 pub async fn reset_source_splits(&self, source_id: SourceId) -> MetaResult<()> {
559 tracing::warn!(
560 %source_id,
561 "UNSAFE: Resetting source splits - clearing cached state and triggering re-discovery"
562 );
563
564 let core = self.core.lock().await;
565 if let Some(handle) = core.managed_sources.get(&source_id) {
566 {
568 let mut splits_guard = handle.splits.lock().await;
569 tracing::info!(
570 %source_id,
571 prev_splits = ?splits_guard.splits.as_ref().map(|s| s.len()),
572 "Clearing cached splits"
573 );
574 splits_guard.splits = None;
575 }
576
577 tracing::info!(
579 %source_id,
580 "Triggering split re-discovery via force_tick"
581 );
582 handle.force_tick().await.with_context(|| {
583 format!(
584 "failed to force tick for source {} after split reset",
585 source_id
586 )
587 })?;
588
589 tracing::info!(
590 %source_id,
591 "Split reset completed - new splits will be assigned on next tick"
592 );
593 Ok(())
594 } else {
595 Err(anyhow::anyhow!("source {} not found in source manager", source_id).into())
596 }
597 }
598
599 pub async fn validate_inject_source_offsets(
606 &self,
607 source_id: SourceId,
608 split_offsets: &HashMap<String, String>,
609 ) -> MetaResult<Vec<String>> {
610 let (fragment_ids, env) = {
611 let core = self.core.lock().await;
612
613 let _ = core.managed_sources.get(&source_id).ok_or_else(|| {
615 MetaError::invalid_parameter(format!(
616 "source {} not found in source manager",
617 source_id
618 ))
619 })?;
620
621 let mut ids = Vec::new();
622 if let Some(src_frags) = core.source_fragments.get(&source_id) {
623 ids.extend(src_frags.iter().copied());
624 }
625 if let Some(backfill_frags) = core.backfill_fragments.get(&source_id) {
626 ids.extend(
627 backfill_frags
628 .iter()
629 .flat_map(|(id, upstream)| [*id, *upstream]),
630 );
631 }
632 (ids, core.env.clone())
633 };
634
635 if fragment_ids.is_empty() {
636 return Err(MetaError::invalid_parameter(format!(
637 "source {} has no running fragments",
638 source_id
639 )));
640 }
641
642 let guard = env.shared_actor_infos().read_guard();
643 let mut assigned_split_ids = HashSet::new();
644 for fragment_id in fragment_ids {
645 if let Some(fragment) = guard.get_fragment(fragment_id) {
646 for actor in fragment.actors.values() {
647 for split in &actor.splits {
648 assigned_split_ids.insert(split.id().to_string());
649 }
650 }
651 }
652 }
653
654 let mut invalid_splits = Vec::new();
656 for split_id in split_offsets.keys() {
657 if !assigned_split_ids.contains(split_id) {
658 invalid_splits.push(split_id.clone());
659 }
660 }
661
662 if !invalid_splits.is_empty() {
663 return Err(MetaError::invalid_parameter(format!(
664 "invalid split IDs for source {}: {:?}. Valid splits are: {:?}",
665 source_id,
666 invalid_splits,
667 assigned_split_ids.iter().collect::<Vec<_>>()
668 )));
669 }
670
671 tracing::info!(
672 source_id = %source_id,
673 num_splits = split_offsets.len(),
674 "Validated inject source offsets request"
675 );
676
677 Ok(split_offsets.keys().cloned().collect())
678 }
679}
680
/// A change to apply to the source manager's registries; consumed by
/// `SourceManagerCore::apply_source_change`.
#[derive(strum::Display, Debug)]
pub enum SourceChange {
    /// A new streaming job with source and/or backfill fragments was created.
    CreateJob {
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// (backfill fragment id, upstream fragment id) pairs.
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Connector properties of existing sources were altered; workers are
    /// updated and force-ticked.
    UpdateSourceProps {
        source_id_map_new_props: HashMap<SourceId, HashMap<String, String>>,
    },
    /// Backfill for a created job finished; workers stop tracking those fragments.
    CreateJobFinished {
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Sources were dropped entirely; their enumerator workers are terminated.
    DropSource { dropped_source_ids: Vec<SourceId> },
    /// A materialized view reading from sources was dropped.
    DropMv {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    },
    /// A streaming job was replaced: old fragments are dropped, new ones added,
    /// and backfill upstream edges rewritten via `fragment_replacements`.
    ReplaceJob {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// old fragment id -> new fragment id.
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
}
714
715pub fn build_actor_connector_splits(
716 splits: &HashMap<ActorId, Vec<SplitImpl>>,
717) -> HashMap<ActorId, ConnectorSplits> {
718 splits
719 .iter()
720 .map(|(&actor_id, splits)| {
721 (
722 actor_id,
723 ConnectorSplits {
724 splits: splits.iter().map(ConnectorSplit::from).collect(),
725 },
726 )
727 })
728 .collect()
729}
730
731pub fn build_actor_split_impls(
732 actor_splits: &HashMap<ActorId, ConnectorSplits>,
733) -> HashMap<ActorId, Vec<SplitImpl>> {
734 actor_splits
735 .iter()
736 .map(|(actor_id, ConnectorSplits { splits })| {
737 (
738 *actor_id,
739 splits
740 .iter()
741 .map(|split| SplitImpl::try_from(split).unwrap())
742 .collect(),
743 )
744 })
745 .collect()
746}