1mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::panic_if_debug;
28use risingwave_connector::WithOptionsSecResolved;
29use risingwave_connector::error::ConnectorResult;
30use risingwave_connector::source::{
31 ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
32 SplitMetaData, fill_adaptive_split,
33};
34use risingwave_meta_model::SourceId;
35use risingwave_pb::catalog::Source;
36use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
37use thiserror_ext::AsReport;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
39use tokio::sync::{Mutex, MutexGuard, oneshot};
40use tokio::task::JoinHandle;
41use tokio::time::MissedTickBehavior;
42use tokio::{select, time};
43pub use worker::create_source_worker;
44use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
45
46use crate::MetaResult;
47use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
48use crate::manager::MetadataManager;
49use crate::model::{ActorId, FragmentId, StreamJobFragments};
50use crate::rpc::metrics::MetaMetrics;
51
52pub type SourceManagerRef = Arc<SourceManager>;
53pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
54pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;
55pub type ConnectorPropsChange = HashMap<u32, HashMap<String, String>>;
57
/// Meta-node service that tracks sources, their enumerator worker handles, and
/// the assignment of connector splits to source actors.
pub struct SourceManager {
    /// Lock taken by [`Self::pause_tick`] to temporarily suspend the periodic
    /// tick loop in [`Self::run`] (which acquires it before every tick).
    pub paused: Mutex<()>,
    /// Used by `tick` to push split-change commands down through barriers.
    barrier_scheduler: BarrierScheduler,
    /// All mutable bookkeeping, guarded by a single async mutex.
    core: Mutex<SourceManagerCore>,
    pub metrics: Arc<MetaMetrics>,
}
/// In-memory bookkeeping of sources, their fragments, and per-actor split
/// assignments. All access goes through the mutex in [`SourceManager::core`].
pub struct SourceManagerCore {
    metadata_manager: MetadataManager,

