1use anyhow::anyhow;
16use itertools::Itertools;
17
18use super::*;
19use crate::model::{FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments};
20
21impl SourceManager {
22 pub async fn migrate_splits_for_source_actors(
28 &self,
29 fragment_id: FragmentId,
30 prev_actor_ids: &[ActorId],
31 curr_actor_ids: &[ActorId],
32 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
33 let core = self.core.lock().await;
34
35 let prev_splits = prev_actor_ids
36 .iter()
37 .flat_map(|actor_id| core.actor_splits.get(actor_id).unwrap())
38 .map(|split| (split.id(), split.clone()))
39 .collect();
40
41 let empty_actor_splits = curr_actor_ids
42 .iter()
43 .map(|actor_id| (*actor_id, vec![]))
44 .collect();
45
46 let diff = reassign_splits(
47 fragment_id,
48 empty_actor_splits,
49 &prev_splits,
50 SplitDiffOptions::default(),
52 )
53 .unwrap_or_default();
54
55 Ok(diff)
56 }
57
58 pub fn migrate_splits_for_backfill_actors(
60 &self,
61 fragment_id: FragmentId,
62 upstream_source_fragment_id: FragmentId,
63 curr_actor_ids: &[ActorId],
64 fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
65 no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
66 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
67 let actors = no_shuffle_upstream_actor_map
69 .iter()
70 .filter(|(id, _)| curr_actor_ids.contains(id))
71 .map(|(id, upstream_fragment_actors)| {
72 (
73 *id,
74 *upstream_fragment_actors
75 .get(&upstream_source_fragment_id)
76 .unwrap(),
77 )
78 });
79 let upstream_assignment = fragment_actor_splits
80 .get(&upstream_source_fragment_id)
81 .unwrap();
82 tracing::info!(
83 fragment_id,
84 upstream_source_fragment_id,
85 ?upstream_assignment,
86 "migrate_splits_for_backfill_actors"
87 );
88 Ok(align_splits(
89 actors,
90 upstream_assignment,
91 fragment_id,
92 upstream_source_fragment_id,
93 )?)
94 }
95
96 pub async fn allocate_splits(
98 &self,
99 table_fragments: &StreamJobFragments,
100 ) -> MetaResult<SplitAssignment> {
101 let core = self.core.lock().await;
102
103 let source_fragments = table_fragments.stream_source_fragments();
104
105 let mut assigned = HashMap::new();
106
107 'loop_source: for (source_id, fragments) in source_fragments {
108 let handle = core
109 .managed_sources
110 .get(&source_id)
111 .with_context(|| format!("could not find source {}", source_id))?;
112
113 if handle.splits.lock().await.splits.is_none() {
114 handle.force_tick().await?;
115 }
116
117 for fragment_id in fragments {
118 let empty_actor_splits: HashMap<u32, Vec<SplitImpl>> = table_fragments
119 .fragments
120 .get(&fragment_id)
121 .unwrap()
122 .actors
123 .iter()
124 .map(|actor| (actor.actor_id, vec![]))
125 .collect();
126 let actor_hashset: HashSet<u32> = empty_actor_splits.keys().cloned().collect();
127 let splits = handle.discovered_splits(source_id, &actor_hashset).await?;
128 if splits.is_empty() {
129 tracing::warn!("no splits detected for source {}", source_id);
130 continue 'loop_source;
131 }
132
133 if let Some(diff) = reassign_splits(
134 fragment_id,
135 empty_actor_splits,
136 &splits,
137 SplitDiffOptions::default(),
138 ) {
139 assigned.insert(fragment_id, diff);
140 }
141 }
142 }
143
144 Ok(assigned)
145 }
146
147 pub async fn allocate_splits_for_replace_source(
149 &self,
150 table_fragments: &StreamJobFragments,
151 upstream_updates: &FragmentReplaceUpstream,
152 new_no_shuffle: &FragmentNewNoShuffle,
158 ) -> MetaResult<SplitAssignment> {
159 tracing::debug!(?upstream_updates, "allocate_splits_for_replace_source");
160 if upstream_updates.is_empty() {
161 return self.allocate_splits(table_fragments).await;
163 }
164
165 let core = self.core.lock().await;
166
167 let source_fragments = table_fragments.stream_source_fragments();
168 assert_eq!(
169 source_fragments.len(),
170 1,
171 "replace source job should only have one source"
172 );
173 let (_source_id, fragments) = source_fragments.into_iter().next().unwrap();
174 assert_eq!(
175 fragments.len(),
176 1,
177 "replace source job should only have one fragment"
178 );
179 let fragment_id = fragments.into_iter().next().unwrap();
180
181 debug_assert!(
182 upstream_updates.values().flatten().next().is_some()
183 && upstream_updates
184 .values()
185 .flatten()
186 .all(|(_, new_upstream_fragment_id)| {
187 *new_upstream_fragment_id == fragment_id
188 })
189 && upstream_updates
190 .values()
191 .flatten()
192 .map(|(upstream_fragment_id, _)| upstream_fragment_id)
193 .all_equal(),
194 "upstream update should only replace one fragment: {:?}",
195 upstream_updates
196 );
197 let prev_fragment_id = upstream_updates
198 .values()
199 .flatten()
200 .next()
201 .map(|(upstream_fragment_id, _)| *upstream_fragment_id)
202 .expect("non-empty");
203 let aligned_actors: HashMap<ActorId, ActorId> = new_no_shuffle
216 .get(&fragment_id)
217 .map(HashMap::values)
218 .into_iter()
219 .flatten()
220 .flatten()
221 .map(|(upstream_actor_id, actor_id)| (*upstream_actor_id, *actor_id))
222 .collect();
223 let assignment = align_splits(
224 aligned_actors.into_iter(),
225 &core.actor_splits,
226 fragment_id,
227 prev_fragment_id,
228 )?;
229 Ok(HashMap::from([(fragment_id, assignment)]))
230 }
231
232 pub async fn allocate_splits_for_backfill(
237 &self,
238 table_fragments: &StreamJobFragments,
239 upstream_new_no_shuffle: &FragmentNewNoShuffle,
240 upstream_actors: &HashMap<FragmentId, HashSet<ActorId>>,
241 ) -> MetaResult<SplitAssignment> {
242 let core = self.core.lock().await;
243
244 let source_backfill_fragments = table_fragments.source_backfill_fragments()?;
245
246 let mut assigned = HashMap::new();
247
248 for (_source_id, fragments) in source_backfill_fragments {
249 for (fragment_id, upstream_source_fragment_id) in fragments {
250 let upstream_actors = upstream_actors
251 .get(&upstream_source_fragment_id)
252 .ok_or_else(|| {
253 anyhow!(
254 "no upstream actors found from fragment {} to upstream source fragment {}",
255 fragment_id,
256 upstream_source_fragment_id
257 )
258 })?;
259 let mut backfill_actors = vec![];
260 let Some(source_new_no_shuffle) = upstream_new_no_shuffle
261 .get(&upstream_source_fragment_id)
262 .and_then(|source_upstream_new_no_shuffle| {
263 source_upstream_new_no_shuffle.get(&fragment_id)
264 })
265 else {
266 return Err(anyhow::anyhow!(
267 "source backfill fragment's upstream fragment should have one-on-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_new_no_shuffle: {upstream_new_no_shuffle:?}",
268 fragment_id = fragment_id,
269 upstream_source_fragment_id = upstream_source_fragment_id,
270 upstream_new_no_shuffle = upstream_new_no_shuffle,
271 ).into());
272 };
273 for upstream_actor in upstream_actors {
274 let Some(no_shuffle_backfill_actor) = source_new_no_shuffle.get(upstream_actor)
275 else {
276 return Err(anyhow::anyhow!(
277 "source backfill fragment's upstream fragment should have one-on-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, source_new_no_shuffle: {source_new_no_shuffle:?}",
278 fragment_id = fragment_id,
279 upstream_source_fragment_id = upstream_source_fragment_id,
280 upstream_actor = upstream_actor,
281 source_new_no_shuffle = source_new_no_shuffle
282 ).into());
283 };
284 backfill_actors.push((*no_shuffle_backfill_actor, *upstream_actor));
285 }
286 assigned.insert(
287 fragment_id,
288 align_splits(
289 backfill_actors,
290 &core.actor_splits,
291 fragment_id,
292 upstream_source_fragment_id,
293 )?,
294 );
295 }
296 }
297
298 Ok(assigned)
299 }
300}
301
302impl SourceManagerCore {
303 pub async fn reassign_splits(&self) -> MetaResult<HashMap<DatabaseId, SplitAssignment>> {
309 let mut split_assignment: SplitAssignment = HashMap::new();
310
311 'loop_source: for (source_id, handle) in &self.managed_sources {
312 let source_fragment_ids = match self.source_fragments.get(source_id) {
313 Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids,
314 _ => {
315 continue;
316 }
317 };
318 let backfill_fragment_ids = self.backfill_fragments.get(source_id);
319
320 'loop_fragment: for &fragment_id in source_fragment_ids {
321 let actors = match self
322 .metadata_manager
323 .get_running_actors_of_fragment(fragment_id)
324 .await
325 {
326 Ok(actors) => {
327 if actors.is_empty() {
328 tracing::warn!("No actors found for fragment {}", fragment_id);
329 continue 'loop_fragment;
330 }
331 actors
332 }
333 Err(err) => {
334 tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
335 continue 'loop_fragment;
336 }
337 };
338
339 let discovered_splits = handle.discovered_splits(*source_id, &actors).await?;
340 if discovered_splits.is_empty() {
341 continue 'loop_source;
343 }
344
345 let prev_actor_splits: HashMap<_, _> = actors
346 .into_iter()
347 .map(|actor_id| {
348 (
349 actor_id,
350 self.actor_splits
351 .get(&actor_id)
352 .cloned()
353 .unwrap_or_default(),
354 )
355 })
356 .collect();
357
358 if let Some(new_assignment) = reassign_splits(
359 fragment_id,
360 prev_actor_splits,
361 &discovered_splits,
362 SplitDiffOptions {
363 enable_scale_in: handle.enable_drop_split,
364 enable_adaptive: handle.enable_adaptive_splits,
365 },
366 ) {
367 split_assignment.insert(fragment_id, new_assignment);
368 }
369 }
370
371 if let Some(backfill_fragment_ids) = backfill_fragment_ids {
372 for (fragment_id, upstream_fragment_id) in backfill_fragment_ids {
374 let Some(upstream_assignment) = split_assignment.get(upstream_fragment_id)
375 else {
376 continue;
378 };
379 let actors = match self
380 .metadata_manager
381 .get_running_actors_for_source_backfill(*fragment_id, *upstream_fragment_id)
382 .await
383 {
384 Ok(actors) => {
385 if actors.is_empty() {
386 tracing::warn!("No actors found for fragment {}", fragment_id);
387 continue;
388 }
389 actors
390 }
391 Err(err) => {
392 tracing::warn!(error = %err.as_report(),"Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
393 continue;
394 }
395 };
396 split_assignment.insert(
397 *fragment_id,
398 align_splits(
399 actors,
400 upstream_assignment,
401 *fragment_id,
402 *upstream_fragment_id,
403 )?,
404 );
405 }
406 }
407 }
408
409 self.metadata_manager
410 .split_fragment_map_by_database(split_assignment)
411 .await
412 }
413}
414
415fn reassign_splits<T>(
444 fragment_id: FragmentId,
445 actor_splits: HashMap<ActorId, Vec<T>>,
446 discovered_splits: &BTreeMap<SplitId, T>,
447 opts: SplitDiffOptions,
448) -> Option<HashMap<ActorId, Vec<T>>>
449where
450 T: SplitMetaData + Clone,
451{
452 if actor_splits.is_empty() {
454 return None;
455 }
456
457 let prev_split_ids: HashSet<_> = actor_splits
458 .values()
459 .flat_map(|splits| splits.iter().map(SplitMetaData::id))
460 .collect();
461
462 tracing::trace!(fragment_id, prev_split_ids = ?prev_split_ids, "previous splits");
463 tracing::trace!(fragment_id, prev_split_ids = ?discovered_splits.keys(), "discovered splits");
464
465 let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
466
467 let dropped_splits: HashSet<_> = prev_split_ids
468 .difference(&discovered_split_ids)
469 .cloned()
470 .collect();
471
472 if !dropped_splits.is_empty() {
473 if opts.enable_scale_in {
474 tracing::info!(fragment_id, dropped_spltis = ?dropped_splits, "new dropped splits");
475 } else {
476 tracing::warn!(fragment_id, dropped_spltis = ?dropped_splits, "split dropping happened, but it is not allowed");
477 }
478 }
479
480 let new_discovered_splits: BTreeSet<_> = discovered_split_ids
481 .into_iter()
482 .filter(|split_id| !prev_split_ids.contains(split_id))
483 .collect();
484
485 if opts.enable_scale_in || opts.enable_adaptive {
486 if dropped_splits.is_empty()
490 && new_discovered_splits.is_empty()
491 && !discovered_splits.is_empty()
492 {
493 return None;
494 }
495 } else {
496 if new_discovered_splits.is_empty() && !discovered_splits.is_empty() {
498 return None;
499 }
500 }
501
502 tracing::info!(fragment_id, new_discovered_splits = ?new_discovered_splits, "new discovered splits");
503
504 let mut heap = BinaryHeap::with_capacity(actor_splits.len());
505
506 for (actor_id, mut splits) in actor_splits {
507 if opts.enable_scale_in || opts.enable_adaptive {
508 splits.retain(|split| !dropped_splits.contains(&split.id()));
509 }
510
511 heap.push(ActorSplitsAssignment { actor_id, splits })
512 }
513
514 for split_id in new_discovered_splits {
515 let mut peek_ref = heap.peek_mut().unwrap();
524 peek_ref
525 .splits
526 .push(discovered_splits.get(&split_id).cloned().unwrap());
527 }
528
529 Some(
530 heap.into_iter()
531 .map(|ActorSplitsAssignment { actor_id, splits }| (actor_id, splits))
532 .collect(),
533 )
534}
535
536fn align_splits(
546 aligned_actors: impl IntoIterator<Item = (ActorId, ActorId)>,
548 existing_assignment: &HashMap<ActorId, Vec<SplitImpl>>,
549 fragment_id: FragmentId,
550 upstream_source_fragment_id: FragmentId,
551) -> anyhow::Result<HashMap<ActorId, Vec<SplitImpl>>> {
552 aligned_actors
553 .into_iter()
554 .map(|(actor_id, upstream_actor_id)| {
555 let Some(splits) = existing_assignment.get(&upstream_actor_id) else {
556 return Err(anyhow::anyhow!("upstream assignment not found, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_assignment: {existing_assignment:?}, upstream_actor_id: {upstream_actor_id:?}"));
557 };
558 Ok((
559 actor_id,
560 splits.clone(),
561 ))
562 })
563 .collect()
564}
565
566#[derive(Debug)]
568struct ActorSplitsAssignment<T: SplitMetaData> {
569 actor_id: ActorId,
570 splits: Vec<T>,
571}
572
573impl<T: SplitMetaData + Clone> Eq for ActorSplitsAssignment<T> {}
574
575impl<T: SplitMetaData + Clone> PartialEq<Self> for ActorSplitsAssignment<T> {
576 fn eq(&self, other: &Self) -> bool {
577 self.splits.len() == other.splits.len()
578 }
579}
580
581impl<T: SplitMetaData + Clone> PartialOrd<Self> for ActorSplitsAssignment<T> {
582 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
583 Some(self.cmp(other))
584 }
585}
586
587impl<T: SplitMetaData + Clone> Ord for ActorSplitsAssignment<T> {
588 fn cmp(&self, other: &Self) -> Ordering {
589 other.splits.len().cmp(&self.splits.len())
591 }
592}
593
/// Flags that control how `reassign_splits` handles splits that are no longer discovered.
#[derive(Debug)]
pub struct SplitDiffOptions {
    /// When set, splits missing from the discovered set are removed from their actors, and the
    /// removal is logged at info level instead of as a warning.
    pub enable_scale_in: bool,

