1mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use itertools::Itertools;
26use risingwave_common::catalog::DatabaseId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_common::panic_if_debug;
29use risingwave_connector::WithOptionsSecResolved;
30use risingwave_connector::error::ConnectorResult;
31use risingwave_connector::source::{
32 ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
33 SplitMetaData, fill_adaptive_split,
34};
35use risingwave_meta_model::SourceId;
36use risingwave_pb::catalog::Source;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38pub use split_assignment::{SplitDiffOptions, SplitState, reassign_splits};
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{Mutex, MutexGuard, oneshot};
42use tokio::task::JoinHandle;
43use tokio::time::MissedTickBehavior;
44use tokio::{select, time};
45pub use worker::create_source_worker;
46use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
47
48use crate::MetaResult;
49use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan, SharedActorInfos};
50use crate::manager::{MetaSrvEnv, MetadataManager};
51use crate::model::{ActorId, FragmentId, StreamJobFragments};
52use crate::rpc::metrics::MetaMetrics;
53
54pub type SourceManagerRef = Arc<SourceManager>;
55pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
56pub type DiscoveredSourceSplits = HashMap<SourceId, Vec<SplitImpl>>;
57pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;
58pub type ConnectorPropsChange = HashMap<u32, HashMap<String, String>>;
60
61const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
62
63pub struct SourceManager {
66 pub paused: Mutex<()>,
67 barrier_scheduler: BarrierScheduler,
68 core: Mutex<SourceManagerCore>,
69 pub metrics: Arc<MetaMetrics>,
70}
71pub struct SourceManagerCore {
72 metadata_manager: MetadataManager,
73
74 managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
76 source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
78 backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
80
81 env: MetaSrvEnv,
82}
83
84pub struct SourceManagerRunningInfo {
85 pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
86 pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
87}
88
89impl SourceManagerCore {
90 fn new(
91 metadata_manager: MetadataManager,
92 managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
93 source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
94 backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
95 env: MetaSrvEnv,
96 ) -> Self {
97 Self {
98 metadata_manager,
99 managed_sources,
100 source_fragments,
101 backfill_fragments,
102 env,
103 }
104 }
105
106 pub fn apply_source_change(&mut self, source_change: SourceChange) {
108 let mut added_source_fragments = Default::default();
109 let mut added_backfill_fragments = Default::default();
110 let mut finished_backfill_fragments = Default::default();
111 let mut fragment_replacements = Default::default();
112 let mut dropped_source_fragments = Default::default();
113 let mut dropped_source_ids = Default::default();
114 let mut recreate_source_id_map_new_props: Vec<(u32, HashMap<String, String>)> =
115 Default::default();
116
117 match source_change {
118 SourceChange::CreateJob {
119 added_source_fragments: added_source_fragments_,
120 added_backfill_fragments: added_backfill_fragments_,
121 } => {
122 added_source_fragments = added_source_fragments_;
123 added_backfill_fragments = added_backfill_fragments_;
124 }
125 SourceChange::CreateJobFinished {
126 finished_backfill_fragments: finished_backfill_fragments_,
127 } => {
128 finished_backfill_fragments = finished_backfill_fragments_;
129 }
130
131 SourceChange::DropMv {
132 dropped_source_fragments: dropped_source_fragments_,
133 } => {
134 dropped_source_fragments = dropped_source_fragments_;
135 }
136 SourceChange::ReplaceJob {
137 dropped_source_fragments: dropped_source_fragments_,
138 added_source_fragments: added_source_fragments_,
139 fragment_replacements: fragment_replacements_,
140 } => {
141 dropped_source_fragments = dropped_source_fragments_;
142 added_source_fragments = added_source_fragments_;
143 fragment_replacements = fragment_replacements_;
144 }
145 SourceChange::DropSource {
146 dropped_source_ids: dropped_source_ids_,
147 } => {
148 dropped_source_ids = dropped_source_ids_;
149 }
150
151 SourceChange::UpdateSourceProps {
152 source_id_map_new_props,
153 } => {
154 for (source_id, new_props) in source_id_map_new_props {
155 recreate_source_id_map_new_props.push((source_id, new_props));
156 }
157 }
158 }
159
160 for source_id in dropped_source_ids {
161 let dropped_fragments = self.source_fragments.remove(&source_id);
162
163 if let Some(handle) = self.managed_sources.remove(&source_id) {
164 handle.terminate(dropped_fragments);
165 }
166 if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
167 }
174 }
175
176 for (source_id, fragments) in added_source_fragments {
177 self.source_fragments
178 .entry(source_id)
179 .or_default()
180 .extend(fragments);
181 }
182
183 for (source_id, fragments) in added_backfill_fragments {
184 self.backfill_fragments
185 .entry(source_id)
186 .or_default()
187 .extend(fragments);
188 }
189
190 for (source_id, fragments) in finished_backfill_fragments {
191 let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
192 panic!(
193 "source {} not found when adding backfill fragments {:?}",
194 source_id, fragments
195 );
196 });
197 handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
198 }
199
200 for (source_id, fragment_ids) in dropped_source_fragments {
201 self.drop_source_fragments(Some(source_id), fragment_ids);
202 }
203
204 for (old_fragment_id, new_fragment_id) in fragment_replacements {
205 self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
207
208 for fragment_ids in self.backfill_fragments.values_mut() {
209 let mut new_backfill_fragment_ids = fragment_ids.clone();
210 for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
211 assert_ne!(
212 fragment_id, upstream_fragment_id,
213 "backfill fragment should not be replaced"
214 );
215 if *upstream_fragment_id == old_fragment_id {
216 new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
217 new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
218 }
219 }
220 *fragment_ids = new_backfill_fragment_ids;
221 }
222 }
223
224 for (source_id, new_props) in recreate_source_id_map_new_props {
225 tracing::info!("recreate source {source_id} in source manager");
226 if let Some(handle) = self.managed_sources.get_mut(&(source_id as _)) {
227 let props_wrapper =
230 WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
231 let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); handle.update_props(props);
233 }
234 }
235 }
236
237 fn drop_source_fragments(
238 &mut self,
239 source_id: Option<SourceId>,
240 dropped_fragment_ids: BTreeSet<FragmentId>,
241 ) {
242 if let Some(source_id) = source_id {
243 if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
244 let mut dropped_ids = vec![];
245 let managed_fragment_ids = entry.get_mut();
246 for fragment_id in &dropped_fragment_ids {
247 managed_fragment_ids.remove(fragment_id);
248 dropped_ids.push(*fragment_id);
249 }
250 if let Some(handle) = self.managed_sources.get(&source_id) {
251 handle.drop_fragments(dropped_ids);
252 } else {
253 panic_if_debug!(
254 "source {source_id} not found when dropping fragment {dropped_ids:?}",
255 );
256 }
257 if managed_fragment_ids.is_empty() {
258 entry.remove();
259 }
260 }
261 } else {
262 for (source_id, fragment_ids) in &mut self.source_fragments {
263 let mut dropped_ids = vec![];
264 for fragment_id in &dropped_fragment_ids {
265 if fragment_ids.remove(fragment_id) {
266 dropped_ids.push(*fragment_id);
267 }
268 }
269 if !dropped_ids.is_empty() {
270 if let Some(handle) = self.managed_sources.get(source_id) {
271 handle.drop_fragments(dropped_ids);
272 } else {
273 panic_if_debug!(
274 "source {source_id} not found when dropping fragment {dropped_ids:?}",
275 );
276 }
277 }
278 }
279 }
280 }
281
282 async fn update_source_splits(&self, source_id: SourceId) -> MetaResult<()> {
283 let handle_ref = self.managed_sources.get(&source_id).unwrap();
284
285 let discovered_splits = handle_ref.splits.lock().await.splits.clone();
286
287 if let Some(splits) = discovered_splits {
288 let source_splits =
289 HashMap::from([(source_id as _, splits.into_values().collect_vec())]);
290 self.metadata_manager
291 .update_source_splits(&source_splits)
292 .await?;
293 }
294 Ok(())
295 }
296}
297
298impl SourceManager {
299 const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
300
301 pub async fn new(
302 barrier_scheduler: BarrierScheduler,
303 metadata_manager: MetadataManager,
304 metrics: Arc<MetaMetrics>,
305 env: MetaSrvEnv,
306 ) -> MetaResult<Self> {
307 let mut managed_sources = HashMap::new();
308 {
309 let sources = metadata_manager.list_sources().await?;
310 for source in sources {
311 create_source_worker_async(source, &mut managed_sources, metrics.clone())?
312 }
313 }
314
315 let source_fragments = metadata_manager
316 .catalog_controller
317 .load_source_fragment_ids()
318 .await?
319 .into_iter()
320 .map(|(source_id, fragment_ids)| {
321 (
322 source_id as SourceId,
323 fragment_ids.into_iter().map(|id| id as _).collect(),
324 )
325 })
326 .collect();
327 let backfill_fragments = metadata_manager
328 .catalog_controller
329 .load_backfill_fragment_ids()
330 .await?
331 .into_iter()
332 .map(|(source_id, fragment_ids)| {
333 (
334 source_id as SourceId,
335 fragment_ids
336 .into_iter()
337 .map(|(id, up_id)| (id as _, up_id as _))
338 .collect(),
339 )
340 })
341 .collect();
342
343 let core = Mutex::new(SourceManagerCore::new(
344 metadata_manager,
345 managed_sources,
346 source_fragments,
347 backfill_fragments,
348 env,
349 ));
350
351 Ok(Self {
352 barrier_scheduler,
353 core,
354 paused: Mutex::new(()),
355 metrics,
356 })
357 }
358
359 pub async fn validate_source_once(
360 &self,
361 source_id: u32,
362 new_source_props: WithOptionsSecResolved,
363 ) -> MetaResult<()> {
364 let props = ConnectorProperties::extract(new_source_props, false).unwrap();
365
366 {
367 let mut enumerator = props
368 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
369 metrics: self.metrics.source_enumerator_metrics.clone(),
370 info: SourceEnumeratorInfo { source_id },
371 }))
372 .await
373 .context("failed to create SplitEnumerator")?;
374
375 let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
376 .await
377 .context("failed to list splits")??;
378 }
379 Ok(())
380 }
381
382 #[await_tree::instrument]
384 pub async fn handle_replace_job(
385 &self,
386 dropped_job_fragments: &StreamJobFragments,
387 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
388 replace_plan: &ReplaceStreamJobPlan,
389 ) {
390 let dropped_source_fragments = dropped_job_fragments.stream_source_fragments().clone();
392
393 self.apply_source_change(SourceChange::ReplaceJob {
394 dropped_source_fragments,
395 added_source_fragments,
396 fragment_replacements: replace_plan.fragment_replacements(),
397 })
398 .await;
399 }
400
401 #[await_tree::instrument("apply_source_change({source_change})")]
404 pub async fn apply_source_change(&self, source_change: SourceChange) {
405 let mut core = self.core.lock().await;
406 core.apply_source_change(source_change);
407 }
408
409 #[await_tree::instrument("register_source({})", source.name)]
411 pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
412 tracing::debug!("register_source: {}", source.get_id());
413 let mut core = self.core.lock().await;
414 let source_id = source.get_id() as _;
415 if core.managed_sources.contains_key(&source_id) {
416 tracing::warn!("source {} already registered", source_id);
417 return Ok(());
418 }
419
420 let handle = create_source_worker(source, self.metrics.clone())
421 .await
422 .context("failed to create source worker")?;
423 core.managed_sources.insert(source_id, handle);
424 core.update_source_splits(source_id).await?;
425 Ok(())
426 }
427
428 pub async fn register_source_with_handle(
430 &self,
431 source_id: SourceId,
432 handle: ConnectorSourceWorkerHandle,
433 ) -> MetaResult<()> {
434 let mut core = self.core.lock().await;
435 if core.managed_sources.contains_key(&source_id) {
436 tracing::warn!("source {} already registered", source_id);
437 return Ok(());
438 }
439 core.managed_sources.insert(source_id, handle);
440 core.update_source_splits(source_id).await?;
441
442 Ok(())
443 }
444
445 pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
446 let core = self.core.lock().await;
447
448 SourceManagerRunningInfo {
449 source_fragments: core.source_fragments.clone(),
450 backfill_fragments: core.backfill_fragments.clone(),
451 }
452 }
453
454 async fn tick(&self) -> MetaResult<()> {
463 let split_states = {
464 let core_guard = self.core.lock().await;
465 core_guard.reassign_splits().await?
466 };
467
468 for (database_id, split_state) in split_states {
469 if !split_state.split_assignment.is_empty() {
470 let command = Command::SourceChangeSplit(split_state);
471 tracing::info!(command = ?command, "pushing down split assignment command");
472 self.barrier_scheduler
473 .run_command(database_id, command)
474 .await?;
475 }
476 }
477
478 Ok(())
479 }
480
481 pub async fn run(&self) -> MetaResult<()> {
482 let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
483 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
484 loop {
485 ticker.tick().await;
486 let _pause_guard = self.paused.lock().await;
487 if let Err(e) = self.tick().await {
488 tracing::error!(
489 error = %e.as_report(),
490 "error happened while running source manager tick",
491 );
492 }
493 }
494 }
495
496 pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
498 tracing::debug!("pausing tick lock in source manager");
499 self.paused.lock().await
500 }
501}
502
503#[derive(strum::Display)]
504pub enum SourceChange {
505 CreateJob {
508 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
509 added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
511 },
512 UpdateSourceProps {
513 source_id_map_new_props: HashMap<u32, HashMap<String, String>>,
516 },
517 CreateJobFinished {
521 finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
523 },
524 DropSource { dropped_source_ids: Vec<SourceId> },
526 DropMv {
527 dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
529 },
530 ReplaceJob {
531 dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
532 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
533 fragment_replacements: HashMap<FragmentId, FragmentId>,
534 },
535}
536
537pub fn build_actor_connector_splits(
538 splits: &HashMap<ActorId, Vec<SplitImpl>>,
539) -> HashMap<u32, ConnectorSplits> {
540 splits
541 .iter()
542 .map(|(&actor_id, splits)| {
543 (
544 actor_id,
545 ConnectorSplits {
546 splits: splits.iter().map(ConnectorSplit::from).collect(),
547 },
548 )
549 })
550 .collect()
551}
552
553pub fn build_actor_split_impls(
554 actor_splits: &HashMap<u32, ConnectorSplits>,
555) -> HashMap<ActorId, Vec<SplitImpl>> {
556 actor_splits
557 .iter()
558 .map(|(actor_id, ConnectorSplits { splits })| {
559 (
560 *actor_id,
561 splits
562 .iter()
563 .map(|split| SplitImpl::try_from(split).unwrap())
564 .collect(),
565 )
566 })
567 .collect()
568}