1mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use itertools::Itertools;
26use risingwave_common::catalog::DatabaseId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_common::panic_if_debug;
29use risingwave_connector::WithOptionsSecResolved;
30use risingwave_connector::error::ConnectorResult;
31use risingwave_connector::source::{
32 ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
33 SplitMetaData, fill_adaptive_split,
34};
35use risingwave_meta_model::SourceId;
36use risingwave_pb::catalog::Source;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38pub use split_assignment::SplitState;
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{Mutex, MutexGuard, oneshot};
42use tokio::task::JoinHandle;
43use tokio::time::MissedTickBehavior;
44use tokio::{select, time};
45pub use worker::create_source_worker;
46use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
47
48use crate::MetaResult;
49use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
50use crate::manager::MetadataManager;
51use crate::model::{ActorId, FragmentId, StreamJobFragments};
52use crate::rpc::metrics::MetaMetrics;
53
/// Shared, reference-counted handle to a [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// Splits per actor, keyed first by fragment id and then by actor id.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
/// A list of splits per source id.
pub type DiscoveredSourceSplits = HashMap<SourceId, Vec<SplitImpl>>;
/// An optional `u32` per actor, keyed first by fragment id and then by actor id.
/// NOTE(review): presumably a rate limit where `None` means "not throttled" — confirm.
pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;
/// String key/value properties keyed by a `u32`.
/// NOTE(review): the key is presumably a source id — confirm against callers.
pub type ConnectorPropsChange = HashMap<u32, HashMap<String, String>>;

/// Timeout applied to the single `list_splits` call made by
/// `SourceManager::validate_source_once`.
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
62
/// Owns the source-related state of the meta node (behind `core`) and periodically
/// pushes split-assignment changes through the barrier scheduler (see `run` / `tick`).
pub struct SourceManager {
    /// Lock acquired by `run` before every tick. `pause_tick` hands out the same
    /// guard, so ticking waits for as long as a caller keeps that guard alive.
    pub paused: Mutex<()>,
    /// Used by `tick` to run `Command::SourceChangeSplit` for each database.
    barrier_scheduler: BarrierScheduler,
    /// Mutable state; every accessor in this file locks it first.
    core: Mutex<SourceManagerCore>,
    /// Metrics handle; cloned into source workers and split enumerator contexts.
    pub metrics: Arc<MetaMetrics>,
}
/// The lock-protected state of [`SourceManager`].
pub struct SourceManagerCore {
    /// Used by `update_source_splits` to persist discovered splits.
    metadata_manager: MetadataManager,