    /// When set, splits missing from the discovered set are removed from their actors as well.
    pub enable_adaptive: bool,
}

// The impl is kept explicit on purpose, hence the allow: both flags default to disabled.
#[allow(clippy::derivable_impls)]
impl Default for SplitDiffOptions {
    /// Returns options with every flag turned off.
    fn default() -> Self {
        Self {
            enable_scale_in: false,
            enable_adaptive: false,
        }
    }
}
611
612#[cfg(test)]
613mod tests {
614 use std::collections::{BTreeMap, HashMap, HashSet};
615
616 use risingwave_common::types::JsonbVal;
617 use risingwave_connector::error::ConnectorResult;
618 use risingwave_connector::source::{SplitId, SplitMetaData};
619 use serde::{Deserialize, Serialize};
620
621 use super::*;
622 use crate::model::{ActorId, FragmentId};
623
    /// Minimal split used by the tests in this module; it carries nothing but a numeric `id`.
    #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
    struct TestSplit {
        /// Number whose decimal string form is used as the split id.
        id: u32,
    }
628
629 impl SplitMetaData for TestSplit {
630 fn id(&self) -> SplitId {
631 format!("{}", self.id).into()
632 }
633
634 fn encode_to_json(&self) -> JsonbVal {
635 serde_json::to_value(*self).unwrap().into()
636 }
637
638 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
639 serde_json::from_value(value.take()).map_err(Into::into)
640 }
641
642 fn update_offset(&mut self, _last_read_offset: String) -> ConnectorResult<()> {
643 Ok(())
644 }
645 }
646
647 fn check_all_splits(
648 discovered_splits: &BTreeMap<SplitId, TestSplit>,
649 diff: &HashMap<ActorId, Vec<TestSplit>>,
650 ) {
651 let mut split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
652
653 for splits in diff.values() {
654 for split in splits {
655 assert!(split_ids.remove(&split.id()))
656 }
657 }
658
659 assert!(split_ids.is_empty());
660 }
661
662 #[test]
663 fn test_drop_splits() {
664 let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
665 actor_splits.insert(0, vec![TestSplit { id: 0 }, TestSplit { id: 1 }]);
666 actor_splits.insert(1, vec![TestSplit { id: 2 }, TestSplit { id: 3 }]);
667 actor_splits.insert(2, vec![TestSplit { id: 4 }, TestSplit { id: 5 }]);
668
669 let mut prev_split_to_actor = HashMap::new();
670 for (actor_id, splits) in &actor_splits {
671 for split in splits {
672 prev_split_to_actor.insert(split.id(), *actor_id);
673 }
674 }
675
676 let discovered_splits: BTreeMap<SplitId, TestSplit> = (1..5)
677 .map(|i| {
678 let split = TestSplit { id: i };
679 (split.id(), split)
680 })
681 .collect();
682
683 let opts = SplitDiffOptions {
684 enable_scale_in: true,
685 enable_adaptive: false,
686 };
687
688 let prev_split_ids: HashSet<_> = actor_splits
689 .values()
690 .flat_map(|splits| splits.iter().map(|split| split.id()))
691 .collect();
692
693 let diff = reassign_splits(
694 FragmentId::default(),
695 actor_splits,
696 &discovered_splits,
697 opts,
698 )
699 .unwrap();
700 check_all_splits(&discovered_splits, &diff);
701
702 let mut after_split_to_actor = HashMap::new();
703 for (actor_id, splits) in &diff {
704 for split in splits {
705 after_split_to_actor.insert(split.id(), *actor_id);
706 }
707 }
708
709 let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
710
711 let retained_split_ids: HashSet<_> =
712 prev_split_ids.intersection(&discovered_split_ids).collect();
713
714 for retained_split_id in retained_split_ids {
715 assert_eq!(
716 prev_split_to_actor.get(retained_split_id),
717 after_split_to_actor.get(retained_split_id)
718 )
719 }
720 }
721
722 #[test]
723 fn test_drop_splits_to_empty() {
724 let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
725 actor_splits.insert(0, vec![TestSplit { id: 0 }]);
726
727 let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
728
729 let opts = SplitDiffOptions {
730 enable_scale_in: true,
731 enable_adaptive: false,
732 };
733
734 let diff = reassign_splits(
735 FragmentId::default(),
736 actor_splits,
737 &discovered_splits,
738 opts,
739 )
740 .unwrap();
741
742 assert!(!diff.is_empty())
743 }
744
745 #[test]
746 fn test_reassign_splits() {
747 let actor_splits = HashMap::new();
748 let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
749 assert!(
750 reassign_splits(
751 FragmentId::default(),
752 actor_splits,
753 &discovered_splits,
754 Default::default()
755 )
756 .is_none()
757 );
758
759 let actor_splits = (0..3).map(|i| (i, vec![])).collect();
760 let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
761 let diff = reassign_splits(
762 FragmentId::default(),
763 actor_splits,
764 &discovered_splits,
765 Default::default(),
766 )
767 .unwrap();
768 assert_eq!(diff.len(), 3);
769 for splits in diff.values() {
770 assert!(splits.is_empty())
771 }
772
773 let actor_splits = (0..3).map(|i| (i, vec![])).collect();
774 let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..3)
775 .map(|i| {
776 let split = TestSplit { id: i };
777 (split.id(), split)
778 })
779 .collect();
780
781 let diff = reassign_splits(
782 FragmentId::default(),
783 actor_splits,
784 &discovered_splits,
785 Default::default(),
786 )
787 .unwrap();
788 assert_eq!(diff.len(), 3);
789 for splits in diff.values() {
790 assert_eq!(splits.len(), 1);
791 }
792
793 check_all_splits(&discovered_splits, &diff);
794
795 let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
796 let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
797 .map(|i| {
798 let split = TestSplit { id: i };
799 (split.id(), split)
800 })
801 .collect();
802
803 let diff = reassign_splits(
804 FragmentId::default(),
805 actor_splits,
806 &discovered_splits,
807 Default::default(),
808 )
809 .unwrap();
810 assert_eq!(diff.len(), 3);
811 for splits in diff.values() {
812 let len = splits.len();
813 assert!(len == 1 || len == 2);
814 }
815
816 check_all_splits(&discovered_splits, &diff);
817
818 let mut actor_splits: HashMap<ActorId, Vec<TestSplit>> =
819 (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
820 actor_splits.insert(3, vec![]);
821 actor_splits.insert(4, vec![]);
822
823 let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
824 .map(|i| {
825 let split = TestSplit { id: i };
826 (split.id(), split)
827 })
828 .collect();
829
830 let diff = reassign_splits(
831 FragmentId::default(),
832 actor_splits,
833 &discovered_splits,
834 Default::default(),
835 )
836 .unwrap();
837 assert_eq!(diff.len(), 5);
838 for splits in diff.values() {
839 assert_eq!(splits.len(), 1);
840 }
841
842 check_all_splits(&discovered_splits, &diff);
843 }
844}