risingwave_meta/stream/source_manager/split_assignment.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use itertools::Itertools;

use super::*;
use crate::model::{FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments};

impl SourceManager {
    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
    ///
    /// Very occasionally split removal may happen during scaling, in which case we need to
    /// use the old splits for reallocation instead of the latest splits (which may be missing),
    /// so that we can resolve the split removal in the next command.
    pub async fn migrate_splits_for_source_actors(
        &self,
        fragment_id: FragmentId,
        prev_actor_ids: &[ActorId],
        curr_actor_ids: &[ActorId],
    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
        let core = self.core.lock().await;

        let prev_splits = prev_actor_ids
            .iter()
            .flat_map(|actor_id| core.actor_splits.get(actor_id).unwrap())
            .map(|split| (split.id(), split.clone()))
            .collect();

        let empty_actor_splits = curr_actor_ids
            .iter()
            .map(|actor_id| (*actor_id, vec![]))
            .collect();

        let diff = reassign_splits(
            fragment_id,
            empty_actor_splits,
            &prev_splits,
            // Pre-allocating splits is the first time splits are assigned, so there is no scale-in scenario here.
            SplitDiffOptions::default(),
        )
        .unwrap_or_default();

        Ok(diff)
    }

    /// Migrates splits for a rescheduled backfill fragment, aligning them with its upstream source fragment.
    pub fn migrate_splits_for_backfill_actors(
        &self,
        fragment_id: FragmentId,
        upstream_source_fragment_id: FragmentId,
        curr_actor_ids: &[ActorId],
        fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
        no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
        // align splits for backfill fragments with its upstream source fragment
        let actors = no_shuffle_upstream_actor_map
            .iter()
            .filter(|(id, _)| curr_actor_ids.contains(id))
            .map(|(id, upstream_fragment_actors)| {
                (
                    *id,
                    *upstream_fragment_actors
                        .get(&upstream_source_fragment_id)
                        .unwrap(),
                )
            });
        let upstream_assignment = fragment_actor_splits
            .get(&upstream_source_fragment_id)
            .unwrap();
        tracing::info!(
            fragment_id,
            upstream_source_fragment_id,
            ?upstream_assignment,
            "migrate_splits_for_backfill_actors"
        );
        Ok(align_splits(
            actors,
            upstream_assignment,
            fragment_id,
            upstream_source_fragment_id,
        )?)
    }

    /// Allocates splits to actors for a newly created source executor.
    pub async fn allocate_splits(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<SplitAssignment> {
        let core = self.core.lock().await;

        let source_fragments = table_fragments.stream_source_fragments();

        let mut assigned = HashMap::new();

        'loop_source: for (source_id, fragments) in source_fragments {
            let handle = core
                .managed_sources
                .get(&source_id)
                .with_context(|| format!("could not find source {}", source_id))?;

            if handle.splits.lock().await.splits.is_none() {
                handle.force_tick().await?;
            }

            for fragment_id in fragments {
                let empty_actor_splits: HashMap<u32, Vec<SplitImpl>> = table_fragments
                    .fragments
                    .get(&fragment_id)
                    .unwrap()
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, vec![]))
                    .collect();
                let actor_hashset: HashSet<u32> = empty_actor_splits.keys().cloned().collect();
                let splits = handle.discovered_splits(source_id, &actor_hashset).await?;
                if splits.is_empty() {
                    tracing::warn!("no splits detected for source {}", source_id);
                    continue 'loop_source;
                }

                if let Some(diff) = reassign_splits(
                    fragment_id,
                    empty_actor_splits,
                    &splits,
                    SplitDiffOptions::default(),
                ) {
                    assigned.insert(fragment_id, diff);
                }
            }
        }

        Ok(assigned)
    }

    /// Allocates splits to actors for a replace source job.
    pub async fn allocate_splits_for_replace_source(
        &self,
        table_fragments: &StreamJobFragments,
        upstream_updates: &FragmentReplaceUpstream,
        // new_no_shuffle:
        //     upstream fragment_id ->
        //     downstream fragment_id ->
        //     upstream actor_id ->
        //     downstream actor_id
        new_no_shuffle: &FragmentNewNoShuffle,
    ) -> MetaResult<SplitAssignment> {
        tracing::debug!(?upstream_updates, "allocate_splits_for_replace_source");
        if upstream_updates.is_empty() {
            // no existing downstream. We can just re-allocate splits arbitrarily.
            return self.allocate_splits(table_fragments).await;
        }

        let core = self.core.lock().await;

        let source_fragments = table_fragments.stream_source_fragments();
        assert_eq!(
            source_fragments.len(),
            1,
            "replace source job should only have one source"
        );
        let (_source_id, fragments) = source_fragments.into_iter().next().unwrap();
        assert_eq!(
            fragments.len(),
            1,
            "replace source job should only have one fragment"
        );
        let fragment_id = fragments.into_iter().next().unwrap();

        debug_assert!(
            upstream_updates.values().flatten().next().is_some()
                && upstream_updates
                    .values()
                    .flatten()
                    .all(|(_, new_upstream_fragment_id)| {
                        *new_upstream_fragment_id == fragment_id
                    })
                && upstream_updates
                    .values()
                    .flatten()
                    .map(|(upstream_fragment_id, _)| upstream_fragment_id)
                    .all_equal(),
            "upstream update should only replace one fragment: {:?}",
            upstream_updates
        );
        let prev_fragment_id = upstream_updates
            .values()
            .flatten()
            .next()
            .map(|(upstream_fragment_id, _)| *upstream_fragment_id)
            .expect("non-empty");
        // Here we align the new source executor to backfill executors
        //
        // old_source => new_source            backfill_1
        // actor_x1   => actor_y1 -----┬------>actor_a1
        // actor_x2   => actor_y2 -----┼-┬---->actor_a2
        //                             │ │
        //                             │ │     backfill_2
        //                             └─┼---->actor_b1
        //                               └---->actor_b2
        //
        // Note: we can choose any backfill actor to align here.
        // We use `HashMap` to dedup.
        let aligned_actors: HashMap<ActorId, ActorId> = new_no_shuffle
            .get(&fragment_id)
            .map(HashMap::values)
            .into_iter()
            .flatten()
            .flatten()
            .map(|(upstream_actor_id, actor_id)| (*upstream_actor_id, *actor_id))
            .collect();
        let assignment = align_splits(
            aligned_actors.into_iter(),
            &core.actor_splits,
            fragment_id,
            prev_fragment_id,
        )?;
        Ok(HashMap::from([(fragment_id, assignment)]))
    }

    /// Allocates splits to actors for a newly created `SourceBackfill` executor.
    ///
    /// Unlike [`Self::allocate_splits`], which creates a new assignment,
    /// this method aligns the splits of each backfill fragment with its upstream source fragment ([`align_splits`]).
    pub async fn allocate_splits_for_backfill(
        &self,
        table_fragments: &StreamJobFragments,
        upstream_new_no_shuffle: &FragmentNewNoShuffle,
        upstream_actors: &HashMap<FragmentId, HashSet<ActorId>>,
    ) -> MetaResult<SplitAssignment> {
        let core = self.core.lock().await;

        let source_backfill_fragments = table_fragments.source_backfill_fragments()?;

        let mut assigned = HashMap::new();

        for (_source_id, fragments) in source_backfill_fragments {
            for (fragment_id, upstream_source_fragment_id) in fragments {
                let upstream_actors = upstream_actors
                    .get(&upstream_source_fragment_id)
                    .ok_or_else(|| {
                        anyhow!(
                            "no upstream actors found from fragment {} to upstream source fragment {}",
                            fragment_id,
                            upstream_source_fragment_id
                        )
                    })?;
                let mut backfill_actors = vec![];
                let Some(source_new_no_shuffle) = upstream_new_no_shuffle
                    .get(&upstream_source_fragment_id)
                    .and_then(|source_upstream_new_no_shuffle| {
                        source_upstream_new_no_shuffle.get(&fragment_id)
                    })
                else {
                    return Err(anyhow::anyhow!(
                            "source backfill fragment's upstream fragment should have one-on-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_new_no_shuffle: {upstream_new_no_shuffle:?}",
                            fragment_id = fragment_id,
                            upstream_source_fragment_id = upstream_source_fragment_id,
                            upstream_new_no_shuffle = upstream_new_no_shuffle,
                        ).into());
                };
                for upstream_actor in upstream_actors {
                    let Some(no_shuffle_backfill_actor) = source_new_no_shuffle.get(upstream_actor)
                    else {
                        return Err(anyhow::anyhow!(
                            "source backfill fragment's upstream fragment should have one-on-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, source_new_no_shuffle: {source_new_no_shuffle:?}",
                            fragment_id = fragment_id,
                            upstream_source_fragment_id = upstream_source_fragment_id,
                            upstream_actor = upstream_actor,
                            source_new_no_shuffle = source_new_no_shuffle
                        ).into());
                    };
                    backfill_actors.push((*no_shuffle_backfill_actor, *upstream_actor));
                }
                assigned.insert(
                    fragment_id,
                    align_splits(
                        backfill_actors,
                        &core.actor_splits,
                        fragment_id,
                        upstream_source_fragment_id,
                    )?,
                );
            }
        }

        Ok(assigned)
    }
}

impl SourceManagerCore {
    /// Checks whether the external source metadata has changed,
    /// and re-assigns splits if there's a diff.
    ///
    /// `self.actor_splits` will not be updated. It will be updated by `Self::apply_source_change`,
    /// after the mutation barrier has been collected.
    pub async fn reassign_splits(&self) -> MetaResult<HashMap<DatabaseId, SplitAssignment>> {
        let mut split_assignment: SplitAssignment = HashMap::new();

        'loop_source: for (source_id, handle) in &self.managed_sources {
            let source_fragment_ids = match self.source_fragments.get(source_id) {
                Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids,
                _ => {
                    continue;
                }
            };
            let backfill_fragment_ids = self.backfill_fragments.get(source_id);

            'loop_fragment: for &fragment_id in source_fragment_ids {
                let actors = match self
                    .metadata_manager
                    .get_running_actors_of_fragment(fragment_id)
                    .await
                {
                    Ok(actors) => {
                        if actors.is_empty() {
                            tracing::warn!("No actors found for fragment {}", fragment_id);
                            continue 'loop_fragment;
                        }
                        actors
                    }
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
                        continue 'loop_fragment;
                    }
                };

                let discovered_splits = handle.discovered_splits(*source_id, &actors).await?;
                if discovered_splits.is_empty() {
                    // The discover loop for this source is not ready yet; we'll wait for the next run
                    continue 'loop_source;
                }

                let prev_actor_splits: HashMap<_, _> = actors
                    .into_iter()
                    .map(|actor_id| {
                        (
                            actor_id,
                            self.actor_splits
                                .get(&actor_id)
                                .cloned()
                                .unwrap_or_default(),
                        )
                    })
                    .collect();

                if let Some(new_assignment) = reassign_splits(
                    fragment_id,
                    prev_actor_splits,
                    &discovered_splits,
                    SplitDiffOptions {
                        enable_scale_in: handle.enable_drop_split,
                        enable_adaptive: handle.enable_adaptive_splits,
                    },
                ) {
                    split_assignment.insert(fragment_id, new_assignment);
                }
            }

            if let Some(backfill_fragment_ids) = backfill_fragment_ids {
                // align splits for backfill fragments with its upstream source fragment
                for (fragment_id, upstream_fragment_id) in backfill_fragment_ids {
                    let Some(upstream_assignment) = split_assignment.get(upstream_fragment_id)
                    else {
                        // upstream fragment unchanged, so the backfill fragment does not need to be updated either
                        continue;
                    };
                    let actors = match self
                        .metadata_manager
                        .get_running_actors_for_source_backfill(*fragment_id, *upstream_fragment_id)
                        .await
                    {
                        Ok(actors) => {
                            if actors.is_empty() {
                                tracing::warn!("No actors found for fragment {}", fragment_id);
                                continue;
                            }
                            actors
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
                            continue;
                        }
                    };
                    split_assignment.insert(
                        *fragment_id,
                        align_splits(
                            actors,
                            upstream_assignment,
                            *fragment_id,
                            *upstream_fragment_id,
                        )?,
                    );
                }
            }
        }

        self.metadata_manager
            .split_fragment_map_by_database(split_assignment)
            .await
    }
}

/// Reassigns splits if there are new splits or dropped splits,
/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled.
///
/// Existing splits remain on their currently assigned actors; they are never moved.
///
/// If an actor has an upstream actor, it should be a backfill executor,
/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case.
/// Use [`align_splits`] instead.**
///
/// - `fragment_id`: just for logging
///
/// ## Different connectors' behavior of split change
///
/// ### Kafka and Pulsar
/// They only support increasing the number of splits by adding new, empty splits.
/// Old data is not moved.
///
/// ### Kinesis
/// It supports *pairwise* shard split and merge.
///
/// In both cases, old data remain in the old shard(s) and the old shard is still available.
/// New data are routed to the new shard(s).
/// After the retention period has expired, the old shard becomes `EXPIRED` and is no longer
/// listed. In other words, the total number of shards first increases and then decreases.
///
/// See also:
/// - [Kinesis resharding doc](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html#kinesis-using-sdk-java-resharding-data-routing)
/// - An example of what the shards can look like: <https://stackoverflow.com/questions/72272034/list-shard-show-more-shards-than-provisioned>
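/// # Example
///
/// A minimal sketch of the incremental behavior (an `ignore`d snippet, not a doctest;
/// `split_a`, `split_b` and `fragment_id` are illustrative placeholders for any
/// [`SplitMetaData`] implementation, e.g. `TestSplit` in the tests below):
///
/// ```ignore
/// // Actor 0 already owns `split_a`; `split_b` has just been discovered.
/// let actor_splits = HashMap::from([(0, vec![split_a.clone()]), (1, vec![])]);
/// let discovered_splits: BTreeMap<SplitId, _> = [split_a, split_b]
///     .into_iter()
///     .map(|s| (s.id(), s))
///     .collect();
/// let diff = reassign_splits(
///     fragment_id,
///     actor_splits,
///     &discovered_splits,
///     SplitDiffOptions::default(),
/// )
/// .expect("a new split was discovered, so a new assignment is returned");
/// // `split_a` stays on actor 0; `split_b` goes to the actor with the fewest
/// // splits, i.e. actor 1.
/// ```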
fn reassign_splits<T>(
    fragment_id: FragmentId,
    actor_splits: HashMap<ActorId, Vec<T>>,
    discovered_splits: &BTreeMap<SplitId, T>,
    opts: SplitDiffOptions,
) -> Option<HashMap<ActorId, Vec<T>>>
where
    T: SplitMetaData + Clone,
{
    // if no actors, return
    if actor_splits.is_empty() {
        return None;
    }

    let prev_split_ids: HashSet<_> = actor_splits
        .values()
        .flat_map(|splits| splits.iter().map(SplitMetaData::id))
        .collect();

    tracing::trace!(fragment_id, prev_split_ids = ?prev_split_ids, "previous splits");
    tracing::trace!(fragment_id, discovered_split_ids = ?discovered_splits.keys(), "discovered splits");

    let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();

    let dropped_splits: HashSet<_> = prev_split_ids
        .difference(&discovered_split_ids)
        .cloned()
        .collect();

    if !dropped_splits.is_empty() {
        if opts.enable_scale_in {
            tracing::info!(fragment_id, dropped_splits = ?dropped_splits, "newly dropped splits");
        } else {
            tracing::warn!(fragment_id, dropped_splits = ?dropped_splits, "split dropping happened, but it is not allowed");
        }
    }

    let new_discovered_splits: BTreeSet<_> = discovered_split_ids
        .into_iter()
        .filter(|split_id| !prev_split_ids.contains(split_id))
        .collect();

    if opts.enable_scale_in || opts.enable_adaptive {
        // If scale-in is supported and no splits were discovered or dropped, there is nothing to do.
        // We still need to check that `discovered_splits` is non-empty, because an empty result
        // means scaling in to zero splits (e.g., all objects deleted from S3), which must be handled.
        if dropped_splits.is_empty()
            && new_discovered_splits.is_empty()
            && !discovered_splits.is_empty()
        {
            return None;
        }
    } else {
        // If scale-in is not supported and no new splits were discovered, there is nothing to do.
        if new_discovered_splits.is_empty() && !discovered_splits.is_empty() {
            return None;
        }
    }

    tracing::info!(fragment_id, new_discovered_splits = ?new_discovered_splits, "new discovered splits");

    let mut heap = BinaryHeap::with_capacity(actor_splits.len());

    for (actor_id, mut splits) in actor_splits {
        if opts.enable_scale_in || opts.enable_adaptive {
            splits.retain(|split| !dropped_splits.contains(&split.id()));
        }

        heap.push(ActorSplitsAssignment { actor_id, splits })
    }

    for split_id in new_discovered_splits {
        // `ActorSplitsAssignment`'s `Ord` is reversed, so this is a min-heap, i.e.,
        // we get the assignment with the fewest splits here.

        // Note: if multiple actors have the same number of splits, one of them is picked arbitrarily.
        // When the number of source actors is larger than the number of splits,
        // it's possible that the assignment is uneven.
        // e.g., https://github.com/risingwavelabs/risingwave/issues/14324#issuecomment-1875033158
        // TODO: We should make the assignment rack-aware to make sure it's even.
        let mut peek_ref = heap.peek_mut().unwrap();
        peek_ref
            .splits
            .push(discovered_splits.get(&split_id).cloned().unwrap());
    }

    Some(
        heap.into_iter()
            .map(|ActorSplitsAssignment { actor_id, splits }| (actor_id, splits))
            .collect(),
    )
}

/// Assigns splits to a new set of actors, according to the existing assignment.
///
/// Illustration:
/// ```text
/// upstream                               new
/// actor x1 [split 1, split 2]     ->     actor y1 [split 1, split 2]
/// actor x2 [split 3]              ->     actor y2 [split 3]
/// ...
/// ```
fn align_splits(
    // (actor_id, upstream_actor_id)
    aligned_actors: impl IntoIterator<Item = (ActorId, ActorId)>,
    existing_assignment: &HashMap<ActorId, Vec<SplitImpl>>,
    fragment_id: FragmentId,
    upstream_source_fragment_id: FragmentId,
) -> anyhow::Result<HashMap<ActorId, Vec<SplitImpl>>> {
    aligned_actors
        .into_iter()
        .map(|(actor_id, upstream_actor_id)| {
            let Some(splits) = existing_assignment.get(&upstream_actor_id) else {
                return Err(anyhow::anyhow!("upstream assignment not found, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_assignment: {existing_assignment:?}, upstream_actor_id: {upstream_actor_id:?}"));
            };
            Ok((
                actor_id,
                splits.clone(),
            ))
        })
        .collect()
}

/// Note: the `PartialEq` and `Ord` impl just compares the number of splits.
#[derive(Debug)]
struct ActorSplitsAssignment<T: SplitMetaData> {
    actor_id: ActorId,
    splits: Vec<T>,
}

impl<T: SplitMetaData + Clone> Eq for ActorSplitsAssignment<T> {}

impl<T: SplitMetaData + Clone> PartialEq<Self> for ActorSplitsAssignment<T> {
    fn eq(&self, other: &Self) -> bool {
        self.splits.len() == other.splits.len()
    }
}

impl<T: SplitMetaData + Clone> PartialOrd<Self> for ActorSplitsAssignment<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: SplitMetaData + Clone> Ord for ActorSplitsAssignment<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Note: this is reversed order, to make BinaryHeap a min heap.
        other.splits.len().cmp(&self.splits.len())
    }
}

#[derive(Debug)]
pub struct SplitDiffOptions {
    pub enable_scale_in: bool,

    /// For most connectors, this should be false. When enabled, RisingWave will not track any progress.
    pub enable_adaptive: bool,
}

#[allow(clippy::derivable_impls)]
impl Default for SplitDiffOptions {
    fn default() -> Self {
        SplitDiffOptions {
            enable_scale_in: false,
            enable_adaptive: false,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use risingwave_common::types::JsonbVal;
    use risingwave_connector::error::ConnectorResult;
    use risingwave_connector::source::{SplitId, SplitMetaData};
    use serde::{Deserialize, Serialize};

    use super::*;
    use crate::model::{ActorId, FragmentId};

    #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
    struct TestSplit {
        id: u32,
    }

    impl SplitMetaData for TestSplit {
        fn id(&self) -> SplitId {
            format!("{}", self.id).into()
        }

        fn encode_to_json(&self) -> JsonbVal {
            serde_json::to_value(*self).unwrap().into()
        }

        fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
            serde_json::from_value(value.take()).map_err(Into::into)
        }

        fn update_offset(&mut self, _last_read_offset: String) -> ConnectorResult<()> {
            Ok(())
        }
    }

    fn check_all_splits(
        discovered_splits: &BTreeMap<SplitId, TestSplit>,
        diff: &HashMap<ActorId, Vec<TestSplit>>,
    ) {
        let mut split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();

        for splits in diff.values() {
            for split in splits {
                assert!(split_ids.remove(&split.id()))
            }
        }

        assert!(split_ids.is_empty());
    }

    #[test]
    fn test_drop_splits() {
        let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
        actor_splits.insert(0, vec![TestSplit { id: 0 }, TestSplit { id: 1 }]);
        actor_splits.insert(1, vec![TestSplit { id: 2 }, TestSplit { id: 3 }]);
        actor_splits.insert(2, vec![TestSplit { id: 4 }, TestSplit { id: 5 }]);

        let mut prev_split_to_actor = HashMap::new();
        for (actor_id, splits) in &actor_splits {
            for split in splits {
                prev_split_to_actor.insert(split.id(), *actor_id);
            }
        }

        let discovered_splits: BTreeMap<SplitId, TestSplit> = (1..5)
            .map(|i| {
                let split = TestSplit { id: i };
                (split.id(), split)
            })
            .collect();

        let opts = SplitDiffOptions {
            enable_scale_in: true,
            enable_adaptive: false,
        };

        let prev_split_ids: HashSet<_> = actor_splits
            .values()
            .flat_map(|splits| splits.iter().map(|split| split.id()))
            .collect();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            opts,
        )
        .unwrap();
        check_all_splits(&discovered_splits, &diff);

        let mut after_split_to_actor = HashMap::new();
        for (actor_id, splits) in &diff {
            for split in splits {
                after_split_to_actor.insert(split.id(), *actor_id);
            }
        }

        let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();

        let retained_split_ids: HashSet<_> =
            prev_split_ids.intersection(&discovered_split_ids).collect();

        for retained_split_id in retained_split_ids {
            assert_eq!(
                prev_split_to_actor.get(retained_split_id),
                after_split_to_actor.get(retained_split_id)
            )
        }
    }
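
    // Added illustration (not part of the original suite): `ActorSplitsAssignment`'s
    // reversed `Ord` turns `BinaryHeap` (a max-heap) into a min-heap over the split
    // count, so the actor holding the fewest splits is always popped first.
    #[test]
    fn test_actor_splits_assignment_min_heap_order() {
        use std::collections::BinaryHeap;

        let mut heap = BinaryHeap::new();
        heap.push(ActorSplitsAssignment {
            actor_id: 0,
            splits: vec![TestSplit { id: 0 }, TestSplit { id: 1 }],
        });
        heap.push(ActorSplitsAssignment {
            actor_id: 1,
            splits: vec![TestSplit { id: 2 }],
        });
        heap.push(ActorSplitsAssignment {
            actor_id: 2,
            splits: vec![],
        });

        // Actor 2 holds no splits, so it comes out first; actor 0 holds the most, so it comes out last.
        assert_eq!(heap.pop().unwrap().actor_id, 2);
        assert_eq!(heap.pop().unwrap().actor_id, 1);
        assert_eq!(heap.pop().unwrap().actor_id, 0);
    }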

    #[test]
    fn test_drop_splits_to_empty() {
        let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
        actor_splits.insert(0, vec![TestSplit { id: 0 }]);

        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();

        let opts = SplitDiffOptions {
            enable_scale_in: true,
            enable_adaptive: false,
        };

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            opts,
        )
        .unwrap();

        assert!(!diff.is_empty())
    }
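
    // Added illustration (not part of the original suite): with `enable_adaptive`,
    // splits that are no longer discovered are dropped from the assignment, just
    // like with `enable_scale_in`.
    #[test]
    fn test_drop_splits_with_adaptive() {
        let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
        actor_splits.insert(0, vec![TestSplit { id: 0 }, TestSplit { id: 1 }]);

        // Only split 1 is still discovered; split 0 has been dropped upstream.
        let discovered_splits: BTreeMap<SplitId, TestSplit> = (1..2)
            .map(|i| {
                let split = TestSplit { id: i };
                (split.id(), split)
            })
            .collect();

        let opts = SplitDiffOptions {
            enable_scale_in: false,
            enable_adaptive: true,
        };

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            opts,
        )
        .unwrap();

        check_all_splits(&discovered_splits, &diff);
        assert_eq!(diff[&0].len(), 1);
        assert_eq!(diff[&0][0].id(), TestSplit { id: 1 }.id());
    }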

    #[test]
    fn test_reassign_splits() {
        let actor_splits = HashMap::new();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
        assert!(
            reassign_splits(
                FragmentId::default(),
                actor_splits,
                &discovered_splits,
                Default::default()
            )
            .is_none()
        );

        let actor_splits = (0..3).map(|i| (i, vec![])).collect();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        for splits in diff.values() {
            assert!(splits.is_empty())
        }

        let actor_splits = (0..3).map(|i| (i, vec![])).collect();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..3)
            .map(|i| {
                let split = TestSplit { id: i };
                (split.id(), split)
            })
            .collect();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        for splits in diff.values() {
            assert_eq!(splits.len(), 1);
        }

        check_all_splits(&discovered_splits, &diff);

        let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
            .map(|i| {
                let split = TestSplit { id: i };
                (split.id(), split)
            })
            .collect();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        for splits in diff.values() {
            let len = splits.len();
            assert!(len == 1 || len == 2);
        }

        check_all_splits(&discovered_splits, &diff);

        let mut actor_splits: HashMap<ActorId, Vec<TestSplit>> =
            (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
        actor_splits.insert(3, vec![]);
        actor_splits.insert(4, vec![]);

        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
            .map(|i| {
                let split = TestSplit { id: i };
                (split.id(), split)
            })
            .collect();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 5);
        for splits in diff.values() {
            assert_eq!(splits.len(), 1);
        }

        check_all_splits(&discovered_splits, &diff);
    }
}