1mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::panic_if_debug;
28use risingwave_connector::WithOptionsSecResolved;
29use risingwave_connector::error::ConnectorResult;
30use risingwave_connector::source::{
31 ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
32 SplitMetaData, fill_adaptive_split,
33};
34use risingwave_meta_model::SourceId;
35use risingwave_pb::catalog::Source;
36use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
37use thiserror_ext::AsReport;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
39use tokio::sync::{Mutex, MutexGuard, oneshot};
40use tokio::task::JoinHandle;
41use tokio::time::MissedTickBehavior;
42use tokio::{select, time};
43pub use worker::create_source_worker;
44use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
45
46use crate::MetaResult;
47use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
48use crate::manager::MetadataManager;
49use crate::model::{ActorId, FragmentId, StreamJobFragments};
50use crate::rpc::metrics::MetaMetrics;
51
/// Shared, reference-counted handle to the [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// Splits assigned to each actor, grouped by the fragment the actors belong to.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
/// Optional per-actor `u32` throttle value, grouped by fragment.
/// NOTE(review): presumably a rate limit where `None` means "no limit" — confirm with users.
pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;
/// New connector properties (key/value strings) keyed by a raw `u32` id.
/// NOTE(review): same shape as `SourceChange::UpdateSourceProps`, whose keys are source ids —
/// presumably the same meaning here; confirm.
pub type ConnectorPropsChange = HashMap<u32, HashMap<String, String>>;

/// Upper bound on how long `SourceManager::validate_source_once` waits for `list_splits`.
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
59
/// Meta-side manager of connector sources.
///
/// It owns one split-enumerator worker handle per source (inside [`SourceManagerCore`]) and,
/// from [`SourceManager::run`], periodically pushes split (re)assignments to the streaming
/// graph through the barrier scheduler.
pub struct SourceManager {
    /// Held by `run` for the whole duration of each tick; holding it (see `pause_tick`)
    /// therefore prevents the next tick from starting.
    pub paused: Mutex<()>,
    /// Used to issue `Command::SourceChangeSplit` commands produced by a tick.
    barrier_scheduler: BarrierScheduler,
    /// All mutable bookkeeping, guarded by an async mutex.
    core: Mutex<SourceManagerCore>,
    /// Meta metrics; the source-enumerator metrics are handed to enumerators it creates.
    pub metrics: Arc<MetaMetrics>,
}
/// Mutable state of the [`SourceManager`], always accessed under `SourceManager::core`.
pub struct SourceManagerCore {
    /// Catalog access.
    metadata_manager: MetadataManager,

