1mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::id::ObjectId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_common::panic_if_debug;
29use risingwave_connector::WithOptionsSecResolved;
30use risingwave_connector::error::ConnectorResult;
31use risingwave_connector::source::{
32 ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
33 SplitMetaData, fill_adaptive_split,
34};
35use risingwave_meta_model::SourceId;
36use risingwave_pb::catalog::Source;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38pub use split_assignment::{SplitDiffOptions, SplitState, reassign_splits};
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{Mutex, MutexGuard, oneshot};
42use tokio::task::JoinHandle;
43use tokio::time::MissedTickBehavior;
44use tokio::{select, time};
45pub use worker::create_source_worker;
46use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
47
48use crate::MetaResult;
49use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan, SharedActorInfos};
50use crate::manager::{MetaSrvEnv, MetadataManager};
51use crate::model::{ActorId, FragmentId, StreamJobFragments};
52use crate::rpc::metrics::MetaMetrics;
53
/// Shared, reference-counted handle to a [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// Per-fragment map from actor id to the list of splits for that actor.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
/// Map from source id to a list of splits.
// NOTE(review): presumably the splits most recently discovered for each source — confirm.
pub type DiscoveredSourceSplits = HashMap<SourceId, Vec<SplitImpl>>;
/// Map from object id to a set of string key/value properties.
// NOTE(review): presumably the new connector properties to apply per object — confirm.
pub type ConnectorPropsChange = HashMap<ObjectId, HashMap<String, String>>;
59
/// Timeout (10 seconds) applied to the `list_splits` call made by
/// `SourceManager::validate_source_once`.
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
61
/// Owns the per-source worker handles (through [`SourceManagerCore`]) and periodically
/// pushes split assignment changes to the barrier scheduler (see `SourceManager::run`).
pub struct SourceManager {
    /// Lock acquired by `run` before every tick. Holding the guard returned by
    /// `pause_tick` therefore keeps `run` from ticking.
    pub paused: Mutex<()>,
    /// Used by `tick` to run `Command::SourceChangeSplit` commands.
    barrier_scheduler: BarrierScheduler,
    /// Registered sources and fragment bookkeeping.
    core: Mutex<SourceManagerCore>,
    /// Meta metrics; cloned into source workers and split enumerator contexts.
    pub metrics: Arc<MetaMetrics>,
}
/// State of the [`SourceManager`]; it is stored behind the manager's `core` mutex.
pub struct SourceManagerCore {
    metadata_manager: MetadataManager,

    /// Worker handle of every registered source.
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Fragment ids tracked for each source.
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `(fragment_id, upstream_fragment_id)` pairs tracked for each source. The upstream id
    /// is rewritten by `apply_source_change` when a fragment replacement is applied.
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    env: MetaSrvEnv,
}
82
/// Copy of the fragment bookkeeping held by [`SourceManagerCore`], as returned by
/// `SourceManager::get_running_info`.
pub struct SourceManagerRunningInfo {
    /// Fragment ids tracked for each source.
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `(fragment_id, upstream_fragment_id)` pairs tracked for each source.
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
}
87
88impl SourceManagerCore {
    /// Builds the core state from already-loaded parts; every argument is stored as-is in
    /// the field of the same name.
    fn new(
        metadata_manager: MetadataManager,
        managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
        source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager,
            managed_sources,
            source_fragments,
            backfill_fragments,
            env,
        }
    }