    /// Worker handle for every source currently registered with the manager.
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Fragment ids recorded for each source.
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `(fragment id, upstream fragment id)` pairs recorded for each source.
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    /// Splits currently recorded for each actor.
    actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
85
/// Cloned snapshot of the mappings held by `SourceManagerCore`, as returned by
/// `SourceManager::get_running_info`.
pub struct SourceManagerRunningInfo {
    /// Fragment ids recorded for each source.
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `(fragment id, upstream fragment id)` pairs recorded for each source.
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    /// Splits recorded for each actor.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
91
92impl SourceManagerCore {
93 fn new(
94 metadata_manager: MetadataManager,
95 managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
96 source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
97 backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
98 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
99 ) -> Self {
100 Self {
101 metadata_manager,
102 managed_sources,
103 source_fragments,
104 backfill_fragments,
105 actor_splits,
106 }
107 }
108
109 pub fn apply_source_change(&mut self, source_change: SourceChange) {
111 let mut added_source_fragments = Default::default();
112 let mut added_backfill_fragments = Default::default();
113 let mut finished_backfill_fragments = Default::default();
114 let mut split_assignment = Default::default();
115 let mut dropped_actors = Default::default();
116 let mut fragment_replacements = Default::default();
117 let mut dropped_source_fragments = Default::default();
118 let mut dropped_source_ids = Default::default();
119 let mut recreate_source_id_map_new_props: Vec<(u32, HashMap<String, String>)> =
120 Default::default();
121
122 match source_change {
123 SourceChange::CreateJob {
124 added_source_fragments: added_source_fragments_,
125 added_backfill_fragments: added_backfill_fragments_,
126 split_assignment: split_assignment_,
127 } => {
128 added_source_fragments = added_source_fragments_;
129 added_backfill_fragments = added_backfill_fragments_;
130 split_assignment = split_assignment_;
131 }
132 SourceChange::CreateJobFinished {
133 finished_backfill_fragments: finished_backfill_fragments_,
134 } => {
135 finished_backfill_fragments = finished_backfill_fragments_;
136 }
137 SourceChange::SplitChange(split_assignment_) => {
138 split_assignment = split_assignment_;
139 }
140 SourceChange::DropMv {
141 dropped_source_fragments: dropped_source_fragments_,
142 dropped_actors: dropped_actors_,
143 } => {
144 dropped_source_fragments = dropped_source_fragments_;
145 dropped_actors = dropped_actors_;
146 }
147 SourceChange::ReplaceJob {
148 dropped_source_fragments: dropped_source_fragments_,
149 dropped_actors: dropped_actors_,
150 added_source_fragments: added_source_fragments_,
151 split_assignment: split_assignment_,
152 fragment_replacements: fragment_replacements_,
153 } => {
154 dropped_source_fragments = dropped_source_fragments_;
155 dropped_actors = dropped_actors_;
156 added_source_fragments = added_source_fragments_;
157 split_assignment = split_assignment_;
158 fragment_replacements = fragment_replacements_;
159 }
160 SourceChange::DropSource {
161 dropped_source_ids: dropped_source_ids_,
162 } => {
163 dropped_source_ids = dropped_source_ids_;
164 }
165 SourceChange::Reschedule {
166 split_assignment: split_assignment_,
167 dropped_actors: dropped_actors_,
168 } => {
169 split_assignment = split_assignment_;
170 dropped_actors = dropped_actors_;
171 }
172 SourceChange::UpdateSourceProps {
173 source_id_map_new_props,
174 } => {
175 for (source_id, new_props) in source_id_map_new_props {
176 recreate_source_id_map_new_props.push((source_id, new_props));
177 }
178 }
179 }
180
181 for source_id in dropped_source_ids {
182 let dropped_fragments = self.source_fragments.remove(&source_id);
183
184 if let Some(handle) = self.managed_sources.remove(&source_id) {
185 handle.terminate(dropped_fragments);
186 }
187 if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
188 }
195 }
196
197 for (source_id, fragments) in added_source_fragments {
198 self.source_fragments
199 .entry(source_id)
200 .or_default()
201 .extend(fragments);
202 }
203
204 for (source_id, fragments) in added_backfill_fragments {
205 self.backfill_fragments
206 .entry(source_id)
207 .or_default()
208 .extend(fragments);
209 }
210
211 for (source_id, fragments) in finished_backfill_fragments {
212 let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
213 panic!(
214 "source {} not found when adding backfill fragments {:?}",
215 source_id, fragments
216 );
217 });
218 handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
219 }
220
221 for (_, actor_splits) in split_assignment {
222 for (actor_id, splits) in actor_splits {
223 self.actor_splits.insert(actor_id, splits);
225 }
226 }
227
228 for actor_id in dropped_actors {
229 self.actor_splits.remove(&actor_id);
230 }
231
232 for (source_id, fragment_ids) in dropped_source_fragments {
233 self.drop_source_fragments(Some(source_id), fragment_ids);
234 }
235
236 for (old_fragment_id, new_fragment_id) in fragment_replacements {
237 self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
239
240 for fragment_ids in self.backfill_fragments.values_mut() {
241 let mut new_backfill_fragment_ids = fragment_ids.clone();
242 for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
243 assert_ne!(
244 fragment_id, upstream_fragment_id,
245 "backfill fragment should not be replaced"
246 );
247 if *upstream_fragment_id == old_fragment_id {
248 new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
249 new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
250 }
251 }
252 *fragment_ids = new_backfill_fragment_ids;
253 }
254 }
255
256 for (source_id, new_props) in recreate_source_id_map_new_props {
257 tracing::info!("recreate source {source_id} in source manager");
258 if let Some(handle) = self.managed_sources.get_mut(&(source_id as _)) {
259 let props_wrapper =
262 WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
263 let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); handle.update_props(props);
265 }
266 }
267 }
268
269 fn drop_source_fragments(
270 &mut self,
271 source_id: Option<SourceId>,
272 dropped_fragment_ids: BTreeSet<FragmentId>,
273 ) {
274 if let Some(source_id) = source_id {
275 if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
276 let mut dropped_ids = vec![];
277 let managed_fragment_ids = entry.get_mut();
278 for fragment_id in &dropped_fragment_ids {
279 managed_fragment_ids.remove(fragment_id);
280 dropped_ids.push(*fragment_id);
281 }
282 if let Some(handle) = self.managed_sources.get(&source_id) {
283 handle.drop_fragments(dropped_ids);
284 } else {
285 panic_if_debug!(
286 "source {source_id} not found when dropping fragment {dropped_ids:?}",
287 );
288 }
289 if managed_fragment_ids.is_empty() {
290 entry.remove();
291 }
292 }
293 } else {
294 for (source_id, fragment_ids) in &mut self.source_fragments {
295 let mut dropped_ids = vec![];
296 for fragment_id in &dropped_fragment_ids {
297 if fragment_ids.remove(fragment_id) {
298 dropped_ids.push(*fragment_id);
299 }
300 }
301 if !dropped_ids.is_empty() {
302 if let Some(handle) = self.managed_sources.get(source_id) {
303 handle.drop_fragments(dropped_ids);
304 } else {
305 panic_if_debug!(
306 "source {source_id} not found when dropping fragment {dropped_ids:?}",
307 );
308 }
309 }
310 }
311 }
312 }
313
314 async fn update_source_splits(&self, source_id: SourceId) -> MetaResult<()> {
315 let handle_ref = self.managed_sources.get(&source_id).unwrap();
316
317 let discovered_splits = handle_ref.splits.lock().await.splits.clone();
318
319 if let Some(splits) = discovered_splits {
320 let source_splits =
321 HashMap::from([(source_id as _, splits.into_values().collect_vec())]);
322 self.metadata_manager
323 .update_source_splits(&source_splits)
324 .await?;
325 }
326 Ok(())
327 }
328}
329
330impl SourceManager {
331 const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
332
333 pub async fn new(
334 barrier_scheduler: BarrierScheduler,
335 metadata_manager: MetadataManager,
336 metrics: Arc<MetaMetrics>,
337 ) -> MetaResult<Self> {
338 let mut managed_sources = HashMap::new();
339 {
340 let sources = metadata_manager.list_sources().await?;
341 for source in sources {
342 create_source_worker_async(source, &mut managed_sources, metrics.clone())?
343 }
344 }
345
346 let source_fragments = metadata_manager
347 .catalog_controller
348 .load_source_fragment_ids()
349 .await?
350 .into_iter()
351 .map(|(source_id, fragment_ids)| {
352 (
353 source_id as SourceId,
354 fragment_ids.into_iter().map(|id| id as _).collect(),
355 )
356 })
357 .collect();
358 let backfill_fragments = metadata_manager
359 .catalog_controller
360 .load_backfill_fragment_ids()
361 .await?
362 .into_iter()
363 .map(|(source_id, fragment_ids)| {
364 (
365 source_id as SourceId,
366 fragment_ids
367 .into_iter()
368 .map(|(id, up_id)| (id as _, up_id as _))
369 .collect(),
370 )
371 })
372 .collect();
373 let actor_splits = metadata_manager
374 .catalog_controller
375 .load_actor_splits()
376 .await?
377 .into_iter()
378 .map(|(actor_id, splits)| {
379 (
380 actor_id as ActorId,
381 splits
382 .to_protobuf()
383 .splits
384 .iter()
385 .map(|split| SplitImpl::try_from(split).unwrap())
386 .collect(),
387 )
388 })
389 .collect();
390
391 let core = Mutex::new(SourceManagerCore::new(
392 metadata_manager,
393 managed_sources,
394 source_fragments,
395 backfill_fragments,
396 actor_splits,
397 ));
398
399 Ok(Self {
400 barrier_scheduler,
401 core,
402 paused: Mutex::new(()),
403 metrics,
404 })
405 }
406
407 pub async fn validate_source_once(
408 &self,
409 source_id: u32,
410 new_source_props: WithOptionsSecResolved,
411 ) -> MetaResult<()> {
412 let props = ConnectorProperties::extract(new_source_props, false).unwrap();
413
414 {
415 let mut enumerator = props
416 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
417 metrics: self.metrics.source_enumerator_metrics.clone(),
418 info: SourceEnumeratorInfo { source_id },
419 }))
420 .await
421 .context("failed to create SplitEnumerator")?;
422
423 let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
424 .await
425 .context("failed to list splits")??;
426 }
427 Ok(())
428 }
429
430 #[await_tree::instrument]
432 pub async fn handle_replace_job(
433 &self,
434 dropped_job_fragments: &StreamJobFragments,
435 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
436 split_assignment: SplitAssignment,
437 replace_plan: &ReplaceStreamJobPlan,
438 ) {
439 let dropped_source_fragments = dropped_job_fragments.stream_source_fragments().clone();
441
442 let fragments = &dropped_job_fragments.fragments;
443
444 let dropped_actors = dropped_source_fragments
445 .values()
446 .flatten()
447 .flat_map(|fragment_id| fragments.get(fragment_id).unwrap().actors.iter())
448 .map(|actor| actor.actor_id)
449 .collect::<HashSet<_>>();
450
451 self.apply_source_change(SourceChange::ReplaceJob {
452 dropped_source_fragments,
453 dropped_actors,
454 added_source_fragments,
455 split_assignment,
456 fragment_replacements: replace_plan.fragment_replacements(),
457 })
458 .await;
459 }
460
461 #[await_tree::instrument("apply_source_change({source_change})")]
464 pub async fn apply_source_change(&self, source_change: SourceChange) {
465 let mut core = self.core.lock().await;
466 core.apply_source_change(source_change);
467 }
468
469 #[await_tree::instrument("register_source({})", source.name)]
471 pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
472 tracing::debug!("register_source: {}", source.get_id());
473 let mut core = self.core.lock().await;
474 let source_id = source.get_id() as _;
475 if core.managed_sources.contains_key(&source_id) {
476 tracing::warn!("source {} already registered", source_id);
477 return Ok(());
478 }
479
480 let handle = create_source_worker(source, self.metrics.clone())
481 .await
482 .context("failed to create source worker")?;
483 core.managed_sources.insert(source_id, handle);
484 core.update_source_splits(source_id).await?;
485 Ok(())
486 }
487
488 pub async fn register_source_with_handle(
490 &self,
491 source_id: SourceId,
492 handle: ConnectorSourceWorkerHandle,
493 ) -> MetaResult<()> {
494 let mut core = self.core.lock().await;
495 if core.managed_sources.contains_key(&source_id) {
496 tracing::warn!("source {} already registered", source_id);
497 return Ok(());
498 }
499 core.managed_sources.insert(source_id, handle);
500 core.update_source_splits(source_id).await?;
501
502 Ok(())
503 }
504
505 pub async fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
506 let core = self.core.lock().await;
507 core.actor_splits.clone()
508 }
509
510 pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
511 let core = self.core.lock().await;
512 SourceManagerRunningInfo {
513 source_fragments: core.source_fragments.clone(),
514 backfill_fragments: core.backfill_fragments.clone(),
515 actor_splits: core.actor_splits.clone(),
516 }
517 }
518
519 async fn tick(&self) -> MetaResult<()> {
528 let split_states = {
529 let core_guard = self.core.lock().await;
530 core_guard.reassign_splits().await?
531 };
532
533 for (database_id, split_state) in split_states {
534 if !split_state.split_assignment.is_empty() {
535 let command = Command::SourceChangeSplit(split_state);
536 tracing::info!(command = ?command, "pushing down split assignment command");
537 self.barrier_scheduler
538 .run_command(database_id, command)
539 .await?;
540 }
541 }
542
543 Ok(())
544 }
545
546 pub async fn run(&self) -> MetaResult<()> {
547 let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
548 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
549 loop {
550 ticker.tick().await;
551 let _pause_guard = self.paused.lock().await;
552 if let Err(e) = self.tick().await {
553 tracing::error!(
554 error = %e.as_report(),
555 "error happened while running source manager tick",
556 );
557 }
558 }
559 }
560
561 pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
563 tracing::debug!("pausing tick lock in source manager");
564 self.paused.lock().await
565 }
566}
567
/// A change to be applied to the source manager's in-memory state by
/// `SourceManagerCore::apply_source_change`.
#[derive(strum::Display)]
pub enum SourceChange {
    /// Records new source fragments, new backfill fragments and their split assignment.
    CreateJob {
        /// Fragment ids to add per source.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// `(fragment id, upstream fragment id)` pairs to add per source.
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
        /// Splits to record for the actors.
        split_assignment: SplitAssignment,
    },
    /// Pushes new connector properties to the workers of the listed sources.
    UpdateSourceProps {
        /// New properties keyed by source id; sources without a worker are skipped.
        source_id_map_new_props: HashMap<u32, HashMap<String, String>>,
    },
    /// Tells the workers of the listed sources that backfill fragments have finished.
    CreateJobFinished {
        /// `(fragment id, upstream fragment id)` pairs per source; only the
        /// fragment id is forwarded to the worker.
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Overwrites the recorded splits of the actors in the assignment.
    SplitChange(SplitAssignment),
    /// Removes the listed sources and terminates their workers.
    DropSource {
        /// Ids of the sources to remove.
        dropped_source_ids: Vec<SourceId>,
    },
    /// Removes source fragments and the splits recorded for dropped actors.
    DropMv {
        /// Fragment ids to remove per source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Actors whose recorded splits are removed.
        dropped_actors: HashSet<ActorId>,
    },
    /// Combination of dropping old fragments/actors, adding new ones and
    /// re-pointing backfill fragments to replaced upstream fragments.
    ReplaceJob {
        /// Fragment ids to remove per source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Actors whose recorded splits are removed.
        dropped_actors: HashSet<ActorId>,

        /// Fragment ids to add per source.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Splits to record for the actors.
        split_assignment: SplitAssignment,
        /// Old fragment id mapped to the fragment id that replaces it.
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
    /// Records a new split assignment and removes the splits of dropped actors.
    Reschedule {
        /// Splits to record for the actors.
        split_assignment: SplitAssignment,
        /// Actors whose recorded splits are removed.
        dropped_actors: HashSet<ActorId>,
    },
}
613
614pub fn build_actor_connector_splits(
615 splits: &HashMap<ActorId, Vec<SplitImpl>>,
616) -> HashMap<u32, ConnectorSplits> {
617 splits
618 .iter()
619 .map(|(&actor_id, splits)| {
620 (
621 actor_id,
622 ConnectorSplits {
623 splits: splits.iter().map(ConnectorSplit::from).collect(),
624 },
625 )
626 })
627 .collect()
628}
629
630pub fn build_actor_split_impls(
631 actor_splits: &HashMap<u32, ConnectorSplits>,
632) -> HashMap<ActorId, Vec<SplitImpl>> {
633 actor_splits
634 .iter()
635 .map(|(actor_id, ConnectorSplits { splits })| {
636 (
637 *actor_id,
638 splits
639 .iter()
640 .map(|split| SplitImpl::try_from(split).unwrap())
641 .collect(),
642 )
643 })
644 .collect()
645}