1mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::panic_if_debug;
28use risingwave_connector::WithOptionsSecResolved;
29use risingwave_connector::error::ConnectorResult;
30use risingwave_connector::source::{
31 ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
32 SplitMetaData, fill_adaptive_split,
33};
34use risingwave_meta_model::SourceId;
35use risingwave_pb::catalog::Source;
36use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
37pub use split_assignment::{SplitDiffOptions, SplitState, reassign_splits};
38use thiserror_ext::AsReport;
39use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
40use tokio::sync::{Mutex, MutexGuard, oneshot};
41use tokio::task::JoinHandle;
42use tokio::time::MissedTickBehavior;
43use tokio::{select, time};
44pub use worker::create_source_worker;
45use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
46
47use crate::MetaResult;
48use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan, SharedActorInfos};
49use crate::manager::{MetaSrvEnv, MetadataManager};
50use crate::model::{ActorId, FragmentId, StreamJobFragments};
51use crate::rpc::metrics::MetaMetrics;
52
pub type SourceManagerRef = Arc<SourceManager>;
/// Fragment -> actor -> the source splits assigned to that actor.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
/// Splits discovered by the enumerator workers, keyed by source id.
pub type DiscoveredSourceSplits = HashMap<SourceId, Vec<SplitImpl>>;
/// Fragment -> actor -> optional throttle value.
/// NOTE(review): presumably `None` clears the rate limit — confirm with callers.
pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;
/// Job/source id (as `u32`) -> its new WITH-options — TODO confirm key semantics.
pub type ConnectorPropsChange = HashMap<u32, HashMap<String, String>>;

/// Timeout applied to a single enumerator `list_splits` call (see
/// `validate_source_once`).
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
61
/// Manages source enumerator workers on the meta node and pushes split
/// (re)assignments down through the barrier scheduler.
pub struct SourceManager {
    /// Held via [`SourceManager::pause_tick`] to suspend the periodic
    /// discovery tick in [`SourceManager::run`].
    pub paused: Mutex<()>,
    barrier_scheduler: BarrierScheduler,
    /// Mutable bookkeeping: worker handles and source/fragment mappings.
    core: Mutex<SourceManagerCore>,
    pub metrics: Arc<MetaMetrics>,
}
pub struct SourceManagerCore {
    metadata_manager: MetadataManager,