104
105 pub fn apply_source_change(&mut self, source_change: SourceChange) {
107 let mut added_source_fragments = Default::default();
108 let mut added_backfill_fragments = Default::default();
109 let mut finished_backfill_fragments = Default::default();
110 let mut fragment_replacements = Default::default();
111 let mut dropped_source_fragments = Default::default();
112 let mut dropped_source_ids = Default::default();
113 let mut recreate_source_id_map_new_props: Vec<(SourceId, HashMap<String, String>)> =
114 Default::default();
115
116 match source_change {
117 SourceChange::CreateJob {
118 added_source_fragments: added_source_fragments_,
119 added_backfill_fragments: added_backfill_fragments_,
120 } => {
121 added_source_fragments = added_source_fragments_;
122 added_backfill_fragments = added_backfill_fragments_;
123 }
124 SourceChange::CreateJobFinished {
125 finished_backfill_fragments: finished_backfill_fragments_,
126 } => {
127 finished_backfill_fragments = finished_backfill_fragments_;
128 }
129
130 SourceChange::DropMv {
131 dropped_source_fragments: dropped_source_fragments_,
132 } => {
133 dropped_source_fragments = dropped_source_fragments_;
134 }
135 SourceChange::ReplaceJob {
136 dropped_source_fragments: dropped_source_fragments_,
137 added_source_fragments: added_source_fragments_,
138 fragment_replacements: fragment_replacements_,
139 } => {
140 dropped_source_fragments = dropped_source_fragments_;
141 added_source_fragments = added_source_fragments_;
142 fragment_replacements = fragment_replacements_;
143 }
144 SourceChange::DropSource {
145 dropped_source_ids: dropped_source_ids_,
146 } => {
147 dropped_source_ids = dropped_source_ids_;
148 }
149
150 SourceChange::UpdateSourceProps {
151 source_id_map_new_props,
152 } => {
153 for (source_id, new_props) in source_id_map_new_props {
154 recreate_source_id_map_new_props.push((source_id, new_props));
155 }
156 }
157 }
158
159 for source_id in dropped_source_ids {
160 let dropped_fragments = self.source_fragments.remove(&source_id);
161
162 if let Some(handle) = self.managed_sources.remove(&source_id) {
163 handle.terminate(dropped_fragments);
164 }
165 if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
166 }
173 }
174
175 for (source_id, fragments) in added_source_fragments {
176 self.source_fragments
177 .entry(source_id)
178 .or_default()
179 .extend(fragments);
180 }
181
182 for (source_id, fragments) in added_backfill_fragments {
183 self.backfill_fragments
184 .entry(source_id)
185 .or_default()
186 .extend(fragments);
187 }
188
189 for (source_id, fragments) in finished_backfill_fragments {
190 let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
191 panic!(
192 "source {} not found when adding backfill fragments {:?}",
193 source_id, fragments
194 );
195 });
196 handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
197 }
198
199 for (source_id, fragment_ids) in dropped_source_fragments {
200 self.drop_source_fragments(Some(source_id), fragment_ids);
201 }
202
203 for (old_fragment_id, new_fragment_id) in fragment_replacements {
204 self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
206
207 for fragment_ids in self.backfill_fragments.values_mut() {
208 let mut new_backfill_fragment_ids = fragment_ids.clone();
209 for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
210 assert_ne!(
211 fragment_id, upstream_fragment_id,
212 "backfill fragment should not be replaced"
213 );
214 if *upstream_fragment_id == old_fragment_id {
215 new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
216 new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
217 }
218 }
219 *fragment_ids = new_backfill_fragment_ids;
220 }
221 }
222
223 for (source_id, new_props) in recreate_source_id_map_new_props {
224 if let Some(handle) = self.managed_sources.get_mut(&(source_id as _)) {
225 let props_wrapper =
228 WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
229 let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); handle.update_props(props);
231 tracing::info!("update source {source_id} properties in source manager");
232 } else {
233 tracing::info!("job id {source_id} is not registered in source manager");
234 }
235 }
236 }
237
238 fn drop_source_fragments(
239 &mut self,
240 source_id: Option<SourceId>,
241 dropped_fragment_ids: BTreeSet<FragmentId>,
242 ) {
243 if let Some(source_id) = source_id {
244 if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
245 let mut dropped_ids = vec![];
246 let managed_fragment_ids = entry.get_mut();
247 for fragment_id in &dropped_fragment_ids {
248 managed_fragment_ids.remove(fragment_id);
249 dropped_ids.push(*fragment_id);
250 }
251 if let Some(handle) = self.managed_sources.get(&source_id) {
252 handle.drop_fragments(dropped_ids);
253 } else {
254 panic_if_debug!(
255 "source {source_id} not found when dropping fragment {dropped_ids:?}",
256 );
257 }
258 if managed_fragment_ids.is_empty() {
259 entry.remove();
260 }
261 }
262 } else {
263 for (source_id, fragment_ids) in &mut self.source_fragments {
264 let mut dropped_ids = vec![];
265 for fragment_id in &dropped_fragment_ids {
266 if fragment_ids.remove(fragment_id) {
267 dropped_ids.push(*fragment_id);
268 }
269 }
270 if !dropped_ids.is_empty() {
271 if let Some(handle) = self.managed_sources.get(source_id) {
272 handle.drop_fragments(dropped_ids);
273 } else {
274 panic_if_debug!(
275 "source {source_id} not found when dropping fragment {dropped_ids:?}",
276 );
277 }
278 }
279 }
280 }
281 }
282}
283
284impl SourceManager {
    /// Period (10 seconds) of the ticker driving `SourceManager::run`.
    const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
286
287 pub async fn new(
288 barrier_scheduler: BarrierScheduler,
289 metadata_manager: MetadataManager,
290 metrics: Arc<MetaMetrics>,
291 env: MetaSrvEnv,
292 ) -> MetaResult<Self> {
293 let mut managed_sources = HashMap::new();
294 {
295 let sources = metadata_manager.list_sources().await?;
296 for source in sources {
297 create_source_worker_async(source, &mut managed_sources, metrics.clone())?
298 }
299 }
300
301 let source_fragments = metadata_manager
302 .catalog_controller
303 .load_source_fragment_ids()
304 .await?
305 .into_iter()
306 .map(|(source_id, fragment_ids)| {
307 (
308 source_id as SourceId,
309 fragment_ids.into_iter().map(|id| id as _).collect(),
310 )
311 })
312 .collect();
313 let backfill_fragments = metadata_manager
314 .catalog_controller
315 .load_backfill_fragment_ids()
316 .await?;
317
318 let core = Mutex::new(SourceManagerCore::new(
319 metadata_manager,
320 managed_sources,
321 source_fragments,
322 backfill_fragments,
323 env,
324 ));
325
326 Ok(Self {
327 barrier_scheduler,
328 core,
329 paused: Mutex::new(()),
330 metrics,
331 })
332 }
333
334 pub async fn validate_source_once(
335 &self,
336 source_id: SourceId,
337 new_source_props: WithOptionsSecResolved,
338 ) -> MetaResult<()> {
339 let props = ConnectorProperties::extract(new_source_props, false).unwrap();
340
341 {
342 let mut enumerator = props
343 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
344 metrics: self.metrics.source_enumerator_metrics.clone(),
345 info: SourceEnumeratorInfo { source_id },
346 }))
347 .await
348 .context("failed to create SplitEnumerator")?;
349
350 let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
351 .await
352 .context("failed to list splits")??;
353 }
354 Ok(())
355 }
356
357 #[await_tree::instrument]
359 pub async fn handle_replace_job(
360 &self,
361 dropped_job_fragments: &StreamJobFragments,
362 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
363 replace_plan: &ReplaceStreamJobPlan,
364 ) {
365 let dropped_source_fragments = dropped_job_fragments.stream_source_fragments();
367
368 self.apply_source_change(SourceChange::ReplaceJob {
369 dropped_source_fragments,
370 added_source_fragments,
371 fragment_replacements: replace_plan.fragment_replacements(),
372 })
373 .await;
374 }
375
376 #[await_tree::instrument("apply_source_change({source_change})")]
379 pub async fn apply_source_change(&self, source_change: SourceChange) {
380 let need_force_tick = matches!(source_change, SourceChange::UpdateSourceProps { .. });
381 let updated_source_ids = if let SourceChange::UpdateSourceProps {
382 ref source_id_map_new_props,
383 } = source_change
384 {
385 source_id_map_new_props.keys().cloned().collect::<Vec<_>>()
386 } else {
387 Vec::new()
388 };
389
390 {
391 let mut core = self.core.lock().await;
392 core.apply_source_change(source_change);
393 }
394
395 if need_force_tick {
397 self.force_tick_updated_sources(updated_source_ids).await;
398 }
399 }
400
401 #[await_tree::instrument("register_source({})", source.name)]
403 pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
404 tracing::debug!("register_source: {}", source.get_id());
405 let mut core = self.core.lock().await;
406 let source_id = source.get_id();
407 if core.managed_sources.contains_key(&source_id) {
408 tracing::warn!("source {} already registered", source_id);
409 return Ok(());
410 }
411
412 let handle = create_source_worker(source, self.metrics.clone())
413 .await
414 .context("failed to create source worker")?;
415
416 core.managed_sources.insert(source_id, handle);
417
418 Ok(())
419 }
420
421 pub async fn register_source_with_handle(
423 &self,
424 source_id: SourceId,
425 handle: ConnectorSourceWorkerHandle,
426 ) {
427 let mut core = self.core.lock().await;
428 if core.managed_sources.contains_key(&source_id) {
429 tracing::warn!("source {} already registered", source_id);
430 return;
431 }
432
433 core.managed_sources.insert(source_id, handle);
434 }
435
436 pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
437 let core = self.core.lock().await;
438
439 SourceManagerRunningInfo {
440 source_fragments: core.source_fragments.clone(),
441 backfill_fragments: core.backfill_fragments.clone(),
442 }
443 }
444
445 async fn tick(&self) -> MetaResult<()> {
454 let split_states = {
455 let core_guard = self.core.lock().await;
456 core_guard.reassign_splits().await?
457 };
458
459 for (database_id, split_state) in split_states {
460 if !split_state.split_assignment.is_empty() {
461 let command = Command::SourceChangeSplit(split_state);
462 tracing::info!(command = ?command, "pushing down split assignment command");
463 self.barrier_scheduler
464 .run_command(database_id, command)
465 .await?;
466 }
467 }
468
469 Ok(())
470 }
471
472 pub async fn run(&self) -> MetaResult<()> {
473 let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
474 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
475 loop {
476 ticker.tick().await;
477 let _pause_guard = self.paused.lock().await;
478 if let Err(e) = self.tick().await {
479 tracing::error!(
480 error = %e.as_report(),
481 "error happened while running source manager tick",
482 );
483 }
484 }
485 }
486
    /// Acquires the `paused` lock and returns its guard.
    ///
    /// `run` takes the same lock before every tick, so no tick starts while the returned
    /// guard is alive; if a tick is in progress, this waits for it to finish.
    pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
        tracing::debug!("pausing tick lock in source manager");
        self.paused.lock().await
    }
