1mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::panic_if_debug;
28use risingwave_connector::WithOptionsSecResolved;
29use risingwave_connector::error::ConnectorResult;
30use risingwave_connector::source::{
31 ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
32 SplitMetaData, fill_adaptive_split,
33};
34use risingwave_meta_model::SourceId;
35use risingwave_pb::catalog::Source;
36use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
37use thiserror_ext::AsReport;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
39use tokio::sync::{Mutex, MutexGuard, oneshot};
40use tokio::task::JoinHandle;
41use tokio::time::MissedTickBehavior;
42use tokio::{select, time};
43pub use worker::create_source_worker;
44use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
45
46use crate::MetaResult;
47use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
48use crate::manager::MetadataManager;
49use crate::model::{ActorId, FragmentId, StreamJobFragments};
50use crate::rpc::metrics::MetaMetrics;
51
/// Shared, reference-counted handle to a [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// Per-fragment map from each actor to the list of splits recorded for it.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
/// Per-fragment map from each actor to an optional `u32` value.
///
/// NOTE(review): presumably a rate limit where `None` means "not throttled" — confirm against
/// the users of this alias; nothing in this section reads it.
pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;
55
/// Entry point for source bookkeeping: owns the mutable [`SourceManagerCore`] behind an async
/// mutex and drives the periodic tick in [`SourceManager::run`].
pub struct SourceManager {
    /// Lock taken by `run` around every tick. Whoever holds the guard returned by
    /// `pause_tick` keeps ticks from starting until the guard is dropped.
    pub paused: Mutex<()>,
    /// Used by `tick` to submit `Command::SourceChangeSplit` commands.
    barrier_scheduler: BarrierScheduler,
    /// Mutable state (workers, fragments, actor splits), guarded by an async mutex.
    core: Mutex<SourceManagerCore>,
    /// Metrics handle handed to source workers when they are created.
    pub metrics: Arc<MetaMetrics>,
}
/// In-memory state kept by [`SourceManager`] behind its `core` mutex.
pub struct SourceManagerCore {
    /// Metadata access handle.
    /// NOTE(review): not read by any code visible in this section — presumably used by the
    /// split reassignment logic defined elsewhere; confirm.
    metadata_manager: MetadataManager,