    /// One enumerator worker handle per managed source.
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// For each source, the ids of the fragments that read from it directly.
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// For each source, `(backfill fragment id, upstream source fragment id)`
    /// pairs of its source-backfill fragments.
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    /// Latest splits assigned to each source actor.
    actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
80
/// A point-in-time snapshot of [`SourceManagerCore`]'s fragment and split
/// state, returned by [`SourceManager::get_running_info`].
pub struct SourceManagerRunningInfo {
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}
86
87impl SourceManagerCore {
88 fn new(
89 metadata_manager: MetadataManager,
90 managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
91 source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
92 backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
93 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
94 ) -> Self {
95 Self {
96 metadata_manager,
97 managed_sources,
98 source_fragments,
99 backfill_fragments,
100 actor_splits,
101 }
102 }
103
104 pub fn apply_source_change(&mut self, source_change: SourceChange) {
106 let mut added_source_fragments = Default::default();
107 let mut added_backfill_fragments = Default::default();
108 let mut finished_backfill_fragments = Default::default();
109 let mut split_assignment = Default::default();
110 let mut dropped_actors = Default::default();
111 let mut fragment_replacements = Default::default();
112 let mut dropped_source_fragments = Default::default();
113 let mut dropped_source_ids = Default::default();
114
115 match source_change {
116 SourceChange::CreateJob {
117 added_source_fragments: added_source_fragments_,
118 added_backfill_fragments: added_backfill_fragments_,
119 split_assignment: split_assignment_,
120 } => {
121 added_source_fragments = added_source_fragments_;
122 added_backfill_fragments = added_backfill_fragments_;
123 split_assignment = split_assignment_;
124 }
125 SourceChange::CreateJobFinished {
126 finished_backfill_fragments: finished_backfill_fragments_,
127 } => {
128 finished_backfill_fragments = finished_backfill_fragments_;
129 }
130 SourceChange::SplitChange(split_assignment_) => {
131 split_assignment = split_assignment_;
132 }
133 SourceChange::DropMv {
134 dropped_source_fragments: dropped_source_fragments_,
135 dropped_actors: dropped_actors_,
136 } => {
137 dropped_source_fragments = dropped_source_fragments_;
138 dropped_actors = dropped_actors_;
139 }
140 SourceChange::ReplaceJob {
141 dropped_source_fragments: dropped_source_fragments_,
142 dropped_actors: dropped_actors_,
143 added_source_fragments: added_source_fragments_,
144 split_assignment: split_assignment_,
145 fragment_replacements: fragment_replacements_,
146 } => {
147 dropped_source_fragments = dropped_source_fragments_;
148 dropped_actors = dropped_actors_;
149 added_source_fragments = added_source_fragments_;
150 split_assignment = split_assignment_;
151 fragment_replacements = fragment_replacements_;
152 }
153 SourceChange::DropSource {
154 dropped_source_ids: dropped_source_ids_,
155 } => {
156 dropped_source_ids = dropped_source_ids_;
157 }
158 SourceChange::Reschedule {
159 split_assignment: split_assignment_,
160 dropped_actors: dropped_actors_,
161 } => {
162 split_assignment = split_assignment_;
163 dropped_actors = dropped_actors_;
164 }
165 }
166
167 for source_id in dropped_source_ids {
168 let dropped_fragments = self.source_fragments.remove(&source_id);
169
170 if let Some(handle) = self.managed_sources.remove(&source_id) {
171 handle.terminate(dropped_fragments);
172 }
173 if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
174 }
181 }
182
183 for (source_id, fragments) in added_source_fragments {
184 self.source_fragments
185 .entry(source_id)
186 .or_default()
187 .extend(fragments);
188 }
189
190 for (source_id, fragments) in added_backfill_fragments {
191 self.backfill_fragments
192 .entry(source_id)
193 .or_default()
194 .extend(fragments);
195 }
196
197 for (source_id, fragments) in finished_backfill_fragments {
198 let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
199 panic!(
200 "source {} not found when adding backfill fragments {:?}",
201 source_id, fragments
202 );
203 });
204 handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
205 }
206
207 for (_, actor_splits) in split_assignment {
208 for (actor_id, splits) in actor_splits {
209 self.actor_splits.insert(actor_id, splits);
211 }
212 }
213
214 for actor_id in dropped_actors {
215 self.actor_splits.remove(&actor_id);
216 }
217
218 for (source_id, fragment_ids) in dropped_source_fragments {
219 self.drop_source_fragments(Some(source_id), fragment_ids);
220 }
221
222 for (old_fragment_id, new_fragment_id) in fragment_replacements {
223 self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
225
226 for fragment_ids in self.backfill_fragments.values_mut() {
227 let mut new_backfill_fragment_ids = fragment_ids.clone();
228 for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
229 assert_ne!(
230 fragment_id, upstream_fragment_id,
231 "backfill fragment should not be replaced"
232 );
233 if *upstream_fragment_id == old_fragment_id {
234 new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
235 new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
236 }
237 }
238 *fragment_ids = new_backfill_fragment_ids;
239 }
240 }
241 }
242
243 fn drop_source_fragments(
244 &mut self,
245 source_id: Option<SourceId>,
246 dropped_fragment_ids: BTreeSet<FragmentId>,
247 ) {
248 if let Some(source_id) = source_id {
249 if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
250 let mut dropped_ids = vec![];
251 let managed_fragment_ids = entry.get_mut();
252 for fragment_id in &dropped_fragment_ids {
253 managed_fragment_ids.remove(fragment_id);
254 dropped_ids.push(*fragment_id);
255 }
256 if let Some(handle) = self.managed_sources.get(&source_id) {
257 handle.drop_fragments(dropped_ids);
258 } else {
259 panic_if_debug!(
260 "source {source_id} not found when dropping fragment {dropped_ids:?}",
261 );
262 }
263 if managed_fragment_ids.is_empty() {
264 entry.remove();
265 }
266 }
267 } else {
268 for (source_id, fragment_ids) in &mut self.source_fragments {
269 let mut dropped_ids = vec![];
270 for fragment_id in &dropped_fragment_ids {
271 if fragment_ids.remove(fragment_id) {
272 dropped_ids.push(*fragment_id);
273 }
274 }
275 if !dropped_ids.is_empty() {
276 if let Some(handle) = self.managed_sources.get(source_id) {
277 handle.drop_fragments(dropped_ids);
278 } else {
279 panic_if_debug!(
280 "source {source_id} not found when dropping fragment {dropped_ids:?}",
281 );
282 }
283 }
284 }
285 }
286 }
287}
288
impl SourceManager {
    /// How often the background [`Self::run`] loop triggers a `tick`.
    const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);

    /// Builds the manager on meta-node startup: creates a worker handle for
    /// every existing source and loads fragment / split state from the
    /// catalog into the in-memory core.
    pub async fn new(
        barrier_scheduler: BarrierScheduler,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
    ) -> MetaResult<Self> {
        let mut managed_sources = HashMap::new();
        {
            let sources = metadata_manager.list_sources().await?;
            for source in sources {
                create_source_worker_async(source, &mut managed_sources, metrics.clone())?
            }
        }

        // Catalog ids come back as raw integers; cast into the domain types.
        let source_fragments = metadata_manager
            .catalog_controller
            .load_source_fragment_ids()
            .await?
            .into_iter()
            .map(|(source_id, fragment_ids)| {
                (
                    source_id as SourceId,
                    fragment_ids.into_iter().map(|id| id as _).collect(),
                )
            })
            .collect();
        let backfill_fragments = metadata_manager
            .catalog_controller
            .load_backfill_fragment_ids()
            .await?
            .into_iter()
            .map(|(source_id, fragment_ids)| {
                (
                    source_id as SourceId,
                    fragment_ids
                        .into_iter()
                        .map(|(id, up_id)| (id as _, up_id as _))
                        .collect(),
                )
            })
            .collect();
        let actor_splits = metadata_manager
            .catalog_controller
            .load_actor_splits()
            .await?
            .into_iter()
            .map(|(actor_id, splits)| {
                (
                    actor_id as ActorId,
                    splits
                        .to_protobuf()
                        .splits
                        .iter()
                        // Persisted splits are expected to deserialize;
                        // a failure here panics on startup.
                        .map(|split| SplitImpl::try_from(split).unwrap())
                        .collect(),
                )
            })
            .collect();

        let core = Mutex::new(SourceManagerCore::new(
            metadata_manager,
            managed_sources,
            source_fragments,
            backfill_fragments,
            actor_splits,
        ));

        Ok(Self {
            barrier_scheduler,
            core,
            paused: Mutex::new(()),
            metrics,
        })
    }

    /// Applies the source-side bookkeeping for a replace-job (e.g. ALTER)
    /// operation: drops the old job's source fragments and their actors,
    /// registers the new fragments/splits, and records fragment replacements.
    pub async fn handle_replace_job(
        &self,
        dropped_job_fragments: &StreamJobFragments,
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        split_assignment: SplitAssignment,
        replace_plan: &ReplaceStreamJobPlan,
    ) {
        let dropped_source_fragments = dropped_job_fragments.stream_source_fragments().clone();

        let fragments = &dropped_job_fragments.fragments;

        // Every dropped source fragment id must exist in the job's fragment
        // map, otherwise the `unwrap` panics.
        let dropped_actors = dropped_source_fragments
            .values()
            .flatten()
            .flat_map(|fragment_id| fragments.get(fragment_id).unwrap().actors.iter())
            .map(|actor| actor.actor_id)
            .collect::<HashSet<_>>();

        self.apply_source_change(SourceChange::ReplaceJob {
            dropped_source_fragments,
            dropped_actors,
            added_source_fragments,
            split_assignment,
            fragment_replacements: replace_plan.fragment_replacements(),
        })
        .await;
    }

    /// Locks the core and applies a [`SourceChange`] to it.
    pub async fn apply_source_change(&self, source_change: SourceChange) {
        let mut core = self.core.lock().await;
        core.apply_source_change(source_change);
    }

    /// Creates and registers an enumerator worker for `source`, unless one is
    /// already managed (in which case only a warning is logged).
    pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
        tracing::debug!("register_source: {}", source.get_id());
        let mut core = self.core.lock().await;
        if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) {
            let handle = create_source_worker(source, self.metrics.clone())
                .await
                .context("failed to create source worker")?;
            e.insert(handle);
        } else {
            tracing::warn!("source {} already registered", source.get_id());
        }
        Ok(())
    }

    /// Registers a pre-built worker handle for `source_id`; a duplicate
    /// registration is ignored with a warning.
    pub async fn register_source_with_handle(
        &self,
        source_id: SourceId,
        handle: ConnectorSourceWorkerHandle,
    ) {
        let mut core = self.core.lock().await;
        if let Entry::Vacant(e) = core.managed_sources.entry(source_id) {
            e.insert(handle);
        } else {
            tracing::warn!("source {} already registered", source_id);
        }
    }

    /// Returns a clone of the current actor → splits assignment.
    pub async fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
        let core = self.core.lock().await;
        core.actor_splits.clone()
    }

    /// Returns a cloned snapshot of the core's fragment and split state.
    pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
        let core = self.core.lock().await;
        SourceManagerRunningInfo {
            source_fragments: core.source_fragments.clone(),
            backfill_fragments: core.backfill_fragments.clone(),
            actor_splits: core.actor_splits.clone(),
        }
    }

    /// One round of split re-assignment: computes fresh assignments under the
    /// core lock, then pushes a `SourceChangeSplit` command per database.
    async fn tick(&self) -> MetaResult<()> {
        let split_assignment = {
            // NOTE(review): `reassign_splits` is defined outside this view —
            // presumably in the `split_assignment` module. The core lock is
            // held across its `.await`, which is fine for a tokio mutex.
            let core_guard = self.core.lock().await;
            core_guard.reassign_splits().await?
        };

        for (database_id, split_assignment) in split_assignment {
            if !split_assignment.is_empty() {
                let command = Command::SourceChangeSplit(split_assignment);
                tracing::info!(command = ?command, "pushing down split assignment command");
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }

    /// Runs the periodic tick loop forever. Missed ticks are skipped rather
    /// than bursted, and each iteration waits on `paused` so that
    /// [`Self::pause_tick`] holders block the loop. Tick errors are logged
    /// and do not stop the loop.
    pub async fn run(&self) -> MetaResult<()> {
        let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;
            let _pause_guard = self.paused.lock().await;
            if let Err(e) = self.tick().await {
                tracing::error!(
                    error = %e.as_report(),
                    "error happened while running source manager tick",
                );
            }
        }
    }

    /// Acquires the pause lock; the tick loop is suspended until the returned
    /// guard is dropped.
    pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
        tracing::debug!("pausing tick lock in source manager");
        self.paused.lock().await
    }
}
494
/// State mutations applied to [`SourceManagerCore`] via
/// [`SourceManager::apply_source_change`].
pub enum SourceChange {
    /// A streaming job with source (and possibly source-backfill) fragments
    /// has been created.
    CreateJob {
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// `(backfill fragment id, upstream source fragment id)` pairs.
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
        split_assignment: SplitAssignment,
    },
    /// Source-backfill fragments finished backfilling; the owning source
    /// worker is notified via `finish_backfill`.
    CreateJobFinished {
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Only the actor → splits assignment changed.
    SplitChange(SplitAssignment),
    /// Sources were dropped; their workers are terminated and all related
    /// bookkeeping is removed.
    DropSource {
        dropped_source_ids: Vec<SourceId>,
    },
    /// A materialized view consuming sources was dropped.
    DropMv {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        dropped_actors: HashSet<ActorId>,
    },
    /// A job was replaced: old fragments/actors are dropped, new ones added,
    /// and backfill upstreams are re-pointed per `fragment_replacements`.
    ReplaceJob {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        dropped_actors: HashSet<ActorId>,

        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        split_assignment: SplitAssignment,
        /// Maps old fragment id → new fragment id.
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
    /// Actors were rescheduled: splits re-assigned and removed actors dropped.
    Reschedule {
        split_assignment: SplitAssignment,
        dropped_actors: HashSet<ActorId>,
    },
}
534
535pub fn build_actor_connector_splits(
536 splits: &HashMap<ActorId, Vec<SplitImpl>>,
537) -> HashMap<u32, ConnectorSplits> {
538 splits
539 .iter()
540 .map(|(&actor_id, splits)| {
541 (
542 actor_id,
543 ConnectorSplits {
544 splits: splits.iter().map(ConnectorSplit::from).collect(),
545 },
546 )
547 })
548 .collect()
549}
550
551pub fn build_actor_split_impls(
552 actor_splits: &HashMap<u32, ConnectorSplits>,
553) -> HashMap<ActorId, Vec<SplitImpl>> {
554 actor_splits
555 .iter()
556 .map(|(actor_id, ConnectorSplits { splits })| {
557 (
558 *actor_id,
559 splits
560 .iter()
561 .map(|split| SplitImpl::try_from(split).unwrap())
562 .collect(),
563 )
564 })
565 .collect()
566}