1mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::id::ObjectId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_common::panic_if_debug;
29use risingwave_connector::WithOptionsSecResolved;
30use risingwave_connector::error::ConnectorResult;
31use risingwave_connector::source::{
32 ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
33 SplitMetaData, fill_adaptive_split,
34};
35use risingwave_meta_model::SourceId;
36use risingwave_pb::catalog::Source;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38pub use split_assignment::{SplitDiffOptions, SplitState, reassign_splits};
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{Mutex, MutexGuard, oneshot};
42use tokio::task::JoinHandle;
43use tokio::time::MissedTickBehavior;
44use tokio::{select, time};
45pub use worker::create_source_worker;
46use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
47
48use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan, SharedActorInfos};
49use crate::manager::{MetaSrvEnv, MetadataManager};
50use crate::model::{ActorId, FragmentId, StreamJobFragments};
51use crate::rpc::metrics::MetaMetrics;
52use crate::{MetaError, MetaResult};
53
/// Shared, reference-counted handle to the [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// Split assignment grouped by fragment: fragment -> actor -> assigned splits.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
/// Splits discovered by the enumerators, keyed by the source they belong to.
pub type DiscoveredSourceSplits = HashMap<SourceId, Vec<SplitImpl>>;

/// New connector properties (key -> value) per object id.
pub type ConnectorPropsChange = HashMap<ObjectId, HashMap<String, String>>;

/// Timeout for a single enumerator operation (e.g. listing splits) during validation.
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
62
/// Manages the split-discovery workers of all sources and periodically pushes
/// split (re)assignments down through the barrier scheduler.
pub struct SourceManager {
    /// Held via [`SourceManager::pause_tick`] to temporarily suspend the periodic tick loop.
    pub paused: Mutex<()>,
    barrier_scheduler: BarrierScheduler,
    /// Mutable bookkeeping state, guarded by an async mutex.
    core: Mutex<SourceManagerCore>,
    pub metrics: Arc<MetaMetrics>,
}
/// Bookkeeping state of the [`SourceManager`]: which sources are managed and
/// which fragments consume each source.
pub struct SourceManagerCore {
    metadata_manager: MetadataManager,