    /// Worker handle of every registered source, keyed by source id.
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Source fragments tracked for each source.
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// Backfill fragments tracked for each source, stored as
    /// `(fragment_id, upstream_fragment_id)` pairs.
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    /// Splits currently recorded for each actor.
    actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
78
/// Cloned snapshot of the core's bookkeeping, as returned by
/// `SourceManager::get_running_info`.
pub struct SourceManagerRunningInfo {
    /// Source fragments tracked for each source at the time of the snapshot.
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// Backfill fragments per source as `(fragment_id, upstream_fragment_id)` pairs.
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    /// Splits recorded for each actor at the time of the snapshot.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
84
85impl SourceManagerCore {
86 fn new(
87 metadata_manager: MetadataManager,
88 managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
89 source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
90 backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
91 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
92 ) -> Self {
93 Self {
94 metadata_manager,
95 managed_sources,
96 source_fragments,
97 backfill_fragments,
98 actor_splits,
99 }
100 }
101
102 pub fn apply_source_change(&mut self, source_change: SourceChange) {
104 let mut added_source_fragments = Default::default();
105 let mut added_backfill_fragments = Default::default();
106 let mut finished_backfill_fragments = Default::default();
107 let mut split_assignment = Default::default();
108 let mut dropped_actors = Default::default();
109 let mut fragment_replacements = Default::default();
110 let mut dropped_source_fragments = Default::default();
111 let mut dropped_source_ids = Default::default();
112
113 match source_change {
114 SourceChange::CreateJob {
115 added_source_fragments: added_source_fragments_,
116 added_backfill_fragments: added_backfill_fragments_,
117 split_assignment: split_assignment_,
118 } => {
119 added_source_fragments = added_source_fragments_;
120 added_backfill_fragments = added_backfill_fragments_;
121 split_assignment = split_assignment_;
122 }
123 SourceChange::CreateJobFinished {
124 finished_backfill_fragments: finished_backfill_fragments_,
125 } => {
126 finished_backfill_fragments = finished_backfill_fragments_;
127 }
128 SourceChange::SplitChange(split_assignment_) => {
129 split_assignment = split_assignment_;
130 }
131 SourceChange::DropMv {
132 dropped_source_fragments: dropped_source_fragments_,
133 dropped_actors: dropped_actors_,
134 } => {
135 dropped_source_fragments = dropped_source_fragments_;
136 dropped_actors = dropped_actors_;
137 }
138 SourceChange::ReplaceJob {
139 dropped_source_fragments: dropped_source_fragments_,
140 dropped_actors: dropped_actors_,
141 added_source_fragments: added_source_fragments_,
142 split_assignment: split_assignment_,
143 fragment_replacements: fragment_replacements_,
144 } => {
145 dropped_source_fragments = dropped_source_fragments_;
146 dropped_actors = dropped_actors_;
147 added_source_fragments = added_source_fragments_;
148 split_assignment = split_assignment_;
149 fragment_replacements = fragment_replacements_;
150 }
151 SourceChange::DropSource {
152 dropped_source_ids: dropped_source_ids_,
153 } => {
154 dropped_source_ids = dropped_source_ids_;
155 }
156 SourceChange::Reschedule {
157 split_assignment: split_assignment_,
158 dropped_actors: dropped_actors_,
159 } => {
160 split_assignment = split_assignment_;
161 dropped_actors = dropped_actors_;
162 }
163 }
164
165 for source_id in dropped_source_ids {
166 let dropped_fragments = self.source_fragments.remove(&source_id);
167
168 if let Some(handle) = self.managed_sources.remove(&source_id) {
169 handle.terminate(dropped_fragments);
170 }
171 if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
172 }
179 }
180
181 for (source_id, fragments) in added_source_fragments {
182 self.source_fragments
183 .entry(source_id)
184 .or_default()
185 .extend(fragments);
186 }
187
188 for (source_id, fragments) in added_backfill_fragments {
189 self.backfill_fragments
190 .entry(source_id)
191 .or_default()
192 .extend(fragments);
193 }
194
195 for (source_id, fragments) in finished_backfill_fragments {
196 let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
197 panic!(
198 "source {} not found when adding backfill fragments {:?}",
199 source_id, fragments
200 );
201 });
202 handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
203 }
204
205 for (_, actor_splits) in split_assignment {
206 for (actor_id, splits) in actor_splits {
207 self.actor_splits.insert(actor_id, splits);
209 }
210 }
211
212 for actor_id in dropped_actors {
213 self.actor_splits.remove(&actor_id);
214 }
215
216 for (source_id, fragment_ids) in dropped_source_fragments {
217 self.drop_source_fragments(Some(source_id), fragment_ids);
218 }
219
220 for (old_fragment_id, new_fragment_id) in fragment_replacements {
221 self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
223
224 for fragment_ids in self.backfill_fragments.values_mut() {
225 let mut new_backfill_fragment_ids = fragment_ids.clone();
226 for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
227 assert_ne!(
228 fragment_id, upstream_fragment_id,
229 "backfill fragment should not be replaced"
230 );
231 if *upstream_fragment_id == old_fragment_id {
232 new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
233 new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
234 }
235 }
236 *fragment_ids = new_backfill_fragment_ids;
237 }
238 }
239 }
240
241 fn drop_source_fragments(
242 &mut self,
243 source_id: Option<SourceId>,
244 dropped_fragment_ids: BTreeSet<FragmentId>,
245 ) {
246 if let Some(source_id) = source_id {
247 if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
248 let mut dropped_ids = vec![];
249 let managed_fragment_ids = entry.get_mut();
250 for fragment_id in &dropped_fragment_ids {
251 managed_fragment_ids.remove(fragment_id);
252 dropped_ids.push(*fragment_id);
253 }
254 if let Some(handle) = self.managed_sources.get(&source_id) {
255 handle.drop_fragments(dropped_ids);
256 } else {
257 panic_if_debug!(
258 "source {source_id} not found when dropping fragment {dropped_ids:?}",
259 );
260 }
261 if managed_fragment_ids.is_empty() {
262 entry.remove();
263 }
264 }
265 } else {
266 for (source_id, fragment_ids) in &mut self.source_fragments {
267 let mut dropped_ids = vec![];
268 for fragment_id in &dropped_fragment_ids {
269 if fragment_ids.remove(fragment_id) {
270 dropped_ids.push(*fragment_id);
271 }
272 }
273 if !dropped_ids.is_empty() {
274 if let Some(handle) = self.managed_sources.get(source_id) {
275 handle.drop_fragments(dropped_ids);
276 } else {
277 panic_if_debug!(
278 "source {source_id} not found when dropping fragment {dropped_ids:?}",
279 );
280 }
281 }
282 }
283 }
284 }
285}
286
287impl SourceManager {
/// Period of the ticker driven by `run`: one tick is attempted every 10 seconds.
const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
289
290 pub async fn new(
291 barrier_scheduler: BarrierScheduler,
292 metadata_manager: MetadataManager,
293 metrics: Arc<MetaMetrics>,
294 ) -> MetaResult<Self> {
295 let mut managed_sources = HashMap::new();
296 {
297 let sources = metadata_manager.list_sources().await?;
298 for source in sources {
299 create_source_worker_async(source, &mut managed_sources, metrics.clone())?
300 }
301 }
302
303 let source_fragments = metadata_manager
304 .catalog_controller
305 .load_source_fragment_ids()
306 .await?
307 .into_iter()
308 .map(|(source_id, fragment_ids)| {
309 (
310 source_id as SourceId,
311 fragment_ids.into_iter().map(|id| id as _).collect(),
312 )
313 })
314 .collect();
315 let backfill_fragments = metadata_manager
316 .catalog_controller
317 .load_backfill_fragment_ids()
318 .await?
319 .into_iter()
320 .map(|(source_id, fragment_ids)| {
321 (
322 source_id as SourceId,
323 fragment_ids
324 .into_iter()
325 .map(|(id, up_id)| (id as _, up_id as _))
326 .collect(),
327 )
328 })
329 .collect();
330 let actor_splits = metadata_manager
331 .catalog_controller
332 .load_actor_splits()
333 .await?
334 .into_iter()
335 .map(|(actor_id, splits)| {
336 (
337 actor_id as ActorId,
338 splits
339 .to_protobuf()
340 .splits
341 .iter()
342 .map(|split| SplitImpl::try_from(split).unwrap())
343 .collect(),
344 )
345 })
346 .collect();
347
348 let core = Mutex::new(SourceManagerCore::new(
349 metadata_manager,
350 managed_sources,
351 source_fragments,
352 backfill_fragments,
353 actor_splits,
354 ));
355
356 Ok(Self {
357 barrier_scheduler,
358 core,
359 paused: Mutex::new(()),
360 metrics,
361 })
362 }
363
364 pub async fn handle_replace_job(
366 &self,
367 dropped_job_fragments: &StreamJobFragments,
368 added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
369 split_assignment: SplitAssignment,
370 replace_plan: &ReplaceStreamJobPlan,
371 ) {
372 let dropped_source_fragments = dropped_job_fragments.stream_source_fragments().clone();
374
375 let fragments = &dropped_job_fragments.fragments;
376
377 let dropped_actors = dropped_source_fragments
378 .values()
379 .flatten()
380 .flat_map(|fragment_id| fragments.get(fragment_id).unwrap().actors.iter())
381 .map(|actor| actor.actor_id)
382 .collect::<HashSet<_>>();
383
384 self.apply_source_change(SourceChange::ReplaceJob {
385 dropped_source_fragments,
386 dropped_actors,
387 added_source_fragments,
388 split_assignment,
389 fragment_replacements: replace_plan.fragment_replacements(),
390 })
391 .await;
392 }
393
394 pub async fn apply_source_change(&self, source_change: SourceChange) {
397 let mut core = self.core.lock().await;
398 core.apply_source_change(source_change);
399 }
400
401 pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
403 tracing::debug!("register_source: {}", source.get_id());
404 let mut core = self.core.lock().await;
405 if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) {
406 let handle = create_source_worker(source, self.metrics.clone())
407 .await
408 .context("failed to create source worker")?;
409 e.insert(handle);
410 } else {
411 tracing::warn!("source {} already registered", source.get_id());
412 }
413 Ok(())
414 }
415
416 pub async fn register_source_with_handle(
418 &self,
419 source_id: SourceId,
420 handle: ConnectorSourceWorkerHandle,
421 ) {
422 let mut core = self.core.lock().await;
423 if let Entry::Vacant(e) = core.managed_sources.entry(source_id) {
424 e.insert(handle);
425 } else {
426 tracing::warn!("source {} already registered", source_id);
427 }
428 }
429
430 pub async fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
431 let core = self.core.lock().await;
432 core.actor_splits.clone()
433 }
434
435 pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
436 let core = self.core.lock().await;
437 SourceManagerRunningInfo {
438 source_fragments: core.source_fragments.clone(),
439 backfill_fragments: core.backfill_fragments.clone(),
440 actor_splits: core.actor_splits.clone(),
441 }
442 }
443
444 async fn tick(&self) -> MetaResult<()> {
453 let split_assignment = {
454 let core_guard = self.core.lock().await;
455 core_guard.reassign_splits().await?
456 };
457
458 for (database_id, split_assignment) in split_assignment {
459 if !split_assignment.is_empty() {
460 let command = Command::SourceChangeSplit(split_assignment);
461 tracing::info!(command = ?command, "pushing down split assignment command");
462 self.barrier_scheduler
463 .run_command(database_id, command)
464 .await?;
465 }
466 }
467
468 Ok(())
469 }
470
471 pub async fn run(&self) -> MetaResult<()> {
472 let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
473 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
474 loop {
475 ticker.tick().await;
476 let _pause_guard = self.paused.lock().await;
477 if let Err(e) = self.tick().await {
478 tracing::error!(
479 error = %e.as_report(),
480 "error happened while running source manager tick",
481 );
482 }
483 }
484 }
485
/// Acquires the `paused` lock and returns its guard.
///
/// `run` takes the same lock before every tick, so no new tick starts while the returned
/// guard is alive. If a tick is in progress, this call waits for it to finish first.
pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
    tracing::debug!("pausing tick lock in source manager");
    self.paused.lock().await
}
491}
492
/// A change to the source manager's in-memory bookkeeping, consumed by
/// `SourceManagerCore::apply_source_change`.
pub enum SourceChange {
    /// A job was created: starts tracking its source and backfill fragments and records the
    /// splits of its actors.
    CreateJob {
        /// Source fragments to start tracking, per source.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Backfill fragments to start tracking, as `(fragment_id, upstream_fragment_id)`.
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
        /// Splits to record for the listed actors.
        split_assignment: SplitAssignment,
    },
    /// A job finished creation: the worker of each listed source is told that these backfill
    /// fragments are finished. The source must have a registered worker.
    CreateJobFinished {
        /// Finished backfill fragments as `(fragment_id, upstream_fragment_id)`; only the
        /// first id of each pair is forwarded to the worker.
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Overwrites the recorded splits of the listed actors.
    SplitChange(SplitAssignment),
    /// Sources were dropped: their fragments and backfill fragments are forgotten and their
    /// workers terminated.
    DropSource {
        /// Ids of the dropped sources.
        dropped_source_ids: Vec<SourceId>,
    },
    /// Stops tracking the given source fragments and forgets the splits of the given actors.
    DropMv {
        /// Source fragments to stop tracking, per source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Actors whose recorded splits are removed.
        dropped_actors: HashSet<ActorId>,
    },
    /// A job was replaced: combines dropping the old fragments and actors, tracking the new
    /// ones, and re-pointing backfill fragments at replaced upstream fragments.
    ReplaceJob {
        /// Source fragments to stop tracking, per source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Actors whose recorded splits are removed.
        dropped_actors: HashSet<ActorId>,

        /// Source fragments to start tracking, per source.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Splits to record for the listed actors.
        split_assignment: SplitAssignment,
        /// Map from replaced (old) fragment id to its replacement (new) fragment id.
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
    /// Records new splits for the listed actors and forgets the splits of dropped actors.
    Reschedule {
        /// Splits to record for the listed actors.
        split_assignment: SplitAssignment,
        /// Actors whose recorded splits are removed.
        dropped_actors: HashSet<ActorId>,
    },
}
532
533pub fn build_actor_connector_splits(
534 splits: &HashMap<ActorId, Vec<SplitImpl>>,
535) -> HashMap<u32, ConnectorSplits> {
536 splits
537 .iter()
538 .map(|(&actor_id, splits)| {
539 (
540 actor_id,
541 ConnectorSplits {
542 splits: splits.iter().map(ConnectorSplit::from).collect(),
543 },
544 )
545 })
546 .collect()
547}
548
549pub fn build_actor_split_impls(
550 actor_splits: &HashMap<u32, ConnectorSplits>,
551) -> HashMap<ActorId, Vec<SplitImpl>> {
552 actor_splits
553 .iter()
554 .map(|(actor_id, ConnectorSplits { splits })| {
555 (
556 *actor_id,
557 splits
558 .iter()
559 .map(|split| SplitImpl::try_from(split).unwrap())
560 .collect(),
561 )
562 })
563 .collect()
564}