1use anyhow::anyhow;
16use itertools::Itertools;
17use risingwave_connector::source::fill_adaptive_split;
18
19use super::*;
20use crate::model::{ActorNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments};
21
22#[derive(Debug, Clone)]
23pub struct SplitState {
24 pub split_assignment: SplitAssignment,
25}
26
27impl SourceManager {
28 pub fn migrate_splits_for_backfill_actors(
30 &self,
31 fragment_id: FragmentId,
32 upstream_source_fragment_id: FragmentId,
33 curr_actor_ids: &[ActorId],
34 fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
35 no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
36 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
37 let actors = no_shuffle_upstream_actor_map
39 .iter()
40 .filter(|(id, _)| curr_actor_ids.contains(id))
41 .map(|(id, upstream_fragment_actors)| {
42 (
43 *id,
44 *upstream_fragment_actors
45 .get(&upstream_source_fragment_id)
46 .unwrap(),
47 )
48 });
49 let upstream_assignment = fragment_actor_splits
50 .get(&upstream_source_fragment_id)
51 .unwrap();
52 tracing::info!(
53 %fragment_id,
54 %upstream_source_fragment_id,
55 ?upstream_assignment,
56 "migrate_splits_for_backfill_actors"
57 );
58 Ok(align_splits(
59 actors,
60 |upstream_actor_id| upstream_assignment.get(&upstream_actor_id).cloned(),
61 fragment_id,
62 upstream_source_fragment_id,
63 )?)
64 }
65
66 #[await_tree::instrument]
70 pub async fn discover_splits(
71 &self,
72 table_fragments: &StreamJobFragments,
73 ) -> MetaResult<SourceSplitAssignment> {
74 let core = self.core.lock().await;
75
76 let source_fragments = table_fragments.stream_source_fragments();
77
78 let mut assigned = HashMap::new();
79
80 for (source_id, _fragments) in source_fragments {
81 let handle = core
82 .managed_sources
83 .get(&source_id)
84 .with_context(|| format!("could not find source {}", source_id))?;
85
86 if handle.splits.lock().await.splits.is_none() {
87 handle.force_tick().await?;
88 }
89
90 let Some(discovered) = handle.discovered_splits(source_id).await? else {
91 tracing::warn!(%source_id, "no splits detected (not ready)");
92 continue;
93 };
94 if let DiscoveredSplits::Fixed(ref splits) = discovered
95 && splits.is_empty()
96 {
97 tracing::warn!(%source_id, "no splits detected");
98 continue;
99 }
100 assigned.insert(source_id, discovered);
104 }
105
106 Ok(assigned)
107 }
108
109 pub fn resolve_fragment_to_actor_splits(
117 table_fragments: &StreamJobFragments,
118 source_assignment: &SourceSplitAssignment,
119 fragment_actor_ids: &HashMap<FragmentId, Vec<ActorId>>,
120 ) -> MetaResult<SplitAssignment> {
121 let source_fragments = table_fragments.stream_source_fragments();
122 let mut result: SplitAssignment = HashMap::new();
123 for (source_id, discovered) in source_assignment {
124 let fragment_ids = source_fragments.get(source_id).unwrap_or_else(|| {
125 panic!("source {} not found in stream job fragments", source_id)
126 });
127 for fragment_id in fragment_ids {
128 let actor_ids = fragment_actor_ids.get(fragment_id).unwrap_or_else(|| {
129 panic!("fragment {} not found in rendered actor IDs", fragment_id)
130 });
131 let actor_count = actor_ids.len();
132 match discovered {
133 DiscoveredSplits::Fixed(splits) => {
134 let empty_actor_splits: HashMap<ActorId, Vec<SplitImpl>> = actor_ids
135 .iter()
136 .map(|actor_id| (*actor_id, vec![]))
137 .collect();
138 if let Some(diff) = reassign_splits(
139 *fragment_id,
140 empty_actor_splits,
141 splits,
142 SplitDiffOptions::default(),
143 ) {
144 result.insert(*fragment_id, diff);
145 }
146 }
147 DiscoveredSplits::Adaptive(template) => {
148 let expanded = fill_adaptive_split(template, actor_count)?;
149 let expanded_splits: Vec<SplitImpl> = expanded.into_values().collect();
150 let mut actor_splits = HashMap::new();
151 for (i, actor_id) in actor_ids.iter().enumerate() {
152 actor_splits.insert(*actor_id, vec![expanded_splits[i].clone()]);
153 }
154 result.insert(*fragment_id, actor_splits);
155 }
156 }
157 }
158 }
159 Ok(result)
160 }
161
162 pub async fn discover_splits_for_replace_source(
170 &self,
171 table_fragments: &StreamJobFragments,
172 upstream_updates: &FragmentReplaceUpstream,
173 ) -> MetaResult<Option<SourceSplitAssignment>> {
174 if upstream_updates.is_empty() {
175 return Ok(Some(self.discover_splits(table_fragments).await?));
177 }
178 Ok(None)
181 }
182
183 pub fn resolve_replace_source_splits(
187 table_fragments: &StreamJobFragments,
188 upstream_updates: &FragmentReplaceUpstream,
189 actor_no_shuffle: &ActorNewNoShuffle,
195 get_upstream_actor_splits: impl Fn(FragmentId, ActorId) -> Option<Vec<SplitImpl>>,
196 ) -> MetaResult<SplitAssignment> {
197 tracing::debug!(?upstream_updates, "allocate_splits_for_replace_source");
198 assert!(!upstream_updates.is_empty());
199
200 let source_fragments = table_fragments.stream_source_fragments();
201 assert_eq!(
202 source_fragments.len(),
203 1,
204 "replace source job should only have one source"
205 );
206 let (_source_id, fragments) = source_fragments.into_iter().next().unwrap();
207 assert_eq!(
208 fragments.len(),
209 1,
210 "replace source job should only have one fragment"
211 );
212 let fragment_id = fragments.into_iter().next().unwrap();
213
214 debug_assert!(
215 upstream_updates.values().flatten().next().is_some()
216 && upstream_updates
217 .values()
218 .flatten()
219 .all(|(_, new_upstream_fragment_id)| {
220 *new_upstream_fragment_id == fragment_id
221 })
222 && upstream_updates
223 .values()
224 .flatten()
225 .map(|(upstream_fragment_id, _)| upstream_fragment_id)
226 .all_equal(),
227 "upstream update should only replace one fragment: {:?}",
228 upstream_updates
229 );
230 let prev_fragment_id = upstream_updates
231 .values()
232 .flatten()
233 .next()
234 .map(|(upstream_fragment_id, _)| *upstream_fragment_id)
235 .expect("non-empty");
236 let aligned_actors: HashMap<ActorId, ActorId> = actor_no_shuffle
249 .get(&fragment_id)
250 .map(HashMap::values)
251 .into_iter()
252 .flatten()
253 .flatten()
254 .map(|(upstream_actor_id, actor_id)| (*upstream_actor_id, *actor_id))
255 .collect();
256 let assignment = align_splits(
257 aligned_actors.into_iter(),
258 |actor_id| get_upstream_actor_splits(prev_fragment_id, actor_id),
259 fragment_id,
260 prev_fragment_id,
261 )?;
262 Ok(HashMap::from([(fragment_id, assignment)]))
263 }
264
265 pub fn resolve_backfill_splits(
269 table_fragments: &StreamJobFragments,
270 actor_no_shuffle: &ActorNewNoShuffle,
271 get_upstream_actor_splits: impl Fn(FragmentId, ActorId) -> Option<Vec<SplitImpl>>,
272 ) -> MetaResult<SplitAssignment> {
273 let source_backfill_fragments = table_fragments.source_backfill_fragments();
274
275 let mut assigned = HashMap::new();
276
277 for (_source_id, fragments) in source_backfill_fragments {
278 for (fragment_id, upstream_source_fragment_id) in fragments {
279 let upstream_actors: HashSet<ActorId> = actor_no_shuffle
281 .get(&upstream_source_fragment_id)
282 .and_then(|m| m.get(&fragment_id))
283 .ok_or_else(|| {
284 anyhow!(
285 "no upstream actors found from fragment {} to upstream source fragment {}",
286 fragment_id,
287 upstream_source_fragment_id
288 )
289 })?.keys().copied().collect();
290
291 let mut backfill_actors = vec![];
292 let Some(source_new_no_shuffle) = actor_no_shuffle
293 .get(&upstream_source_fragment_id)
294 .and_then(|source_upstream_actor_no_shuffle| {
295 source_upstream_actor_no_shuffle.get(&fragment_id)
296 })
297 else {
298 return Err(anyhow::anyhow!(
299 "source backfill fragment's upstream fragment should have one-on-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_no_shuffle: {actor_no_shuffle:?}",
300 fragment_id = fragment_id,
301 upstream_source_fragment_id = upstream_source_fragment_id,
302 actor_no_shuffle = actor_no_shuffle,
303 ).into());
304 };
305 for upstream_actor in &upstream_actors {
306 let Some(no_shuffle_backfill_actor) = source_new_no_shuffle.get(upstream_actor)
307 else {
308 return Err(anyhow::anyhow!(
309 "source backfill fragment's upstream fragment should have one-on-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, source_new_no_shuffle: {source_new_no_shuffle:?}",
310 fragment_id = fragment_id,
311 upstream_source_fragment_id = upstream_source_fragment_id,
312 upstream_actor = upstream_actor,
313 source_new_no_shuffle = source_new_no_shuffle
314 ).into());
315 };
316 backfill_actors.push((*no_shuffle_backfill_actor, *upstream_actor));
317 }
318 assigned.insert(
319 fragment_id,
320 align_splits(
321 backfill_actors,
322 |actor_id| get_upstream_actor_splits(upstream_source_fragment_id, actor_id),
323 fragment_id,
324 upstream_source_fragment_id,
325 )?,
326 );
327 }
328 }
329
330 Ok(assigned)
331 }
332}
333
334impl SourceManagerCore {
335 pub async fn reassign_splits(&self) -> MetaResult<HashMap<DatabaseId, SplitState>> {
341 let mut split_assignment: SplitAssignment = HashMap::new();
342
343 'loop_source: for (source_id, handle) in &self.managed_sources {
344 let source_fragment_ids = match self.source_fragments.get(source_id) {
345 Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids,
346 _ => {
347 continue;
348 }
349 };
350 let backfill_fragment_ids = self.backfill_fragments.get(source_id);
351
352 'loop_fragment: for &fragment_id in source_fragment_ids {
353 let actors = match self
354 .metadata_manager
355 .get_running_actors_of_fragment(fragment_id)
356 {
357 Ok(actors) => {
358 if actors.is_empty() {
359 tracing::warn!("No actors found for fragment {}", fragment_id);
360 continue 'loop_fragment;
361 }
362 actors
363 }
364 Err(err) => {
365 tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
366 continue 'loop_fragment;
367 }
368 };
369
370 let Some(discovered) = handle.discovered_splits(*source_id).await? else {
371 continue 'loop_source;
373 };
374 let discovered_splits = match discovered {
375 DiscoveredSplits::Fixed(splits) => {
376 if splits.is_empty() {
377 continue 'loop_source;
378 }
379 splits
380 }
381 DiscoveredSplits::Adaptive(template) => {
382 fill_adaptive_split(&template, actors.len())?
383 }
384 };
385
386 let prev_actor_splits = {
387 let guard = self.env.shared_actor_infos().read_guard();
388
389 guard
390 .get_fragment(fragment_id)
391 .and_then(|info| {
392 info.actors
393 .iter()
394 .map(|(actor_id, actor_info)| {
395 (*actor_id, actor_info.splits.clone())
396 })
397 .collect::<HashMap<_, _>>()
398 .into()
399 })
400 .unwrap_or_default()
401 };
402
403 if let Some(new_assignment) = reassign_splits(
404 fragment_id,
405 prev_actor_splits,
406 &discovered_splits,
407 SplitDiffOptions {
408 enable_scale_in: handle.enable_drop_split,
409 enable_adaptive: handle.enable_adaptive_splits,
410 },
411 ) {
412 split_assignment.insert(fragment_id, new_assignment);
413 }
414 }
415
416 if let Some(backfill_fragment_ids) = backfill_fragment_ids {
417 for (fragment_id, upstream_fragment_id) in backfill_fragment_ids {
419 let Some(upstream_assignment): Option<&HashMap<ActorId, Vec<SplitImpl>>> =
420 split_assignment.get(upstream_fragment_id)
421 else {
422 continue;
424 };
425 let actors = match self
426 .metadata_manager
427 .get_running_actors_for_source_backfill(*fragment_id, *upstream_fragment_id)
428 .await
429 {
430 Ok(actors) => {
431 if actors.is_empty() {
432 tracing::warn!("No actors found for fragment {}", fragment_id);
433 continue;
434 }
435 actors
436 }
437 Err(err) => {
438 tracing::warn!(error = %err.as_report(),"Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
439 continue;
440 }
441 };
442 split_assignment.insert(
443 *fragment_id,
444 align_splits(
445 actors,
446 |upstream_actor_id| {
447 upstream_assignment.get(&upstream_actor_id).cloned()
448 },
449 *fragment_id,
450 *upstream_fragment_id,
451 )?,
452 );
453 }
454 }
455 }
456
457 let assignments = self
458 .metadata_manager
459 .split_fragment_map_by_database(split_assignment)
460 .await?;
461
462 let mut result = HashMap::new();
463 for (database_id, assignment) in assignments {
464 result.insert(
465 database_id,
466 SplitState {
467 split_assignment: assignment,
468 },
469 );
470 }
471
472 Ok(result)
473 }
474}
475
476pub fn reassign_splits<I, T>(
505 fragment_id: FragmentId,
506 actor_splits: HashMap<I, Vec<T>>,
507 discovered_splits: &BTreeMap<SplitId, T>,
508 opts: SplitDiffOptions,
509) -> Option<HashMap<I, Vec<T>>>
510where
511 I: Ord + std::hash::Hash + Eq,
512 T: SplitMetaData + Clone,
513{
514 if actor_splits.is_empty() {
516 return None;
517 }
518
519 let prev_split_ids: HashSet<_> = actor_splits
520 .values()
521 .flat_map(|splits| splits.iter().map(SplitMetaData::id))
522 .collect();
523
524 tracing::trace!(%fragment_id, prev_split_ids = ?prev_split_ids, "previous splits");
525 tracing::trace!(%fragment_id, prev_split_ids = ?discovered_splits.keys(), "discovered splits");
526
527 let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
528
529 let dropped_splits: HashSet<_> = prev_split_ids
530 .difference(&discovered_split_ids)
531 .cloned()
532 .collect();
533
534 if !dropped_splits.is_empty() {
535 if opts.enable_scale_in {
536 tracing::info!(%fragment_id, dropped_spltis = ?dropped_splits, "new dropped splits");
537 } else {
538 tracing::warn!(%fragment_id, dropped_spltis = ?dropped_splits, "split dropping happened, but it is not allowed");
539 }
540 }
541
542 let new_discovered_splits: BTreeSet<_> = discovered_split_ids
543 .into_iter()
544 .filter(|split_id| !prev_split_ids.contains(split_id))
545 .collect();
546
547 if opts.enable_scale_in || opts.enable_adaptive {
548 if dropped_splits.is_empty()
552 && new_discovered_splits.is_empty()
553 && !discovered_splits.is_empty()
554 {
555 return None;
556 }
557 } else {
558 if new_discovered_splits.is_empty() && !discovered_splits.is_empty() {
560 return None;
561 }
562 }
563
564 tracing::info!(%fragment_id, new_discovered_splits = ?new_discovered_splits, "new discovered splits");
565
566 let mut heap = BinaryHeap::with_capacity(actor_splits.len());
567
568 for (actor_id, mut splits) in actor_splits {
569 if opts.enable_scale_in || opts.enable_adaptive {
570 splits.retain(|split| !dropped_splits.contains(&split.id()));
571 }
572
573 heap.push(SplitsAssignment { actor_id, splits })
574 }
575
576 for split_id in new_discovered_splits {
577 let mut peek_ref = heap.peek_mut().unwrap();
586 peek_ref
587 .splits
588 .push(discovered_splits.get(&split_id).cloned().unwrap());
589 }
590
591 Some(
592 heap.into_iter()
593 .map(|SplitsAssignment { actor_id, splits }| (actor_id, splits))
594 .collect(),
595 )
596}
597
598pub fn align_splits(
613 aligned_actors: impl IntoIterator<Item = (ActorId, ActorId)>,
615 get_upstream_actor_splits: impl Fn(ActorId) -> Option<Vec<SplitImpl>>,
616 fragment_id: FragmentId,
617 upstream_source_fragment_id: FragmentId,
618) -> anyhow::Result<HashMap<ActorId, Vec<SplitImpl>>> {
619 aligned_actors
620 .into_iter()
621 .map(|(actor_id, upstream_actor_id)| {
622 let Some(splits) = get_upstream_actor_splits(upstream_actor_id) else {
623 return Err(anyhow::anyhow!("upstream assignment not found, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_actor_id: {upstream_actor_id:?}"));
624 };
625
626 Ok((
627 actor_id,
628 splits,
629 ))
630 })
631 .collect()
632}
633
634#[derive(Debug)]
636struct SplitsAssignment<I, T: SplitMetaData> {
637 actor_id: I,
638 splits: Vec<T>,
639}
640
641impl<I, T: SplitMetaData + Clone> Eq for SplitsAssignment<I, T> {}
642
643impl<I, T: SplitMetaData + Clone> PartialEq<Self> for SplitsAssignment<I, T> {
644 fn eq(&self, other: &Self) -> bool {
645 self.splits.len() == other.splits.len()
646 }
647}
648
649impl<I: Ord, T: SplitMetaData + Clone> PartialOrd<Self> for SplitsAssignment<I, T> {
650 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
651 Some(self.cmp(other))
652 }
653}
654
655impl<I: Ord, T: SplitMetaData + Clone> Ord for SplitsAssignment<I, T> {
656 fn cmp(&self, other: &Self) -> Ordering {
657 other
659 .splits
660 .len()
661 .cmp(&self.splits.len())
662 .then(self.actor_id.cmp(&other.actor_id))
663 }
664}
665
/// Switches controlling how `reassign_splits` treats splits that disappeared from discovery.
///
/// Both switches are `false` by default.
#[derive(Debug, Default)]
pub struct SplitDiffOptions {
    /// When set, previously assigned splits that are no longer discovered are removed.
    pub enable_scale_in: bool,