    /// One background worker handle per managed source.
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Fragments that read directly from each source.
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// Backfill fragments per source, stored as
    /// `(backfill_fragment_id, upstream_source_fragment_id)` pairs.
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    env: MetaSrvEnv,
}
83
/// Snapshot of the manager's fragment bookkeeping, exposed for inspection
/// (see [`SourceManager::get_running_info`]).
pub struct SourceManagerRunningInfo {
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `(backfill_fragment_id, upstream_source_fragment_id)` pairs per source.
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
}
88
89impl SourceManagerCore {
90 fn new(
91 metadata_manager: MetadataManager,
92 managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
93 source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
94 backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
95 env: MetaSrvEnv,
96 ) -> Self {
97 Self {
98 metadata_manager,
99 managed_sources,
100 source_fragments,
101 backfill_fragments,
102 env,
103 }
104 }
105
106 pub fn apply_source_change(&mut self, source_change: SourceChange) {
108 let mut added_source_fragments = Default::default();
109 let mut added_backfill_fragments = Default::default();
110 let mut finished_backfill_fragments = Default::default();
111 let mut fragment_replacements = Default::default();
112 let mut dropped_source_fragments = Default::default();
113 let mut dropped_source_ids = Default::default();
114 let mut recreate_source_id_map_new_props: Vec<(SourceId, HashMap<String, String>)> =
115 Default::default();
116
117 match source_change {
118 SourceChange::CreateJob {
119 added_source_fragments: added_source_fragments_,
120 added_backfill_fragments: added_backfill_fragments_,
121 } => {
122 added_source_fragments = added_source_fragments_;
123 added_backfill_fragments = added_backfill_fragments_;
124 }
125 SourceChange::CreateJobFinished {
126 finished_backfill_fragments: finished_backfill_fragments_,
127 } => {
128 finished_backfill_fragments = finished_backfill_fragments_;
129 }
130
131 SourceChange::DropMv {
132 dropped_source_fragments: dropped_source_fragments_,
133 } => {
134 dropped_source_fragments = dropped_source_fragments_;
135 }
136 SourceChange::ReplaceJob {
137 dropped_source_fragments: dropped_source_fragments_,
138 added_source_fragments: added_source_fragments_,
139 fragment_replacements: fragment_replacements_,
140 } => {
141 dropped_source_fragments = dropped_source_fragments_;
142 added_source_fragments = added_source_fragments_;
143 fragment_replacements = fragment_replacements_;
144 }
145 SourceChange::DropSource {
146 dropped_source_ids: dropped_source_ids_,
147 } => {
148 dropped_source_ids = dropped_source_ids_;
149 }
150
151 SourceChange::UpdateSourceProps {
152 source_id_map_new_props,
153 } => {
154 for (source_id, new_props) in source_id_map_new_props {
155 recreate_source_id_map_new_props.push((source_id, new_props));
156 }
157 }
158 }
159
160 for source_id in dropped_source_ids {
161 let dropped_fragments = self.source_fragments.remove(&source_id);
162
163 if let Some(handle) = self.managed_sources.remove(&source_id) {
164 handle.terminate(dropped_fragments);
165 }
166 if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
167 }
174 }
175
176 for (source_id, fragments) in added_source_fragments {
177 self.source_fragments
178 .entry(source_id)
179 .or_default()
180 .extend(fragments);
181 }
182
183 for (source_id, fragments) in added_backfill_fragments {
184 self.backfill_fragments
185 .entry(source_id)
186 .or_default()
187 .extend(fragments);
188 }
189
190 for (source_id, fragments) in finished_backfill_fragments {
191 let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
192 panic!(
193 "source {} not found when adding backfill fragments {:?}",
194 source_id, fragments
195 );
196 });
197 handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
198 }
199
200 for (source_id, fragment_ids) in dropped_source_fragments {
201 self.drop_source_fragments(Some(source_id), fragment_ids);
202 }
203
204 for (old_fragment_id, new_fragment_id) in fragment_replacements {
205 self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
207
208 for fragment_ids in self.backfill_fragments.values_mut() {
209 let mut new_backfill_fragment_ids = fragment_ids.clone();
210 for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
211 assert_ne!(
212 fragment_id, upstream_fragment_id,
213 "backfill fragment should not be replaced"
214 );
215 if *upstream_fragment_id == old_fragment_id {
216 new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
217 new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
218 }
219 }
220 *fragment_ids = new_backfill_fragment_ids;
221 }
222 }
223
224 for (source_id, new_props) in recreate_source_id_map_new_props {
225 if let Some(handle) = self.managed_sources.get_mut(&source_id) {
226 let props_wrapper =
229 WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
230 let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); handle.update_props(props);
232 tracing::info!("update source {source_id} properties in source manager");
233 } else {
234 tracing::info!("job id {source_id} is not registered in source manager");
235 }
236 }
237 }
238
239 fn drop_source_fragments(
240 &mut self,
241 source_id: Option<SourceId>,
242 dropped_fragment_ids: BTreeSet<FragmentId>,
243 ) {
244 if let Some(source_id) = source_id {
245 if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
246 let mut dropped_ids = vec![];
247 let managed_fragment_ids = entry.get_mut();
248 for fragment_id in &dropped_fragment_ids {
249 managed_fragment_ids.remove(fragment_id);
250 dropped_ids.push(*fragment_id);
251 }
252 if let Some(handle) = self.managed_sources.get(&source_id) {
253 handle.drop_fragments(dropped_ids);
254 } else {
255 panic_if_debug!(
256 "source {source_id} not found when dropping fragment {dropped_ids:?}",
257 );
258 }
259 if managed_fragment_ids.is_empty() {
260 entry.remove();
261 }
262 }
263 } else {
264 for (source_id, fragment_ids) in &mut self.source_fragments {
265 let mut dropped_ids = vec![];
266 for fragment_id in &dropped_fragment_ids {
267 if fragment_ids.remove(fragment_id) {
268 dropped_ids.push(*fragment_id);
269 }
270 }
271 if !dropped_ids.is_empty() {
272 if let Some(handle) = self.managed_sources.get(source_id) {
273 handle.drop_fragments(dropped_ids);
274 } else {
275 panic_if_debug!(
276 "source {source_id} not found when dropping fragment {dropped_ids:?}",
277 );
278 }
279 }
280 }
281 }
282 }
283}
284
285impl SourceManager {
286 const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
287
288 pub async fn new(
289 barrier_scheduler: BarrierScheduler,
290 metadata_manager: MetadataManager,
291 metrics: Arc<MetaMetrics>,
292 env: MetaSrvEnv,
293 ) -> MetaResult<Self> {
294 let mut managed_sources = HashMap::new();
295 {
296 let sources = metadata_manager.list_sources().await?;
297 for source in sources {
298 create_source_worker_async(source, &mut managed_sources, metrics.clone())?
299 }
300 }
301
302 let source_fragments = metadata_manager
303 .catalog_controller
304 .load_source_fragment_ids()
305 .await?
306 .into_iter()
307 .map(|(source_id, fragment_ids)| {
308 (
309 source_id as SourceId,
310 fragment_ids.into_iter().map(|id| id as _).collect(),
311 )
312 })
313 .collect();
314 let backfill_fragments = metadata_manager
315 .catalog_controller
316 .load_backfill_fragment_ids()
317 .await?;
318
319 let core = Mutex::new(SourceManagerCore::new(
320 metadata_manager,
321 managed_sources,
322 source_fragments,
323 backfill_fragments,
324 env,
325 ));
326
327 Ok(Self {
328 barrier_scheduler,
329 core,
330 paused: Mutex::new(()),
331 metrics,
332 })
333 }
334
335 pub async fn validate_source_once(
336 &self,
337 source_id: SourceId,
338 new_source_props: WithOptionsSecResolved,
339 ) -> MetaResult<()> {
340 let props = ConnectorProperties::extract(new_source_props, false).unwrap();
341
342 {
343 let mut enumerator = props
344 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
345 metrics: self.metrics.source_enumerator_metrics.clone(),
346 info: SourceEnumeratorInfo { source_id },
347 }))
348 .await
349 .context("failed to create SplitEnumerator")?;
350
351 let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
352 .await
353 .context("failed to list splits")??;
354 }
355 Ok(())
356 }
357
358 #[await_tree::instrument]
360 pub async fn handle_replace_job(
361 &self,
362 dropped_job_fragments: &StreamJobFragments,
363 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
364 replace_plan: &ReplaceStreamJobPlan,
365 ) {
366 let dropped_source_fragments = dropped_job_fragments.stream_source_fragments();
368
369 self.apply_source_change(SourceChange::ReplaceJob {
370 dropped_source_fragments,
371 added_source_fragments,
372 fragment_replacements: replace_plan.fragment_replacements(),
373 })
374 .await;
375 }
376
377 #[await_tree::instrument("apply_source_change({source_change})")]
380 pub async fn apply_source_change(&self, source_change: SourceChange) {
381 let need_force_tick = matches!(source_change, SourceChange::UpdateSourceProps { .. });
382 let updated_source_ids = if let SourceChange::UpdateSourceProps {
383 ref source_id_map_new_props,
384 } = source_change
385 {
386 source_id_map_new_props.keys().cloned().collect::<Vec<_>>()
387 } else {
388 Vec::new()
389 };
390
391 {
392 let mut core = self.core.lock().await;
393 core.apply_source_change(source_change);
394 }
395
396 if need_force_tick {
398 self.force_tick_updated_sources(updated_source_ids).await;
399 }
400 }
401
402 #[await_tree::instrument("register_source({})", source.name)]
404 pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
405 tracing::debug!("register_source: {}", source.get_id());
406 let mut core = self.core.lock().await;
407 let source_id = source.get_id();
408 if core.managed_sources.contains_key(&source_id) {
409 tracing::warn!("source {} already registered", source_id);
410 return Ok(());
411 }
412
413 let handle = create_source_worker(source, self.metrics.clone())
414 .await
415 .context("failed to create source worker")?;
416
417 core.managed_sources.insert(source_id, handle);
418
419 Ok(())
420 }
421
422 pub async fn register_source_with_handle(
424 &self,
425 source_id: SourceId,
426 handle: ConnectorSourceWorkerHandle,
427 ) {
428 let mut core = self.core.lock().await;
429 if core.managed_sources.contains_key(&source_id) {
430 tracing::warn!("source {} already registered", source_id);
431 return;
432 }
433
434 core.managed_sources.insert(source_id, handle);
435 }
436
437 pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
438 let core = self.core.lock().await;
439
440 SourceManagerRunningInfo {
441 source_fragments: core.source_fragments.clone(),
442 backfill_fragments: core.backfill_fragments.clone(),
443 }
444 }
445
446 async fn tick(&self) -> MetaResult<()> {
455 let split_states = {
456 let core_guard = self.core.lock().await;
457 core_guard.reassign_splits().await?
458 };
459
460 for (database_id, split_state) in split_states {
461 if !split_state.split_assignment.is_empty() {
462 let command = Command::SourceChangeSplit(split_state);
463 tracing::info!(command = ?command, "pushing down split assignment command");
464 self.barrier_scheduler
465 .run_command(database_id, command)
466 .await?;
467 }
468 }
469
470 Ok(())
471 }
472
473 pub async fn run(&self) -> MetaResult<()> {
474 let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
475 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
476 loop {
477 ticker.tick().await;
478 let _pause_guard = self.paused.lock().await;
479 if let Err(e) = self.tick().await {
480 tracing::error!(
481 error = %e.as_report(),
482 "error happened while running source manager tick",
483 );
484 }
485 }
486 }
487
488 pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
490 tracing::debug!("pausing tick lock in source manager");
491 self.paused.lock().await
492 }
493
494 async fn force_tick_updated_sources(&self, updated_source_ids: Vec<SourceId>) {
496 let core = self.core.lock().await;
497 for source_id in updated_source_ids {
498 if let Some(handle) = core.managed_sources.get(&source_id) {
499 tracing::info!("forcing tick for updated source {}", source_id.as_raw_id());
500 if let Err(e) = handle.force_tick().await {
501 tracing::warn!(
502 error = %e.as_report(),
503 "failed to force tick for source {} after properties update",
504 source_id.as_raw_id()
505 );
506 }
507 } else {
508 tracing::warn!(
509 "source {} not found when trying to force tick after update",
510 source_id.as_raw_id()
511 );
512 }
513 }
514 }
515
516 pub async fn reset_source_splits(&self, source_id: SourceId) -> MetaResult<()> {
520 tracing::warn!(
521 source_id = source_id.as_raw_id(),
522 "UNSAFE: Resetting source splits - clearing cached state and triggering re-discovery"
523 );
524
525 let core = self.core.lock().await;
526 if let Some(handle) = core.managed_sources.get(&source_id) {
527 {
529 let mut splits_guard = handle.splits.lock().await;
530 tracing::info!(
531 source_id = source_id.as_raw_id(),
532 prev_splits = ?splits_guard.splits.as_ref().map(|s| s.len()),
533 "Clearing cached splits"
534 );
535 splits_guard.splits = None;
536 }
537
538 tracing::info!(
540 source_id = source_id.as_raw_id(),
541 "Triggering split re-discovery via force_tick"
542 );
543 handle.force_tick().await.with_context(|| {
544 format!(
545 "failed to force tick for source {} after split reset",
546 source_id.as_raw_id()
547 )
548 })?;
549
550 tracing::info!(
551 source_id = source_id.as_raw_id(),
552 "Split reset completed - new splits will be assigned on next tick"
553 );
554 Ok(())
555 } else {
556 Err(anyhow::anyhow!(
557 "source {} not found in source manager",
558 source_id.as_raw_id()
559 )
560 .into())
561 }
562 }
563
564 pub async fn validate_inject_source_offsets(
571 &self,
572 source_id: SourceId,
573 split_offsets: &HashMap<String, String>,
574 ) -> MetaResult<Vec<String>> {
575 let (fragment_ids, env) = {
576 let core = self.core.lock().await;
577
578 let _ = core.managed_sources.get(&source_id).ok_or_else(|| {
580 MetaError::invalid_parameter(format!(
581 "source {} not found in source manager",
582 source_id.as_raw_id()
583 ))
584 })?;
585
586 let mut ids = Vec::new();
587 if let Some(src_frags) = core.source_fragments.get(&source_id) {
588 ids.extend(src_frags.iter().copied());
589 }
590 if let Some(backfill_frags) = core.backfill_fragments.get(&source_id) {
591 ids.extend(
592 backfill_frags
593 .iter()
594 .flat_map(|(id, upstream)| [*id, *upstream]),
595 );
596 }
597 (ids, core.env.clone())
598 };
599
600 if fragment_ids.is_empty() {
601 return Err(MetaError::invalid_parameter(format!(
602 "source {} has no running fragments",
603 source_id.as_raw_id()
604 )));
605 }
606
607 let guard = env.shared_actor_infos().read_guard();
608 let mut assigned_split_ids = HashSet::new();
609 for fragment_id in fragment_ids {
610 if let Some(fragment) = guard.get_fragment(fragment_id) {
611 for actor in fragment.actors.values() {
612 for split in &actor.splits {
613 assigned_split_ids.insert(split.id().to_string());
614 }
615 }
616 }
617 }
618
619 let mut invalid_splits = Vec::new();
621 for split_id in split_offsets.keys() {
622 if !assigned_split_ids.contains(split_id) {
623 invalid_splits.push(split_id.clone());
624 }
625 }
626
627 if !invalid_splits.is_empty() {
628 return Err(MetaError::invalid_parameter(format!(
629 "invalid split IDs for source {}: {:?}. Valid splits are: {:?}",
630 source_id.as_raw_id(),
631 invalid_splits,
632 assigned_split_ids.iter().collect::<Vec<_>>()
633 )));
634 }
635
636 tracing::info!(
637 source_id = source_id.as_raw_id(),
638 num_splits = split_offsets.len(),
639 "Validated inject source offsets request"
640 );
641
642 Ok(split_offsets.keys().cloned().collect())
643 }
644}
645
/// A change to the set of sources or source-consuming fragments, applied via
/// [`SourceManager::apply_source_change`].
#[derive(strum::Display, Debug)]
pub enum SourceChange {
    /// A streaming job reading from sources was created.
    CreateJob {
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// `(backfill_fragment_id, upstream_source_fragment_id)` pairs.
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Connector properties of sources were altered; maps each source id to its
    /// new property set. Triggers a worker props update and a forced tick.
    UpdateSourceProps {
        source_id_map_new_props: HashMap<SourceId, HashMap<String, String>>,
    },
    /// A previously created job finished its source backfill.
    CreateJobFinished {
        /// `(backfill_fragment_id, upstream_source_fragment_id)` pairs.
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Sources themselves were dropped; their workers are terminated.
    DropSource { dropped_source_ids: Vec<SourceId> },
    /// A materialized view reading from sources was dropped.
    DropMv {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    },
    /// A streaming job was replaced: old source fragments dropped, new ones
    /// added, with `fragment_replacements` mapping old -> new fragment ids.
    ReplaceJob {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
}
679
680pub fn build_actor_connector_splits(
681 splits: &HashMap<ActorId, Vec<SplitImpl>>,
682) -> HashMap<ActorId, ConnectorSplits> {
683 splits
684 .iter()
685 .map(|(&actor_id, splits)| {
686 (
687 actor_id,
688 ConnectorSplits {
689 splits: splits.iter().map(ConnectorSplit::from).collect(),
690 },
691 )
692 })
693 .collect()
694}
695
696pub fn build_actor_split_impls(
697 actor_splits: &HashMap<ActorId, ConnectorSplits>,
698) -> HashMap<ActorId, Vec<SplitImpl>> {
699 actor_splits
700 .iter()
701 .map(|(actor_id, ConnectorSplits { splits })| {
702 (
703 *actor_id,
704 splits
705 .iter()
706 .map(|split| SplitImpl::try_from(split).unwrap())
707 .collect(),
708 )
709 })
710 .collect()
711}