492
493 async fn force_tick_updated_sources(&self, updated_source_ids: Vec<SourceId>) {
495 let core = self.core.lock().await;
496 for source_id in updated_source_ids {
497 if let Some(handle) = core.managed_sources.get(&source_id) {
498 tracing::info!("forcing tick for updated source {}", source_id.as_raw_id());
499 if let Err(e) = handle.force_tick().await {
500 tracing::warn!(
501 error = %e.as_report(),
502 "failed to force tick for source {} after properties update",
503 source_id.as_raw_id()
504 );
505 }
506 } else {
507 tracing::warn!(
508 "source {} not found when trying to force tick after update",
509 source_id.as_raw_id()
510 );
511 }
512 }
513 }
514}
515
/// A change to the set of sources or source fragments, applied through
/// `apply_source_change`.
#[derive(strum::Display, Debug)]
pub enum SourceChange {
    /// New fragments to start tracking.
    CreateJob {
        /// Source fragments to add, per source.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// `(fragment_id, upstream_fragment_id)` backfill pairs to add, per source.
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// New connector properties to hand to the workers of registered sources.
    UpdateSourceProps {
        /// New properties, per source.
        source_id_map_new_props: HashMap<SourceId, HashMap<String, String>>,
    },
    /// Backfill fragments that have finished; the source's worker is notified with the
    /// first element of each pair.
    CreateJobFinished {
        /// `(fragment_id, upstream_fragment_id)` pairs, per source.
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Sources to remove entirely; their workers are terminated.
    DropSource { dropped_source_ids: Vec<SourceId> },
    /// Source fragments to stop tracking.
    DropMv {
        /// Source fragments to drop, per source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    },
    /// A job replacement: fragments are added and dropped, and replaced fragments are
    /// re-pointed.
    ReplaceJob {
        /// Source fragments to drop, per source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Source fragments to add, per source.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Old fragment id -> new fragment id. The old fragment is dropped, and backfill
        /// pairs whose upstream is the old fragment get the new one instead.
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
}
549
550pub fn build_actor_connector_splits(
551 splits: &HashMap<ActorId, Vec<SplitImpl>>,
552) -> HashMap<ActorId, ConnectorSplits> {
553 splits
554 .iter()
555 .map(|(&actor_id, splits)| {
556 (
557 actor_id,
558 ConnectorSplits {
559 splits: splits.iter().map(ConnectorSplit::from).collect(),
560 },
561 )
562 })
563 .collect()
564}
565
566pub fn build_actor_split_impls(
567 actor_splits: &HashMap<ActorId, ConnectorSplits>,
568) -> HashMap<ActorId, Vec<SplitImpl>> {
569 actor_splits
570 .iter()
571 .map(|(actor_id, ConnectorSplits { splits })| {
572 (
573 *actor_id,
574 splits
575 .iter()
576 .map(|split| SplitImpl::try_from(split).unwrap())
577 .collect(),
578 )
579 })
580 .collect()
581}