    /// Split-enumerator worker handle for every registered source.
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Source fragment ids currently associated with each source.
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `(backfill fragment id, upstream fragment id)` pairs for each source.
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    /// Splits currently recorded as assigned to each actor.
    actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
82
/// Snapshot (cloned copy) of the core bookkeeping, returned by
/// `SourceManager::get_running_info`.
pub struct SourceManagerRunningInfo {
    /// Source fragment ids per source.
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `(backfill fragment id, upstream fragment id)` pairs per source.
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    /// Splits recorded per actor.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
88
89impl SourceManagerCore {
90 fn new(
91 metadata_manager: MetadataManager,
92 managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
93 source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
94 backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
95 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
96 ) -> Self {
97 Self {
98 metadata_manager,
99 managed_sources,
100 source_fragments,
101 backfill_fragments,
102 actor_splits,
103 }
104 }
105
106 pub fn apply_source_change(&mut self, source_change: SourceChange) {
108 let mut added_source_fragments = Default::default();
109 let mut added_backfill_fragments = Default::default();
110 let mut finished_backfill_fragments = Default::default();
111 let mut split_assignment = Default::default();
112 let mut dropped_actors = Default::default();
113 let mut fragment_replacements = Default::default();
114 let mut dropped_source_fragments = Default::default();
115 let mut dropped_source_ids = Default::default();
116 let mut recreate_source_id_map_new_props: Vec<(u32, HashMap<String, String>)> =
117 Default::default();
118
119 match source_change {
120 SourceChange::CreateJob {
121 added_source_fragments: added_source_fragments_,
122 added_backfill_fragments: added_backfill_fragments_,
123 split_assignment: split_assignment_,
124 } => {
125 added_source_fragments = added_source_fragments_;
126 added_backfill_fragments = added_backfill_fragments_;
127 split_assignment = split_assignment_;
128 }
129 SourceChange::CreateJobFinished {
130 finished_backfill_fragments: finished_backfill_fragments_,
131 } => {
132 finished_backfill_fragments = finished_backfill_fragments_;
133 }
134 SourceChange::SplitChange(split_assignment_) => {
135 split_assignment = split_assignment_;
136 }
137 SourceChange::DropMv {
138 dropped_source_fragments: dropped_source_fragments_,
139 dropped_actors: dropped_actors_,
140 } => {
141 dropped_source_fragments = dropped_source_fragments_;
142 dropped_actors = dropped_actors_;
143 }
144 SourceChange::ReplaceJob {
145 dropped_source_fragments: dropped_source_fragments_,
146 dropped_actors: dropped_actors_,
147 added_source_fragments: added_source_fragments_,
148 split_assignment: split_assignment_,
149 fragment_replacements: fragment_replacements_,
150 } => {
151 dropped_source_fragments = dropped_source_fragments_;
152 dropped_actors = dropped_actors_;
153 added_source_fragments = added_source_fragments_;
154 split_assignment = split_assignment_;
155 fragment_replacements = fragment_replacements_;
156 }
157 SourceChange::DropSource {
158 dropped_source_ids: dropped_source_ids_,
159 } => {
160 dropped_source_ids = dropped_source_ids_;
161 }
162 SourceChange::Reschedule {
163 split_assignment: split_assignment_,
164 dropped_actors: dropped_actors_,
165 } => {
166 split_assignment = split_assignment_;
167 dropped_actors = dropped_actors_;
168 }
169 SourceChange::UpdateSourceProps {
170 source_id_map_new_props,
171 } => {
172 for (source_id, new_props) in source_id_map_new_props {
173 recreate_source_id_map_new_props.push((source_id, new_props));
174 }
175 }
176 }
177
178 for source_id in dropped_source_ids {
179 let dropped_fragments = self.source_fragments.remove(&source_id);
180
181 if let Some(handle) = self.managed_sources.remove(&source_id) {
182 handle.terminate(dropped_fragments);
183 }
184 if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
185 }
192 }
193
194 for (source_id, fragments) in added_source_fragments {
195 self.source_fragments
196 .entry(source_id)
197 .or_default()
198 .extend(fragments);
199 }
200
201 for (source_id, fragments) in added_backfill_fragments {
202 self.backfill_fragments
203 .entry(source_id)
204 .or_default()
205 .extend(fragments);
206 }
207
208 for (source_id, fragments) in finished_backfill_fragments {
209 let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
210 panic!(
211 "source {} not found when adding backfill fragments {:?}",
212 source_id, fragments
213 );
214 });
215 handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
216 }
217
218 for (_, actor_splits) in split_assignment {
219 for (actor_id, splits) in actor_splits {
220 self.actor_splits.insert(actor_id, splits);
222 }
223 }
224
225 for actor_id in dropped_actors {
226 self.actor_splits.remove(&actor_id);
227 }
228
229 for (source_id, fragment_ids) in dropped_source_fragments {
230 self.drop_source_fragments(Some(source_id), fragment_ids);
231 }
232
233 for (old_fragment_id, new_fragment_id) in fragment_replacements {
234 self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
236
237 for fragment_ids in self.backfill_fragments.values_mut() {
238 let mut new_backfill_fragment_ids = fragment_ids.clone();
239 for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
240 assert_ne!(
241 fragment_id, upstream_fragment_id,
242 "backfill fragment should not be replaced"
243 );
244 if *upstream_fragment_id == old_fragment_id {
245 new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
246 new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
247 }
248 }
249 *fragment_ids = new_backfill_fragment_ids;
250 }
251 }
252
253 for (source_id, new_props) in recreate_source_id_map_new_props {
254 tracing::info!("recreate source {source_id} in source manager");
255 if let Some(handle) = self.managed_sources.get_mut(&(source_id as _)) {
256 let props_wrapper =
259 WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
260 let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); handle.update_props(props);
262 }
263 }
264 }
265
266 fn drop_source_fragments(
267 &mut self,
268 source_id: Option<SourceId>,
269 dropped_fragment_ids: BTreeSet<FragmentId>,
270 ) {
271 if let Some(source_id) = source_id {
272 if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
273 let mut dropped_ids = vec![];
274 let managed_fragment_ids = entry.get_mut();
275 for fragment_id in &dropped_fragment_ids {
276 managed_fragment_ids.remove(fragment_id);
277 dropped_ids.push(*fragment_id);
278 }
279 if let Some(handle) = self.managed_sources.get(&source_id) {
280 handle.drop_fragments(dropped_ids);
281 } else {
282 panic_if_debug!(
283 "source {source_id} not found when dropping fragment {dropped_ids:?}",
284 );
285 }
286 if managed_fragment_ids.is_empty() {
287 entry.remove();
288 }
289 }
290 } else {
291 for (source_id, fragment_ids) in &mut self.source_fragments {
292 let mut dropped_ids = vec![];
293 for fragment_id in &dropped_fragment_ids {
294 if fragment_ids.remove(fragment_id) {
295 dropped_ids.push(*fragment_id);
296 }
297 }
298 if !dropped_ids.is_empty() {
299 if let Some(handle) = self.managed_sources.get(source_id) {
300 handle.drop_fragments(dropped_ids);
301 } else {
302 panic_if_debug!(
303 "source {source_id} not found when dropping fragment {dropped_ids:?}",
304 );
305 }
306 }
307 }
308 }
309 }
310}
311
312impl SourceManager {
313 const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
314
315 pub async fn new(
316 barrier_scheduler: BarrierScheduler,
317 metadata_manager: MetadataManager,
318 metrics: Arc<MetaMetrics>,
319 ) -> MetaResult<Self> {
320 let mut managed_sources = HashMap::new();
321 {
322 let sources = metadata_manager.list_sources().await?;
323 for source in sources {
324 create_source_worker_async(source, &mut managed_sources, metrics.clone())?
325 }
326 }
327
328 let source_fragments = metadata_manager
329 .catalog_controller
330 .load_source_fragment_ids()
331 .await?
332 .into_iter()
333 .map(|(source_id, fragment_ids)| {
334 (
335 source_id as SourceId,
336 fragment_ids.into_iter().map(|id| id as _).collect(),
337 )
338 })
339 .collect();
340 let backfill_fragments = metadata_manager
341 .catalog_controller
342 .load_backfill_fragment_ids()
343 .await?
344 .into_iter()
345 .map(|(source_id, fragment_ids)| {
346 (
347 source_id as SourceId,
348 fragment_ids
349 .into_iter()
350 .map(|(id, up_id)| (id as _, up_id as _))
351 .collect(),
352 )
353 })
354 .collect();
355 let actor_splits = metadata_manager
356 .catalog_controller
357 .load_actor_splits()
358 .await?
359 .into_iter()
360 .map(|(actor_id, splits)| {
361 (
362 actor_id as ActorId,
363 splits
364 .to_protobuf()
365 .splits
366 .iter()
367 .map(|split| SplitImpl::try_from(split).unwrap())
368 .collect(),
369 )
370 })
371 .collect();
372
373 let core = Mutex::new(SourceManagerCore::new(
374 metadata_manager,
375 managed_sources,
376 source_fragments,
377 backfill_fragments,
378 actor_splits,
379 ));
380
381 Ok(Self {
382 barrier_scheduler,
383 core,
384 paused: Mutex::new(()),
385 metrics,
386 })
387 }
388
389 pub async fn validate_source_once(
390 &self,
391 source_id: u32,
392 new_source_props: WithOptionsSecResolved,
393 ) -> MetaResult<()> {
394 let props = ConnectorProperties::extract(new_source_props, false).unwrap();
395
396 {
397 let mut enumerator = props
398 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
399 metrics: self.metrics.source_enumerator_metrics.clone(),
400 info: SourceEnumeratorInfo { source_id },
401 }))
402 .await
403 .context("failed to create SplitEnumerator")?;
404
405 let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
406 .await
407 .context("failed to list splits")??;
408 }
409 Ok(())
410 }
411
412 #[await_tree::instrument]
414 pub async fn handle_replace_job(
415 &self,
416 dropped_job_fragments: &StreamJobFragments,
417 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
418 split_assignment: SplitAssignment,
419 replace_plan: &ReplaceStreamJobPlan,
420 ) {
421 let dropped_source_fragments = dropped_job_fragments.stream_source_fragments().clone();
423
424 let fragments = &dropped_job_fragments.fragments;
425
426 let dropped_actors = dropped_source_fragments
427 .values()
428 .flatten()
429 .flat_map(|fragment_id| fragments.get(fragment_id).unwrap().actors.iter())
430 .map(|actor| actor.actor_id)
431 .collect::<HashSet<_>>();
432
433 self.apply_source_change(SourceChange::ReplaceJob {
434 dropped_source_fragments,
435 dropped_actors,
436 added_source_fragments,
437 split_assignment,
438 fragment_replacements: replace_plan.fragment_replacements(),
439 })
440 .await;
441 }
442
443 #[await_tree::instrument("apply_source_change({source_change})")]
446 pub async fn apply_source_change(&self, source_change: SourceChange) {
447 let mut core = self.core.lock().await;
448 core.apply_source_change(source_change);
449 }
450
451 #[await_tree::instrument("register_source({})", source.name)]
453 pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
454 tracing::debug!("register_source: {}", source.get_id());
455 let mut core = self.core.lock().await;
456 if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) {
457 let handle = create_source_worker(source, self.metrics.clone())
458 .await
459 .context("failed to create source worker")?;
460 e.insert(handle);
461 } else {
462 tracing::warn!("source {} already registered", source.get_id());
463 }
464 Ok(())
465 }
466
467 pub async fn register_source_with_handle(
469 &self,
470 source_id: SourceId,
471 handle: ConnectorSourceWorkerHandle,
472 ) {
473 let mut core = self.core.lock().await;
474 if let Entry::Vacant(e) = core.managed_sources.entry(source_id) {
475 e.insert(handle);
476 } else {
477 tracing::warn!("source {} already registered", source_id);
478 }
479 }
480
481 pub async fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
482 let core = self.core.lock().await;
483 core.actor_splits.clone()
484 }
485
486 pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
487 let core = self.core.lock().await;
488 SourceManagerRunningInfo {
489 source_fragments: core.source_fragments.clone(),
490 backfill_fragments: core.backfill_fragments.clone(),
491 actor_splits: core.actor_splits.clone(),
492 }
493 }
494
495 async fn tick(&self) -> MetaResult<()> {
504 let split_assignment = {
505 let core_guard = self.core.lock().await;
506 core_guard.reassign_splits().await?
507 };
508
509 for (database_id, split_assignment) in split_assignment {
510 if !split_assignment.is_empty() {
511 let command = Command::SourceChangeSplit(split_assignment);
512 tracing::info!(command = ?command, "pushing down split assignment command");
513 self.barrier_scheduler
514 .run_command(database_id, command)
515 .await?;
516 }
517 }
518
519 Ok(())
520 }
521
522 pub async fn run(&self) -> MetaResult<()> {
523 let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
524 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
525 loop {
526 ticker.tick().await;
527 let _pause_guard = self.paused.lock().await;
528 if let Err(e) = self.tick().await {
529 tracing::error!(
530 error = %e.as_report(),
531 "error happened while running source manager tick",
532 );
533 }
534 }
535 }
536
537 pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
539 tracing::debug!("pausing tick lock in source manager");
540 self.paused.lock().await
541 }
542}
543
/// A change to the source bookkeeping, applied by `SourceManager::apply_source_change`.
///
/// The variant name (via `strum::Display`) appears in the await-tree span of
/// `apply_source_change`.
#[derive(strum::Display)]
pub enum SourceChange {
    /// A streaming job was created.
    CreateJob {
        /// Source fragments to start tracking, per source.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// `(backfill fragment, upstream fragment)` pairs to start tracking, per source.
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
        /// Initial splits; overwrites any recorded splits of the listed actors.
        split_assignment: SplitAssignment,
    },
    /// Connector properties of existing sources changed.
    UpdateSourceProps {
        /// New properties keyed by source id. They are forwarded to the source's existing
        /// worker handle; ids without a worker are skipped.
        source_id_map_new_props: HashMap<u32, HashMap<String, String>>,
    },
    /// Backfill finished for the given backfill fragments.
    CreateJobFinished {
        /// Finished `(backfill fragment, upstream fragment)` pairs, per source. Only the
        /// backfill fragment ids are reported to the worker.
        /// Applying this panics if a source here has no worker handle.
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// A new split assignment; overwrites the recorded splits of each listed actor.
    SplitChange(SplitAssignment),
    /// Sources were dropped.
    DropSource {
        /// Sources whose workers are terminated and whose fragments are forgotten.
        dropped_source_ids: Vec<SourceId>,
    },
    /// A job was dropped. NOTE(review): the name suggests a materialized view specifically.
    DropMv {
        /// Source fragments to stop tracking, per source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Actors whose recorded splits are removed.
        dropped_actors: HashSet<ActorId>,
    },
    /// A streaming job was replaced (see `SourceManager::handle_replace_job`).
    ReplaceJob {
        /// Source fragments of the old job to stop tracking, per source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Actors of those fragments; their recorded splits are removed.
        dropped_actors: HashSet<ActorId>,

        /// Source fragments of the new job to start tracking, per source.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Splits of the new job's actors.
        split_assignment: SplitAssignment,
        /// Old fragment id -> new fragment id. The old id stops being tracked as a source
        /// fragment. Backfill pairs whose upstream is the old id are re-pointed to the new id.
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
    /// Actors were rescheduled.
    Reschedule {
        /// Splits of the (re)scheduled actors; overwrites their recorded splits.
        split_assignment: SplitAssignment,
        /// Actors whose recorded splits are removed.
        dropped_actors: HashSet<ActorId>,
    },
}
589
590pub fn build_actor_connector_splits(
591 splits: &HashMap<ActorId, Vec<SplitImpl>>,
592) -> HashMap<u32, ConnectorSplits> {
593 splits
594 .iter()
595 .map(|(&actor_id, splits)| {
596 (
597 actor_id,
598 ConnectorSplits {
599 splits: splits.iter().map(ConnectorSplit::from).collect(),
600 },
601 )
602 })
603 .collect()
604}
605
606pub fn build_actor_split_impls(
607 actor_splits: &HashMap<u32, ConnectorSplits>,
608) -> HashMap<ActorId, Vec<SplitImpl>> {
609 actor_splits
610 .iter()
611 .map(|(actor_id, ConnectorSplits { splits })| {
612 (
613 *actor_id,
614 splits
615 .iter()
616 .map(|split| SplitImpl::try_from(split).unwrap())
617 .collect(),
618 )
619 })
620 .collect()
621}