    /// When set, previously assigned splits that are no longer discovered are removed as well.
    pub enable_adaptive: bool,
}
683
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use risingwave_common::types::JsonbVal;
    use risingwave_connector::error::ConnectorResult;
    use risingwave_connector::source::{SplitId, SplitMetaData};
    use serde::{Deserialize, Serialize};

    use super::*;
    use crate::model::{ActorId, FragmentId};

    /// Minimal split whose id is the decimal rendering of `id`.
    #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
    struct TestSplit {
        id: u32,
    }

    impl SplitMetaData for TestSplit {
        fn id(&self) -> SplitId {
            self.id.to_string().into()
        }

        fn encode_to_json(&self) -> JsonbVal {
            serde_json::to_value(*self).unwrap().into()
        }

        fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
            serde_json::from_value(value.take()).map_err(Into::into)
        }

        fn update_offset(&mut self, _last_read_offset: String) -> ConnectorResult<()> {
            Ok(())
        }
    }

    /// Shorthand for building an `ActorId` from a plain number.
    fn actor(id: u32) -> ActorId {
        id.into()
    }

    /// Builds the discovered split map for the given range of split numbers.
    fn discovered(ids: std::ops::Range<u32>) -> BTreeMap<SplitId, TestSplit> {
        ids.map(|id| {
            let split = TestSplit { id };
            (split.id(), split)
        })
        .collect()
    }

    /// Inverts an assignment into a split id -> owning actor lookup.
    fn split_owners(assignment: &HashMap<ActorId, Vec<TestSplit>>) -> HashMap<SplitId, ActorId> {
        assignment
            .iter()
            .flat_map(|(actor_id, splits)| splits.iter().map(move |split| (split.id(), *actor_id)))
            .collect()
    }

    /// Asserts that `diff` contains every discovered split exactly once and nothing else.
    fn check_all_splits(
        discovered_splits: &BTreeMap<SplitId, TestSplit>,
        diff: &HashMap<ActorId, Vec<TestSplit>>,
    ) {
        let mut unassigned: HashSet<_> = discovered_splits.keys().cloned().collect();

        for split in diff.values().flatten() {
            assert!(unassigned.remove(&split.id()))
        }

        assert!(unassigned.is_empty());
    }

    #[test]
    fn test_drop_splits() {
        let actor_splits: HashMap<ActorId, Vec<TestSplit>> = HashMap::from([
            (actor(0), vec![TestSplit { id: 0 }, TestSplit { id: 1 }]),
            (actor(1), vec![TestSplit { id: 2 }, TestSplit { id: 3 }]),
            (actor(2), vec![TestSplit { id: 4 }, TestSplit { id: 5 }]),
        ]);

        let owners_before = split_owners(&actor_splits);
        let prev_split_ids: HashSet<_> = owners_before.keys().cloned().collect();

        // Splits 0 and 5 disappear.
        let discovered_splits = discovered(1..5);

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            SplitDiffOptions {
                enable_scale_in: true,
                enable_adaptive: false,
            },
        )
        .unwrap();
        check_all_splits(&discovered_splits, &diff);

        let owners_after = split_owners(&diff);
        let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();

        // Every split that survived must still live on the actor it was on before.
        for retained_split_id in prev_split_ids.intersection(&discovered_split_ids) {
            assert_eq!(
                owners_before.get(retained_split_id),
                owners_after.get(retained_split_id)
            )
        }
    }

    #[test]
    fn test_drop_splits_to_empty() {
        let actor_splits: HashMap<ActorId, Vec<TestSplit>> =
            HashMap::from([(actor(0), vec![TestSplit { id: 0 }])]);

        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            SplitDiffOptions {
                enable_scale_in: true,
                enable_adaptive: false,
            },
        )
        .unwrap();

        assert!(!diff.is_empty())
    }

    #[test]
    fn test_reassign_splits() {
        // No actors at all: nothing to assign.
        let actor_splits: HashMap<u32, Vec<TestSplit>> = HashMap::new();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
        assert!(
            reassign_splits(
                FragmentId::default(),
                actor_splits,
                &discovered_splits,
                Default::default()
            )
            .is_none()
        );

        // Actors without splits and nothing discovered: every actor stays empty.
        let actor_splits: HashMap<i32, Vec<TestSplit>> = (0..3).map(|i| (i, vec![])).collect();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        for splits in diff.values() {
            assert!(splits.is_empty())
        }

        // Three empty actors, three splits: one split each.
        let actor_splits: HashMap<ActorId, Vec<TestSplit>> =
            (0..3).map(|i| (actor(i), vec![])).collect();
        let discovered_splits = discovered(0..3);

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        for splits in diff.values() {
            assert_eq!(splits.len(), 1);
        }

        check_all_splits(&discovered_splits, &diff);

        // Three actors with one split each, two new splits: load is one or two per actor.
        let actor_splits: HashMap<ActorId, Vec<TestSplit>> = (0..3)
            .map(|i| (actor(i), vec![TestSplit { id: i }]))
            .collect();
        let discovered_splits = discovered(0..5);

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        for splits in diff.values() {
            let len = splits.len();
            assert!(len == 1 || len == 2);
        }

        check_all_splits(&discovered_splits, &diff);

        // Two extra empty actors pick up the two new splits.
        let mut actor_splits: HashMap<ActorId, Vec<TestSplit>> = (0..3)
            .map(|i| (actor(i), vec![TestSplit { id: i }]))
            .collect();
        actor_splits.insert(actor(3), vec![]);
        actor_splits.insert(actor(4), vec![]);

        let discovered_splits = discovered(0..5);

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 5);
        for splits in diff.values() {
            assert_eq!(splits.len(), 1);
        }

        check_all_splits(&discovered_splits, &diff);
    }
}