    /// One enumerator worker handle per managed source.
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Source -> the fragments reading from it.
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// Source -> `(backfill fragment, upstream source fragment)` pairs.
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    env: MetaSrvEnv,
}
82
/// Snapshot of the manager's fragment bookkeeping, returned by
/// [`SourceManager::get_running_info`].
pub struct SourceManagerRunningInfo {
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
}
87
88impl SourceManagerCore {
89 fn new(
90 metadata_manager: MetadataManager,
91 managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
92 source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
93 backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
94 env: MetaSrvEnv,
95 ) -> Self {
96 Self {
97 metadata_manager,
98 managed_sources,
99 source_fragments,
100 backfill_fragments,
101 env,
102 }
103 }
104
105 pub fn apply_source_change(&mut self, source_change: SourceChange) {
107 let mut added_source_fragments = Default::default();
108 let mut added_backfill_fragments = Default::default();
109 let mut finished_backfill_fragments = Default::default();
110 let mut fragment_replacements = Default::default();
111 let mut dropped_source_fragments = Default::default();
112 let mut dropped_source_ids = Default::default();
113 let mut recreate_source_id_map_new_props: Vec<(u32, HashMap<String, String>)> =
114 Default::default();
115
116 match source_change {
117 SourceChange::CreateJob {
118 added_source_fragments: added_source_fragments_,
119 added_backfill_fragments: added_backfill_fragments_,
120 } => {
121 added_source_fragments = added_source_fragments_;
122 added_backfill_fragments = added_backfill_fragments_;
123 }
124 SourceChange::CreateJobFinished {
125 finished_backfill_fragments: finished_backfill_fragments_,
126 } => {
127 finished_backfill_fragments = finished_backfill_fragments_;
128 }
129
130 SourceChange::DropMv {
131 dropped_source_fragments: dropped_source_fragments_,
132 } => {
133 dropped_source_fragments = dropped_source_fragments_;
134 }
135 SourceChange::ReplaceJob {
136 dropped_source_fragments: dropped_source_fragments_,
137 added_source_fragments: added_source_fragments_,
138 fragment_replacements: fragment_replacements_,
139 } => {
140 dropped_source_fragments = dropped_source_fragments_;
141 added_source_fragments = added_source_fragments_;
142 fragment_replacements = fragment_replacements_;
143 }
144 SourceChange::DropSource {
145 dropped_source_ids: dropped_source_ids_,
146 } => {
147 dropped_source_ids = dropped_source_ids_;
148 }
149
150 SourceChange::UpdateSourceProps {
151 source_id_map_new_props,
152 } => {
153 for (source_id, new_props) in source_id_map_new_props {
154 recreate_source_id_map_new_props.push((source_id, new_props));
155 }
156 }
157 }
158
159 for source_id in dropped_source_ids {
160 let dropped_fragments = self.source_fragments.remove(&source_id);
161
162 if let Some(handle) = self.managed_sources.remove(&source_id) {
163 handle.terminate(dropped_fragments);
164 }
165 if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
166 }
173 }
174
175 for (source_id, fragments) in added_source_fragments {
176 self.source_fragments
177 .entry(source_id)
178 .or_default()
179 .extend(fragments);
180 }
181
182 for (source_id, fragments) in added_backfill_fragments {
183 self.backfill_fragments
184 .entry(source_id)
185 .or_default()
186 .extend(fragments);
187 }
188
189 for (source_id, fragments) in finished_backfill_fragments {
190 let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
191 panic!(
192 "source {} not found when adding backfill fragments {:?}",
193 source_id, fragments
194 );
195 });
196 handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
197 }
198
199 for (source_id, fragment_ids) in dropped_source_fragments {
200 self.drop_source_fragments(Some(source_id), fragment_ids);
201 }
202
203 for (old_fragment_id, new_fragment_id) in fragment_replacements {
204 self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
206
207 for fragment_ids in self.backfill_fragments.values_mut() {
208 let mut new_backfill_fragment_ids = fragment_ids.clone();
209 for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
210 assert_ne!(
211 fragment_id, upstream_fragment_id,
212 "backfill fragment should not be replaced"
213 );
214 if *upstream_fragment_id == old_fragment_id {
215 new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
216 new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
217 }
218 }
219 *fragment_ids = new_backfill_fragment_ids;
220 }
221 }
222
223 for (source_id, new_props) in recreate_source_id_map_new_props {
224 tracing::info!("recreate source {source_id} in source manager");
225 if let Some(handle) = self.managed_sources.get_mut(&(source_id as _)) {
226 let props_wrapper =
229 WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
230 let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); handle.update_props(props);
232 }
233 }
234 }
235
236 fn drop_source_fragments(
237 &mut self,
238 source_id: Option<SourceId>,
239 dropped_fragment_ids: BTreeSet<FragmentId>,
240 ) {
241 if let Some(source_id) = source_id {
242 if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
243 let mut dropped_ids = vec![];
244 let managed_fragment_ids = entry.get_mut();
245 for fragment_id in &dropped_fragment_ids {
246 managed_fragment_ids.remove(fragment_id);
247 dropped_ids.push(*fragment_id);
248 }
249 if let Some(handle) = self.managed_sources.get(&source_id) {
250 handle.drop_fragments(dropped_ids);
251 } else {
252 panic_if_debug!(
253 "source {source_id} not found when dropping fragment {dropped_ids:?}",
254 );
255 }
256 if managed_fragment_ids.is_empty() {
257 entry.remove();
258 }
259 }
260 } else {
261 for (source_id, fragment_ids) in &mut self.source_fragments {
262 let mut dropped_ids = vec![];
263 for fragment_id in &dropped_fragment_ids {
264 if fragment_ids.remove(fragment_id) {
265 dropped_ids.push(*fragment_id);
266 }
267 }
268 if !dropped_ids.is_empty() {
269 if let Some(handle) = self.managed_sources.get(source_id) {
270 handle.drop_fragments(dropped_ids);
271 } else {
272 panic_if_debug!(
273 "source {source_id} not found when dropping fragment {dropped_ids:?}",
274 );
275 }
276 }
277 }
278 }
279 }
280}
281
282impl SourceManager {
283 const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
284
285 pub async fn new(
286 barrier_scheduler: BarrierScheduler,
287 metadata_manager: MetadataManager,
288 metrics: Arc<MetaMetrics>,
289 env: MetaSrvEnv,
290 ) -> MetaResult<Self> {
291 let mut managed_sources = HashMap::new();
292 {
293 let sources = metadata_manager.list_sources().await?;
294 for source in sources {
295 create_source_worker_async(source, &mut managed_sources, metrics.clone())?
296 }
297 }
298
299 let source_fragments = metadata_manager
300 .catalog_controller
301 .load_source_fragment_ids()
302 .await?
303 .into_iter()
304 .map(|(source_id, fragment_ids)| {
305 (
306 source_id as SourceId,
307 fragment_ids.into_iter().map(|id| id as _).collect(),
308 )
309 })
310 .collect();
311 let backfill_fragments = metadata_manager
312 .catalog_controller
313 .load_backfill_fragment_ids()
314 .await?
315 .into_iter()
316 .map(|(source_id, fragment_ids)| {
317 (
318 source_id as SourceId,
319 fragment_ids
320 .into_iter()
321 .map(|(id, up_id)| (id as _, up_id as _))
322 .collect(),
323 )
324 })
325 .collect();
326
327 let core = Mutex::new(SourceManagerCore::new(
328 metadata_manager,
329 managed_sources,
330 source_fragments,
331 backfill_fragments,
332 env,
333 ));
334
335 Ok(Self {
336 barrier_scheduler,
337 core,
338 paused: Mutex::new(()),
339 metrics,
340 })
341 }
342
343 pub async fn validate_source_once(
344 &self,
345 source_id: u32,
346 new_source_props: WithOptionsSecResolved,
347 ) -> MetaResult<()> {
348 let props = ConnectorProperties::extract(new_source_props, false).unwrap();
349
350 {
351 let mut enumerator = props
352 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
353 metrics: self.metrics.source_enumerator_metrics.clone(),
354 info: SourceEnumeratorInfo { source_id },
355 }))
356 .await
357 .context("failed to create SplitEnumerator")?;
358
359 let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
360 .await
361 .context("failed to list splits")??;
362 }
363 Ok(())
364 }
365
366 #[await_tree::instrument]
368 pub async fn handle_replace_job(
369 &self,
370 dropped_job_fragments: &StreamJobFragments,
371 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
372 replace_plan: &ReplaceStreamJobPlan,
373 ) {
374 let dropped_source_fragments = dropped_job_fragments.stream_source_fragments();
376
377 self.apply_source_change(SourceChange::ReplaceJob {
378 dropped_source_fragments,
379 added_source_fragments,
380 fragment_replacements: replace_plan.fragment_replacements(),
381 })
382 .await;
383 }
384
385 #[await_tree::instrument("apply_source_change({source_change})")]
388 pub async fn apply_source_change(&self, source_change: SourceChange) {
389 let mut core = self.core.lock().await;
390 core.apply_source_change(source_change);
391 }
392
393 #[await_tree::instrument("register_source({})", source.name)]
395 pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
396 tracing::debug!("register_source: {}", source.get_id());
397 let mut core = self.core.lock().await;
398 let source_id = source.get_id() as _;
399 if core.managed_sources.contains_key(&source_id) {
400 tracing::warn!("source {} already registered", source_id);
401 return Ok(());
402 }
403
404 let handle = create_source_worker(source, self.metrics.clone())
405 .await
406 .context("failed to create source worker")?;
407
408 core.managed_sources.insert(source_id, handle);
409
410 Ok(())
411 }
412
413 pub async fn register_source_with_handle(
415 &self,
416 source_id: SourceId,
417 handle: ConnectorSourceWorkerHandle,
418 ) -> MetaResult<()> {
419 let mut core = self.core.lock().await;
420 if core.managed_sources.contains_key(&source_id) {
421 tracing::warn!("source {} already registered", source_id);
422 return Ok(());
423 }
424
425 core.managed_sources.insert(source_id, handle);
426
427 Ok(())
428 }
429
430 pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
431 let core = self.core.lock().await;
432
433 SourceManagerRunningInfo {
434 source_fragments: core.source_fragments.clone(),
435 backfill_fragments: core.backfill_fragments.clone(),
436 }
437 }
438
439 async fn tick(&self) -> MetaResult<()> {
448 let split_states = {
449 let core_guard = self.core.lock().await;
450 core_guard.reassign_splits().await?
451 };
452
453 for (database_id, split_state) in split_states {
454 if !split_state.split_assignment.is_empty() {
455 let command = Command::SourceChangeSplit(split_state);
456 tracing::info!(command = ?command, "pushing down split assignment command");
457 self.barrier_scheduler
458 .run_command(database_id, command)
459 .await?;
460 }
461 }
462
463 Ok(())
464 }
465
466 pub async fn run(&self) -> MetaResult<()> {
467 let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
468 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
469 loop {
470 ticker.tick().await;
471 let _pause_guard = self.paused.lock().await;
472 if let Err(e) = self.tick().await {
473 tracing::error!(
474 error = %e.as_report(),
475 "error happened while running source manager tick",
476 );
477 }
478 }
479 }
480
481 pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
483 tracing::debug!("pausing tick lock in source manager");
484 self.paused.lock().await
485 }
486}
487
/// A change to the set of sources / source-reading fragments, applied to
/// [`SourceManagerCore`] via `apply_source_change`.
#[derive(strum::Display, Debug)]
pub enum SourceChange {
    /// A new streaming job adds source and/or backfill fragments.
    CreateJob {
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// `(backfill fragment, upstream source fragment)` pairs.
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// New connector properties for existing sources; the workers are updated
    /// in place (`update_props`).
    UpdateSourceProps {
        source_id_map_new_props: HashMap<u32, HashMap<String, String>>,
    },
    /// Backfill fragments finished; their workers are told via `finish_backfill`.
    CreateJobFinished {
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Sources dropped entirely; their workers are terminated.
    DropSource { dropped_source_ids: Vec<SourceId> },
    /// A materialized view reading from sources is dropped.
    DropMv {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    },
    /// A job is replaced: old fragments dropped, new ones added, and backfill
    /// upstream ids rewritten per `fragment_replacements` (old -> new).
    ReplaceJob {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
}
521
522pub fn build_actor_connector_splits(
523 splits: &HashMap<ActorId, Vec<SplitImpl>>,
524) -> HashMap<u32, ConnectorSplits> {
525 splits
526 .iter()
527 .map(|(&actor_id, splits)| {
528 (
529 actor_id,
530 ConnectorSplits {
531 splits: splits.iter().map(ConnectorSplit::from).collect(),
532 },
533 )
534 })
535 .collect()
536}
537
538pub fn build_actor_split_impls(
539 actor_splits: &HashMap<u32, ConnectorSplits>,
540) -> HashMap<ActorId, Vec<SplitImpl>> {
541 actor_splits
542 .iter()
543 .map(|(actor_id, ConnectorSplits { splits })| {
544 (
545 *actor_id,
546 splits
547 .iter()
548 .map(|split| SplitImpl::try_from(split).unwrap())
549 .collect(),
550 )
551 })
552 .collect()
553}