use anyhow::anyhow;
use itertools::Itertools;

use super::*;
use crate::model::{FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments};

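/// Result of one round of split (re)assignment for a single database: the new
/// per-fragment split assignment plus the splits currently discovered for each
/// involved source.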
#[derive(Debug, Clone)]
pub struct SplitState {
    pub split_assignment: SplitAssignment,
    pub discovered_source_splits: DiscoveredSourceSplits,
}

impl SourceManager {
    pub fn migrate_splits_for_backfill_actors(
        &self,
        fragment_id: FragmentId,
        upstream_source_fragment_id: FragmentId,
        curr_actor_ids: &[ActorId],
        fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
        no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
        let actors = no_shuffle_upstream_actor_map
            .iter()
            .filter(|(id, _)| curr_actor_ids.contains(id))
            .map(|(id, upstream_fragment_actors)| {
                (
                    *id,
                    *upstream_fragment_actors
                        .get(&upstream_source_fragment_id)
                        .unwrap(),
                )
            });
        let upstream_assignment = fragment_actor_splits
            .get(&upstream_source_fragment_id)
            .unwrap();
        tracing::info!(
            fragment_id,
            upstream_source_fragment_id,
            ?upstream_assignment,
            "migrate_splits_for_backfill_actors"
        );
        Ok(align_splits(
            actors,
            SplitAlignmentContext::Pending(upstream_assignment),
            fragment_id,
            upstream_source_fragment_id,
        )?)
    }

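    /// Allocate splits to the stream source fragments of a newly created streaming job.
    /// If nothing has been discovered for a source yet, a discovery tick is forced first;
    /// sources that still report no splits are skipped with a warning.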
    #[await_tree::instrument]
    pub async fn allocate_splits(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<SplitAssignment> {
        let core = self.core.lock().await;

        let stream_job_id = table_fragments.stream_job_id;
        let source_fragments = table_fragments.stream_source_fragments();

        let mut assigned = HashMap::new();

        'loop_source: for (source_id, fragments) in source_fragments {
            let handle = core
                .managed_sources
                .get(&source_id)
                .with_context(|| format!("could not find source {}", source_id))?;

            if handle.splits.lock().await.splits.is_none() {
                handle.force_tick().await?;
            }

            for fragment_id in fragments {
                let empty_actor_splits: HashMap<u32, Vec<SplitImpl>> = table_fragments
                    .fragments
                    .get(&fragment_id)
                    .unwrap()
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, vec![]))
                    .collect();
                let actor_hashset: HashSet<u32> = empty_actor_splits.keys().cloned().collect();
                let splits = handle.discovered_splits(source_id, &actor_hashset).await?;
                if splits.is_empty() {
                    tracing::warn!(?stream_job_id, source_id, "no splits detected");
                    continue 'loop_source;
                }
                if let Some(diff) = reassign_splits(
                    fragment_id,
                    empty_actor_splits,
                    &splits,
                    SplitDiffOptions::default(),
                ) {
                    assigned.insert(fragment_id, diff);
                }
            }
        }

        Ok(assigned)
    }

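    /// Allocate splits to the single source fragment created by a replace-source job.
    /// If there is no upstream to update, this falls back to [`Self::allocate_splits`];
    /// otherwise each new actor reuses the splits currently stored for the actor it is
    /// aligned with through the new no-shuffle mapping, so the existing split
    /// distribution is preserved.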
    pub async fn allocate_splits_for_replace_source(
        &self,
        table_fragments: &StreamJobFragments,
        upstream_updates: &FragmentReplaceUpstream,
        new_no_shuffle: &FragmentNewNoShuffle,
    ) -> MetaResult<SplitAssignment> {
        tracing::debug!(?upstream_updates, "allocate_splits_for_replace_source");
        if upstream_updates.is_empty() {
            return self.allocate_splits(table_fragments).await;
        }

        let core = self.core.lock().await;

        let source_fragments = table_fragments.stream_source_fragments();
        assert_eq!(
            source_fragments.len(),
            1,
            "replace source job should only have one source"
        );
        let (_source_id, fragments) = source_fragments.into_iter().next().unwrap();
        assert_eq!(
            fragments.len(),
            1,
            "replace source job should only have one fragment"
        );
        let fragment_id = fragments.into_iter().next().unwrap();

        debug_assert!(
            upstream_updates.values().flatten().next().is_some()
                && upstream_updates
                    .values()
                    .flatten()
                    .all(|(_, new_upstream_fragment_id)| {
                        *new_upstream_fragment_id == fragment_id
                    })
                && upstream_updates
                    .values()
                    .flatten()
                    .map(|(upstream_fragment_id, _)| upstream_fragment_id)
                    .all_equal(),
            "upstream update should only replace one fragment: {:?}",
            upstream_updates
        );
        let prev_fragment_id = upstream_updates
            .values()
            .flatten()
            .next()
            .map(|(upstream_fragment_id, _)| *upstream_fragment_id)
            .expect("non-empty");
        let aligned_actors: HashMap<ActorId, ActorId> = new_no_shuffle
            .get(&fragment_id)
            .map(HashMap::values)
            .into_iter()
            .flatten()
            .flatten()
            .map(|(upstream_actor_id, actor_id)| (*upstream_actor_id, *actor_id))
            .collect();
        let assignment = align_splits(
            aligned_actors.into_iter(),
            SplitAlignmentContext::Stored(core.env.shared_actor_infos()),
            fragment_id,
            prev_fragment_id,
        )?;
        Ok(HashMap::from([(fragment_id, assignment)]))
    }

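    /// Allocate splits to source backfill fragments: every backfill actor is paired with
    /// its no-shuffle upstream source actor and receives exactly the splits currently
    /// stored for that actor, so backfill and source stay aligned.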
    #[await_tree::instrument]
    pub async fn allocate_splits_for_backfill(
        &self,
        table_fragments: &StreamJobFragments,
        upstream_new_no_shuffle: &FragmentNewNoShuffle,
        upstream_actors: &HashMap<FragmentId, HashSet<ActorId>>,
    ) -> MetaResult<SplitAssignment> {
        let core = self.core.lock().await;

        let source_backfill_fragments = table_fragments.source_backfill_fragments();

        let mut assigned = HashMap::new();

        for (_source_id, fragments) in source_backfill_fragments {
            for (fragment_id, upstream_source_fragment_id) in fragments {
                let upstream_actors = upstream_actors
                    .get(&upstream_source_fragment_id)
                    .ok_or_else(|| {
                        anyhow!(
                            "no upstream actors found from fragment {} to upstream source fragment {}",
                            fragment_id,
                            upstream_source_fragment_id
                        )
                    })?;
                let mut backfill_actors = vec![];
                let Some(source_new_no_shuffle) = upstream_new_no_shuffle
                    .get(&upstream_source_fragment_id)
                    .and_then(|source_upstream_new_no_shuffle| {
                        source_upstream_new_no_shuffle.get(&fragment_id)
                    })
                else {
                    return Err(anyhow::anyhow!(
                        "source backfill fragment's upstream fragment should have a one-to-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_new_no_shuffle: {upstream_new_no_shuffle:?}"
                    )
                    .into());
                };
                for upstream_actor in upstream_actors {
                    let Some(no_shuffle_backfill_actor) = source_new_no_shuffle.get(upstream_actor)
                    else {
                        return Err(anyhow::anyhow!(
                            "source backfill fragment's upstream fragment should have a one-to-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, source_new_no_shuffle: {source_new_no_shuffle:?}"
                        )
                        .into());
                    };
                    backfill_actors.push((*no_shuffle_backfill_actor, *upstream_actor));
                }
                assigned.insert(
                    fragment_id,
                    align_splits(
                        backfill_actors,
                        SplitAlignmentContext::Stored(core.env.shared_actor_infos()),
                        fragment_id,
                        upstream_source_fragment_id,
                    )?,
                );
            }
        }

        Ok(assigned)
    }
}

impl SourceManagerCore {
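    /// Diff the splits currently discovered for every managed source against the splits
    /// stored for its running actors, and build a per-database [`SplitState`] describing
    /// the required reassignment. Backfill fragments are re-aligned with their upstream
    /// source fragment whenever that fragment's assignment changes in this round.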
    pub async fn reassign_splits(&self) -> MetaResult<HashMap<DatabaseId, SplitState>> {
        let mut split_assignment: SplitAssignment = HashMap::new();
        let mut source_splits_discovered = HashMap::new();

        'loop_source: for (source_id, handle) in &self.managed_sources {
            let source_fragment_ids = match self.source_fragments.get(source_id) {
                Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids,
                _ => {
                    continue;
                }
            };
            let backfill_fragment_ids = self.backfill_fragments.get(source_id);

            'loop_fragment: for &fragment_id in source_fragment_ids {
                let actors = match self
                    .metadata_manager
                    .get_running_actors_of_fragment(fragment_id)
                    .await
                {
                    Ok(actors) => {
                        if actors.is_empty() {
                            tracing::warn!("No actors found for fragment {}", fragment_id);
                            continue 'loop_fragment;
                        }
                        actors
                    }
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to get the actors of the fragment; maybe the fragment doesn't exist anymore");
                        continue 'loop_fragment;
                    }
                };

                let discovered_splits = handle.discovered_splits(*source_id, &actors).await?;
                if discovered_splits.is_empty() {
                    continue 'loop_source;
                }

                let prev_actor_splits = {
                    let guard = self.env.shared_actor_infos().read_guard();

                    guard
                        .get_fragment(fragment_id)
                        .map(|info| {
                            info.actors
                                .iter()
                                .map(|(actor_id, actor_info)| (*actor_id, actor_info.splits.clone()))
                                .collect::<HashMap<_, _>>()
                        })
                        .unwrap_or_default()
                };

                source_splits_discovered.insert(
                    *source_id,
                    discovered_splits.values().cloned().collect_vec(),
                );

                if let Some(new_assignment) = reassign_splits(
                    fragment_id,
                    prev_actor_splits,
                    &discovered_splits,
                    SplitDiffOptions {
                        enable_scale_in: handle.enable_drop_split,
                        enable_adaptive: handle.enable_adaptive_splits,
                    },
                ) {
                    split_assignment.insert(fragment_id, new_assignment);
                }
            }

            if let Some(backfill_fragment_ids) = backfill_fragment_ids {
                for (fragment_id, upstream_fragment_id) in backfill_fragment_ids {
                    let Some(upstream_assignment): Option<&HashMap<ActorId, Vec<SplitImpl>>> =
                        split_assignment.get(upstream_fragment_id)
                    else {
                        continue;
                    };
                    let actors = match self
                        .metadata_manager
                        .get_running_actors_for_source_backfill(*fragment_id, *upstream_fragment_id)
                        .await
                    {
                        Ok(actors) => {
                            if actors.is_empty() {
                                tracing::warn!("No actors found for fragment {}", fragment_id);
                                continue;
                            }
                            actors
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to get the actors of the fragment; maybe the fragment doesn't exist anymore");
                            continue;
                        }
                    };
                    split_assignment.insert(
                        *fragment_id,
                        align_splits(
                            actors,
                            SplitAlignmentContext::Pending(upstream_assignment),
                            *fragment_id,
                            *upstream_fragment_id,
                        )?,
                    );
                }
            }
        }

        let assignments = self
            .metadata_manager
            .split_fragment_map_by_database(split_assignment)
            .await?;

        let mut result = HashMap::new();
        for (database_id, assignment) in assignments {
            let mut source_splits = HashMap::new();
            for (source_id, fragment_ids) in &self.source_fragments {
                if fragment_ids
                    .iter()
                    .any(|fragment_id| assignment.contains_key(fragment_id))
                    && let Some(splits) = source_splits_discovered.get(source_id)
                {
                    source_splits.insert(*source_id, splits.clone());
                }
            }

            result.insert(
                database_id,
                SplitState {
                    split_assignment: assignment,
                    discovered_source_splits: source_splits,
                },
            );
        }

        Ok(result)
    }
}

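/// Diff `actor_splits` (the current assignment of a fragment) against `discovered_splits`
/// and return the new assignment, or `None` if nothing needs to change. Newly discovered
/// splits go to the actors currently holding the fewest splits; splits that disappeared
/// upstream are only dropped when `opts.enable_scale_in` or `opts.enable_adaptive` is set.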
pub fn reassign_splits<T>(
    fragment_id: FragmentId,
    actor_splits: HashMap<ActorId, Vec<T>>,
    discovered_splits: &BTreeMap<SplitId, T>,
    opts: SplitDiffOptions,
) -> Option<HashMap<ActorId, Vec<T>>>
where
    T: SplitMetaData + Clone,
{
    if actor_splits.is_empty() {
        return None;
    }

    let prev_split_ids: HashSet<_> = actor_splits
        .values()
        .flat_map(|splits| splits.iter().map(SplitMetaData::id))
        .collect();

    tracing::trace!(fragment_id, prev_split_ids = ?prev_split_ids, "previous splits");
    tracing::trace!(fragment_id, discovered_split_ids = ?discovered_splits.keys(), "discovered splits");

    let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();

    let dropped_splits: HashSet<_> = prev_split_ids
        .difference(&discovered_split_ids)
        .cloned()
        .collect();

    if !dropped_splits.is_empty() {
        if opts.enable_scale_in {
            tracing::info!(fragment_id, dropped_splits = ?dropped_splits, "new dropped splits");
        } else {
            tracing::warn!(fragment_id, dropped_splits = ?dropped_splits, "split dropping happened, but it is not allowed");
        }
    }

    let new_discovered_splits: BTreeSet<_> = discovered_split_ids
        .into_iter()
        .filter(|split_id| !prev_split_ids.contains(split_id))
        .collect();

    if opts.enable_scale_in || opts.enable_adaptive {
        if dropped_splits.is_empty()
            && new_discovered_splits.is_empty()
            && !discovered_splits.is_empty()
        {
            return None;
        }
    } else {
        if new_discovered_splits.is_empty() && !discovered_splits.is_empty() {
            return None;
        }
    }

    tracing::info!(fragment_id, new_discovered_splits = ?new_discovered_splits, "new discovered splits");

    let mut heap = BinaryHeap::with_capacity(actor_splits.len());

    for (actor_id, mut splits) in actor_splits {
        if opts.enable_scale_in || opts.enable_adaptive {
            splits.retain(|split| !dropped_splits.contains(&split.id()));
        }

        heap.push(ActorSplitsAssignment { actor_id, splits });
    }

    for split_id in new_discovered_splits {
        let mut peek_ref = heap.peek_mut().unwrap();
        peek_ref
            .splits
            .push(discovered_splits.get(&split_id).cloned().unwrap());
    }

    Some(
        heap.into_iter()
            .map(|ActorSplitsAssignment { actor_id, splits }| (actor_id, splits))
            .collect(),
    )
}

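/// Where to look up the upstream actor's splits when aligning a fragment with its
/// upstream source fragment: either the per-actor info already stored in
/// [`SharedActorInfos`], or an assignment that has just been computed for the upstream
/// fragment in the current round. Used by `align_splits` below, which copies each
/// actor's splits from the upstream actor it is aligned with.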
pub enum SplitAlignmentContext<'a> {
    Stored(&'a SharedActorInfos),
    Pending(&'a HashMap<ActorId, Vec<SplitImpl>>),
}

fn align_splits(
    aligned_actors: impl IntoIterator<Item = (ActorId, ActorId)>,
    alignment_context: SplitAlignmentContext<'_>,
    fragment_id: FragmentId,
    upstream_source_fragment_id: FragmentId,
) -> anyhow::Result<HashMap<ActorId, Vec<SplitImpl>>> {
    aligned_actors
        .into_iter()
        .map(|(actor_id, upstream_actor_id)| {
            let Some(splits) = (match alignment_context {
                SplitAlignmentContext::Stored(info) => {
                    let guard = info.read_guard();
                    let actor_info = guard
                        .iter_over_fragments()
                        .find_map(|(_, fragment_info)| fragment_info.actors.get(&upstream_actor_id));
                    actor_info.map(|info| info.splits.clone())
                }
                SplitAlignmentContext::Pending(existing_assignment) => {
                    existing_assignment.get(&upstream_actor_id).cloned()
                }
            }) else {
                return Err(anyhow::anyhow!(
                    "upstream assignment not found, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_actor_id: {upstream_actor_id:?}"
                ));
            };

            Ok((actor_id, splits))
        })
        .collect()
}

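/// Heap entry used by `reassign_splits` when distributing newly discovered splits.
/// `Ord` is reversed on the number of held splits, so popping from the max-heap
/// always yields the actor that currently holds the fewest splits.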
#[derive(Debug)]
struct ActorSplitsAssignment<T: SplitMetaData> {
    actor_id: ActorId,
    splits: Vec<T>,
}

impl<T: SplitMetaData + Clone> Eq for ActorSplitsAssignment<T> {}

impl<T: SplitMetaData + Clone> PartialEq<Self> for ActorSplitsAssignment<T> {
    fn eq(&self, other: &Self) -> bool {
        self.splits.len() == other.splits.len()
    }
}

impl<T: SplitMetaData + Clone> PartialOrd<Self> for ActorSplitsAssignment<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: SplitMetaData + Clone> Ord for ActorSplitsAssignment<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        other
            .splits
            .len()
            .cmp(&self.splits.len())
            .then(self.actor_id.cmp(&other.actor_id))
    }
}

#[derive(Debug)]
pub struct SplitDiffOptions {
    pub enable_scale_in: bool,

    pub enable_adaptive: bool,
}

#[allow(clippy::derivable_impls)]
impl Default for SplitDiffOptions {
    fn default() -> Self {
        SplitDiffOptions {
            enable_scale_in: false,
            enable_adaptive: false,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use risingwave_common::types::JsonbVal;
    use risingwave_connector::error::ConnectorResult;
    use risingwave_connector::source::{SplitId, SplitMetaData};
    use serde::{Deserialize, Serialize};

    use super::*;
    use crate::model::{ActorId, FragmentId};

    #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
    struct TestSplit {
        id: u32,
    }

    impl SplitMetaData for TestSplit {
        fn id(&self) -> SplitId {
            format!("{}", self.id).into()
        }

        fn encode_to_json(&self) -> JsonbVal {
            serde_json::to_value(*self).unwrap().into()
        }

        fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
            serde_json::from_value(value.take()).map_err(Into::into)
        }

        fn update_offset(&mut self, _last_read_offset: String) -> ConnectorResult<()> {
            Ok(())
        }
    }

    fn check_all_splits(
        discovered_splits: &BTreeMap<SplitId, TestSplit>,
        diff: &HashMap<ActorId, Vec<TestSplit>>,
    ) {
        let mut split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();

        for splits in diff.values() {
            for split in splits {
                assert!(split_ids.remove(&split.id()))
            }
        }

        assert!(split_ids.is_empty());
    }

    #[test]
    fn test_drop_splits() {
        let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
        actor_splits.insert(0, vec![TestSplit { id: 0 }, TestSplit { id: 1 }]);
        actor_splits.insert(1, vec![TestSplit { id: 2 }, TestSplit { id: 3 }]);
        actor_splits.insert(2, vec![TestSplit { id: 4 }, TestSplit { id: 5 }]);

        let mut prev_split_to_actor = HashMap::new();
        for (actor_id, splits) in &actor_splits {
            for split in splits {
                prev_split_to_actor.insert(split.id(), *actor_id);
            }
        }

        let discovered_splits: BTreeMap<SplitId, TestSplit> = (1..5)
            .map(|i| {
                let split = TestSplit { id: i };
                (split.id(), split)
            })
            .collect();

        let opts = SplitDiffOptions {
            enable_scale_in: true,
            enable_adaptive: false,
        };

        let prev_split_ids: HashSet<_> = actor_splits
            .values()
            .flat_map(|splits| splits.iter().map(|split| split.id()))
            .collect();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            opts,
        )
        .unwrap();
        check_all_splits(&discovered_splits, &diff);

        let mut after_split_to_actor = HashMap::new();
        for (actor_id, splits) in &diff {
            for split in splits {
                after_split_to_actor.insert(split.id(), *actor_id);
            }
        }

        let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();

        let retained_split_ids: HashSet<_> =
            prev_split_ids.intersection(&discovered_split_ids).collect();

        for retained_split_id in retained_split_ids {
            assert_eq!(
                prev_split_to_actor.get(retained_split_id),
                after_split_to_actor.get(retained_split_id)
            )
        }
    }

    #[test]
    fn test_drop_splits_to_empty() {
        let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
        actor_splits.insert(0, vec![TestSplit { id: 0 }]);

        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();

        let opts = SplitDiffOptions {
            enable_scale_in: true,
            enable_adaptive: false,
        };

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            opts,
        )
        .unwrap();

        assert!(!diff.is_empty())
    }

    #[test]
    fn test_reassign_splits() {
        let actor_splits = HashMap::new();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
        assert!(
            reassign_splits(
                FragmentId::default(),
                actor_splits,
                &discovered_splits,
                Default::default()
            )
            .is_none()
        );

        let actor_splits = (0..3).map(|i| (i, vec![])).collect();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        for splits in diff.values() {
            assert!(splits.is_empty())
        }

        let actor_splits = (0..3).map(|i| (i, vec![])).collect();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..3)
            .map(|i| {
                let split = TestSplit { id: i };
                (split.id(), split)
            })
            .collect();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        for splits in diff.values() {
            assert_eq!(splits.len(), 1);
        }

        check_all_splits(&discovered_splits, &diff);

        let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
            .map(|i| {
                let split = TestSplit { id: i };
                (split.id(), split)
            })
            .collect();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        for splits in diff.values() {
            let len = splits.len();
            assert!(len == 1 || len == 2);
        }

        check_all_splits(&discovered_splits, &diff);

        let mut actor_splits: HashMap<ActorId, Vec<TestSplit>> =
            (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
        actor_splits.insert(3, vec![]);
        actor_splits.insert(4, vec![]);

        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
            .map(|i| {
                let split = TestSplit { id: i };
                (split.id(), split)
            })
            .collect();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 5);
        for splits in diff.values() {
            assert_eq!(splits.len(), 1);
        }

        check_all_splits(&discovered_splits, &diff);
    }
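
    // Minimal sanity check of the heap ordering that `reassign_splits` relies on: the
    // reversed `Ord` on `ActorSplitsAssignment` makes the max-heap yield the actor
    // holding the fewest splits first, so new splits go to the least-loaded actor.
    #[test]
    fn test_actor_splits_assignment_ordering() {
        let mut heap = std::collections::BinaryHeap::new();
        heap.push(ActorSplitsAssignment {
            actor_id: 1,
            splits: vec![TestSplit { id: 0 }, TestSplit { id: 1 }],
        });
        heap.push(ActorSplitsAssignment {
            actor_id: 2,
            splits: vec![TestSplit { id: 2 }],
        });
        heap.push(ActorSplitsAssignment {
            actor_id: 3,
            splits: vec![],
        });

        // The emptiest actor is popped first and would receive the next new split.
        assert_eq!(heap.pop().unwrap().actor_id, 3);
        assert_eq!(heap.pop().unwrap().actor_id, 2);
        assert_eq!(heap.pop().unwrap().actor_id, 1);
    }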
}