risingwave_meta/stream/source_manager/
split_assignment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use itertools::Itertools;
17
18use super::*;
19use crate::model::{FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments};
20
/// Outcome of one split re-assignment round, as assembled by
/// `SourceManagerCore::reassign_splits` (one value per database).
#[derive(Debug, Clone)]
pub struct SplitState {
    /// The newly computed split assignment for the fragments whose assignment was recomputed.
    pub split_assignment: SplitAssignment,
    /// The discovered splits of every source that owns at least one fragment in
    /// `split_assignment`.
    pub discovered_source_splits: DiscoveredSourceSplits,
}
26
27impl SourceManager {
28    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
29    pub fn migrate_splits_for_backfill_actors(
30        &self,
31        fragment_id: FragmentId,
32        upstream_source_fragment_id: FragmentId,
33        curr_actor_ids: &[ActorId],
34        fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
35        no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
36    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
37        // align splits for backfill fragments with its upstream source fragment
38        let actors = no_shuffle_upstream_actor_map
39            .iter()
40            .filter(|(id, _)| curr_actor_ids.contains(id))
41            .map(|(id, upstream_fragment_actors)| {
42                (
43                    *id,
44                    *upstream_fragment_actors
45                        .get(&upstream_source_fragment_id)
46                        .unwrap(),
47                )
48            });
49        let upstream_assignment = fragment_actor_splits
50            .get(&upstream_source_fragment_id)
51            .unwrap();
52        tracing::info!(
53            fragment_id,
54            upstream_source_fragment_id,
55            ?upstream_assignment,
56            "migrate_splits_for_backfill_actors"
57        );
58        Ok(align_splits(
59            actors,
60            SplitAlignmentContext::Pending(upstream_assignment),
61            fragment_id,
62            upstream_source_fragment_id,
63        )?)
64    }
65
66    /// Allocates splits to actors for a newly created source executor.
67    #[await_tree::instrument]
68    pub async fn allocate_splits(
69        &self,
70        table_fragments: &StreamJobFragments,
71    ) -> MetaResult<SplitAssignment> {
72        let core = self.core.lock().await;
73
74        // stream_job_id is used to fetch pre-calculated splits for CDC table.
75        let stream_job_id = table_fragments.stream_job_id;
76        let source_fragments = table_fragments.stream_source_fragments();
77
78        let mut assigned = HashMap::new();
79
80        'loop_source: for (source_id, fragments) in source_fragments {
81            let handle = core
82                .managed_sources
83                .get(&source_id)
84                .with_context(|| format!("could not find source {}", source_id))?;
85
86            if handle.splits.lock().await.splits.is_none() {
87                handle.force_tick().await?;
88            }
89
90            for fragment_id in fragments {
91                let empty_actor_splits: HashMap<u32, Vec<SplitImpl>> = table_fragments
92                    .fragments
93                    .get(&fragment_id)
94                    .unwrap()
95                    .actors
96                    .iter()
97                    .map(|actor| (actor.actor_id, vec![]))
98                    .collect();
99                let actor_hashset: HashSet<u32> = empty_actor_splits.keys().cloned().collect();
100                let splits = handle.discovered_splits(source_id, &actor_hashset).await?;
101                if splits.is_empty() {
102                    tracing::warn!(?stream_job_id, source_id, "no splits detected");
103                    continue 'loop_source;
104                }
105                if let Some(diff) = reassign_splits(
106                    fragment_id,
107                    empty_actor_splits,
108                    &splits,
109                    SplitDiffOptions::default(),
110                ) {
111                    assigned.insert(fragment_id, diff);
112                }
113            }
114        }
115
116        Ok(assigned)
117    }
118
119    /// Allocates splits to actors for replace source job.
120    pub async fn allocate_splits_for_replace_source(
121        &self,
122        table_fragments: &StreamJobFragments,
123        upstream_updates: &FragmentReplaceUpstream,
124        // new_no_shuffle:
125        //     upstream fragment_id ->
126        //     downstream fragment_id ->
127        //     upstream actor_id ->
128        //     downstream actor_id
129        new_no_shuffle: &FragmentNewNoShuffle,
130    ) -> MetaResult<SplitAssignment> {
131        tracing::debug!(?upstream_updates, "allocate_splits_for_replace_source");
132        if upstream_updates.is_empty() {
133            // no existing downstream. We can just re-allocate splits arbitrarily.
134            return self.allocate_splits(table_fragments).await;
135        }
136
137        let core = self.core.lock().await;
138
139        let source_fragments = table_fragments.stream_source_fragments();
140        assert_eq!(
141            source_fragments.len(),
142            1,
143            "replace source job should only have one source"
144        );
145        let (_source_id, fragments) = source_fragments.into_iter().next().unwrap();
146        assert_eq!(
147            fragments.len(),
148            1,
149            "replace source job should only have one fragment"
150        );
151        let fragment_id = fragments.into_iter().next().unwrap();
152
153        debug_assert!(
154            upstream_updates.values().flatten().next().is_some()
155                && upstream_updates
156                    .values()
157                    .flatten()
158                    .all(|(_, new_upstream_fragment_id)| {
159                        *new_upstream_fragment_id == fragment_id
160                    })
161                && upstream_updates
162                    .values()
163                    .flatten()
164                    .map(|(upstream_fragment_id, _)| upstream_fragment_id)
165                    .all_equal(),
166            "upstream update should only replace one fragment: {:?}",
167            upstream_updates
168        );
169        let prev_fragment_id = upstream_updates
170            .values()
171            .flatten()
172            .next()
173            .map(|(upstream_fragment_id, _)| *upstream_fragment_id)
174            .expect("non-empty");
175        // Here we align the new source executor to backfill executors
176        //
177        // old_source => new_source            backfill_1
178        // actor_x1   => actor_y1 -----┬------>actor_a1
179        // actor_x2   => actor_y2 -----┼-┬---->actor_a2
180        //                             │ │
181        //                             │ │     backfill_2
182        //                             └─┼---->actor_b1
183        //                               └---->actor_b2
184        //
185        // Note: we can choose any backfill actor to align here.
186        // We use `HashMap` to dedup.
187        let aligned_actors: HashMap<ActorId, ActorId> = new_no_shuffle
188            .get(&fragment_id)
189            .map(HashMap::values)
190            .into_iter()
191            .flatten()
192            .flatten()
193            .map(|(upstream_actor_id, actor_id)| (*upstream_actor_id, *actor_id))
194            .collect();
195        let assignment = align_splits(
196            aligned_actors.into_iter(),
197            SplitAlignmentContext::Stored(core.env.shared_actor_infos()),
198            fragment_id,
199            prev_fragment_id,
200        )?;
201        Ok(HashMap::from([(fragment_id, assignment)]))
202    }
203
204    /// Allocates splits to actors for a newly created `SourceBackfill` executor.
205    ///
206    /// Unlike [`Self::allocate_splits`], which creates a new assignment,
207    /// this method aligns the splits for backfill fragments with its upstream source fragment ([`align_splits`]).
208    #[await_tree::instrument]
209    pub async fn allocate_splits_for_backfill(
210        &self,
211        table_fragments: &StreamJobFragments,
212        upstream_new_no_shuffle: &FragmentNewNoShuffle,
213        upstream_actors: &HashMap<FragmentId, HashSet<ActorId>>,
214    ) -> MetaResult<SplitAssignment> {
215        let core = self.core.lock().await;
216
217        let source_backfill_fragments = table_fragments.source_backfill_fragments();
218
219        let mut assigned = HashMap::new();
220
221        for (_source_id, fragments) in source_backfill_fragments {
222            for (fragment_id, upstream_source_fragment_id) in fragments {
223                let upstream_actors = upstream_actors
224                    .get(&upstream_source_fragment_id)
225                    .ok_or_else(|| {
226                        anyhow!(
227                            "no upstream actors found from fragment {} to upstream source fragment {}",
228                            fragment_id,
229                            upstream_source_fragment_id
230                        )
231                    })?;
232                let mut backfill_actors = vec![];
233                let Some(source_new_no_shuffle) = upstream_new_no_shuffle
234                    .get(&upstream_source_fragment_id)
235                    .and_then(|source_upstream_new_no_shuffle| {
236                        source_upstream_new_no_shuffle.get(&fragment_id)
237                    })
238                else {
239                    return Err(anyhow::anyhow!(
240                            "source backfill fragment's upstream fragment should have one-on-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_new_no_shuffle: {upstream_new_no_shuffle:?}",
241                            fragment_id = fragment_id,
242                            upstream_source_fragment_id = upstream_source_fragment_id,
243                            upstream_new_no_shuffle = upstream_new_no_shuffle,
244                        ).into());
245                };
246                for upstream_actor in upstream_actors {
247                    let Some(no_shuffle_backfill_actor) = source_new_no_shuffle.get(upstream_actor)
248                    else {
249                        return Err(anyhow::anyhow!(
250                            "source backfill fragment's upstream fragment should have one-on-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, source_new_no_shuffle: {source_new_no_shuffle:?}",
251                            fragment_id = fragment_id,
252                            upstream_source_fragment_id = upstream_source_fragment_id,
253                            upstream_actor = upstream_actor,
254                            source_new_no_shuffle = source_new_no_shuffle
255                        ).into());
256                    };
257                    backfill_actors.push((*no_shuffle_backfill_actor, *upstream_actor));
258                }
259                assigned.insert(
260                    fragment_id,
261                    align_splits(
262                        backfill_actors,
263                        SplitAlignmentContext::Stored(core.env.shared_actor_infos()),
264                        fragment_id,
265                        upstream_source_fragment_id,
266                    )?,
267                );
268            }
269        }
270
271        Ok(assigned)
272    }
273}
274
275impl SourceManagerCore {
276    /// Checks whether the external source metadata has changed,
277    /// and re-assigns splits if there's a diff.
278    ///
279    /// `self.actor_splits` will not be updated. It will be updated by `Self::apply_source_change`,
280    /// after the mutation barrier has been collected.
281    pub async fn reassign_splits(&self) -> MetaResult<HashMap<DatabaseId, SplitState>> {
282        let mut split_assignment: SplitAssignment = HashMap::new();
283        let mut source_splits_discovered = HashMap::new();
284
285        'loop_source: for (source_id, handle) in &self.managed_sources {
286            let source_fragment_ids = match self.source_fragments.get(source_id) {
287                Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids,
288                _ => {
289                    continue;
290                }
291            };
292            let backfill_fragment_ids = self.backfill_fragments.get(source_id);
293
294            'loop_fragment: for &fragment_id in source_fragment_ids {
295                let actors = match self
296                    .metadata_manager
297                    .get_running_actors_of_fragment(fragment_id)
298                    .await
299                {
300                    Ok(actors) => {
301                        if actors.is_empty() {
302                            tracing::warn!("No actors found for fragment {}", fragment_id);
303                            continue 'loop_fragment;
304                        }
305                        actors
306                    }
307                    Err(err) => {
308                        tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
309                        continue 'loop_fragment;
310                    }
311                };
312
313                let discovered_splits = handle.discovered_splits(*source_id, &actors).await?;
314                if discovered_splits.is_empty() {
315                    // The discover loop for this source is not ready yet; we'll wait for the next run
316                    continue 'loop_source;
317                }
318
319                let prev_actor_splits = {
320                    let guard = self.env.shared_actor_infos().read_guard();
321
322                    guard
323                        .get_fragment(fragment_id)
324                        .and_then(|info| {
325                            info.actors
326                                .iter()
327                                .map(|(actor_id, actor_info)| {
328                                    (*actor_id, actor_info.splits.clone())
329                                })
330                                .collect::<HashMap<_, _>>()
331                                .into()
332                        })
333                        .unwrap_or_default()
334                };
335
336                source_splits_discovered.insert(
337                    *source_id,
338                    discovered_splits.values().cloned().collect_vec(),
339                );
340
341                source_splits_discovered.insert(
342                    *source_id,
343                    discovered_splits.values().cloned().collect_vec(),
344                );
345
346                if let Some(new_assignment) = reassign_splits(
347                    fragment_id,
348                    prev_actor_splits,
349                    &discovered_splits,
350                    SplitDiffOptions {
351                        enable_scale_in: handle.enable_drop_split,
352                        enable_adaptive: handle.enable_adaptive_splits,
353                    },
354                ) {
355                    split_assignment.insert(fragment_id, new_assignment);
356                }
357            }
358
359            if let Some(backfill_fragment_ids) = backfill_fragment_ids {
360                // align splits for backfill fragments with its upstream source fragment
361                for (fragment_id, upstream_fragment_id) in backfill_fragment_ids {
362                    let Some(upstream_assignment): Option<&HashMap<ActorId, Vec<SplitImpl>>> =
363                        split_assignment.get(upstream_fragment_id)
364                    else {
365                        // upstream fragment unchanged, do not update backfill fragment too
366                        continue;
367                    };
368                    let actors = match self
369                        .metadata_manager
370                        .get_running_actors_for_source_backfill(*fragment_id, *upstream_fragment_id)
371                        .await
372                    {
373                        Ok(actors) => {
374                            if actors.is_empty() {
375                                tracing::warn!("No actors found for fragment {}", fragment_id);
376                                continue;
377                            }
378                            actors
379                        }
380                        Err(err) => {
381                            tracing::warn!(error = %err.as_report(),"Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
382                            continue;
383                        }
384                    };
385                    split_assignment.insert(
386                        *fragment_id,
387                        align_splits(
388                            actors,
389                            SplitAlignmentContext::Pending(upstream_assignment),
390                            *fragment_id,
391                            *upstream_fragment_id,
392                        )?,
393                    );
394                }
395            }
396        }
397
398        let assignments = self
399            .metadata_manager
400            .split_fragment_map_by_database(split_assignment)
401            .await?;
402
403        let mut result = HashMap::new();
404        for (database_id, assignment) in assignments {
405            let mut source_splits = HashMap::new();
406            for (source_id, fragment_ids) in &self.source_fragments {
407                if fragment_ids
408                    .iter()
409                    .any(|fragment_id| assignment.contains_key(fragment_id))
410                    && let Some(splits) = source_splits_discovered.get(source_id)
411                {
412                    source_splits.insert(*source_id, splits.clone());
413                }
414            }
415
416            result.insert(
417                database_id,
418                SplitState {
419                    split_assignment: assignment,
420                    discovered_source_splits: source_splits,
421                },
422            );
423        }
424
425        Ok(result)
426    }
427}
428
429/// Reassigns splits if there are new splits or dropped splits,
430/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled.
431///
432/// The existing splits will remain unmoved in their currently assigned actor.
433///
434/// If an actor has an upstream actor, it should be a backfill executor,
435/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case.
436/// Use [`align_splits`] instead.**
437///
438/// - `fragment_id`: just for logging
439///
440/// ## Different connectors' behavior of split change
441///
442/// ### Kafka and Pulsar
443/// They only support increasing the number of splits via adding new empty splits.
444/// Old data is not moved.
445///
446/// ### Kinesis
447/// It supports *pairwise* shard split and merge.
448///
449/// In both cases, old data remain in the old shard(s) and the old shard is still available.
450/// New data are routed to the new shard(s).
451/// After the retention period has expired, the old shard will become `EXPIRED` and isn't
452/// listed any more. In other words, the total number of shards will first increase and then decrease.
453///
454/// See also:
455/// - [Kinesis resharding doc](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html#kinesis-using-sdk-java-resharding-data-routing)
456/// - An example of how the shards can be like: <https://stackoverflow.com/questions/72272034/list-shard-show-more-shards-than-provisioned>
457pub fn reassign_splits<T>(
458    fragment_id: FragmentId,
459    actor_splits: HashMap<ActorId, Vec<T>>,
460    discovered_splits: &BTreeMap<SplitId, T>,
461    opts: SplitDiffOptions,
462) -> Option<HashMap<ActorId, Vec<T>>>
463where
464    T: SplitMetaData + Clone,
465{
466    // if no actors, return
467    if actor_splits.is_empty() {
468        return None;
469    }
470
471    let prev_split_ids: HashSet<_> = actor_splits
472        .values()
473        .flat_map(|splits| splits.iter().map(SplitMetaData::id))
474        .collect();
475
476    tracing::trace!(fragment_id, prev_split_ids = ?prev_split_ids, "previous splits");
477    tracing::trace!(fragment_id, prev_split_ids = ?discovered_splits.keys(), "discovered splits");
478
479    let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
480
481    let dropped_splits: HashSet<_> = prev_split_ids
482        .difference(&discovered_split_ids)
483        .cloned()
484        .collect();
485
486    if !dropped_splits.is_empty() {
487        if opts.enable_scale_in {
488            tracing::info!(fragment_id, dropped_spltis = ?dropped_splits, "new dropped splits");
489        } else {
490            tracing::warn!(fragment_id, dropped_spltis = ?dropped_splits, "split dropping happened, but it is not allowed");
491        }
492    }
493
494    let new_discovered_splits: BTreeSet<_> = discovered_split_ids
495        .into_iter()
496        .filter(|split_id| !prev_split_ids.contains(split_id))
497        .collect();
498
499    if opts.enable_scale_in || opts.enable_adaptive {
500        // if we support scale in, no more splits are discovered, and no splits are dropped, return
501        // we need to check if discovered_split_ids is empty, because if it is empty, we need to
502        // handle the case of scale in to zero (like deleting all objects from s3)
503        if dropped_splits.is_empty()
504            && new_discovered_splits.is_empty()
505            && !discovered_splits.is_empty()
506        {
507            return None;
508        }
509    } else {
510        // if we do not support scale in, and no more splits are discovered, return
511        if new_discovered_splits.is_empty() && !discovered_splits.is_empty() {
512            return None;
513        }
514    }
515
516    tracing::info!(fragment_id, new_discovered_splits = ?new_discovered_splits, "new discovered splits");
517
518    let mut heap = BinaryHeap::with_capacity(actor_splits.len());
519
520    for (actor_id, mut splits) in actor_splits {
521        if opts.enable_scale_in || opts.enable_adaptive {
522            splits.retain(|split| !dropped_splits.contains(&split.id()));
523        }
524
525        heap.push(ActorSplitsAssignment { actor_id, splits })
526    }
527
528    for split_id in new_discovered_splits {
529        // ActorSplitsAssignment's Ord is reversed, so this is min heap, i.e.,
530        // we get the assignment with the least splits here.
531
532        // Note: If multiple actors have the same number of splits, it will be randomly picked.
533        // When the number of source actors is larger than the number of splits,
534        // It's possible that the assignment is uneven.
535        // e.g., https://github.com/risingwavelabs/risingwave/issues/14324#issuecomment-1875033158
536        // TODO: We should make the assignment rack-aware to make sure it's even.
537        let mut peek_ref = heap.peek_mut().unwrap();
538        peek_ref
539            .splits
540            .push(discovered_splits.get(&split_id).cloned().unwrap());
541    }
542
543    Some(
544        heap.into_iter()
545            .map(|ActorSplitsAssignment { actor_id, splits }| (actor_id, splits))
546            .collect(),
547    )
548}
549
/// Tells [`align_splits`] where to look up the splits of the actor to align with.
pub enum SplitAlignmentContext<'a> {
    /// Look the actor up in the shared actor infos and use the splits recorded there.
    Stored(&'a SharedActorInfos),
    /// Look the actor up in an assignment (actor id -> splits) supplied by the caller.
    Pending(&'a HashMap<ActorId, Vec<SplitImpl>>),
}
554
555/// Assign splits to a new set of actors, according to existing assignment.
556///
557/// illustration:
558/// ```text
559/// upstream                               new
560/// actor x1 [split 1, split2]      ->     actor y1 [split 1, split2]
561/// actor x2 [split 3]              ->     actor y2 [split 3]
562/// ...
563/// ```
564fn align_splits(
565    // (actor_id, upstream_actor_id)
566    aligned_actors: impl IntoIterator<Item = (ActorId, ActorId)>,
567    alignment_context: SplitAlignmentContext<'_>,
568    fragment_id: FragmentId,
569    upstream_source_fragment_id: FragmentId,
570) -> anyhow::Result<HashMap<ActorId, Vec<SplitImpl>>> {
571    aligned_actors
572        .into_iter()
573        .map(|(actor_id, upstream_actor_id)| {
574            let Some(splits) = (match alignment_context {
575                SplitAlignmentContext::Stored(info) => {
576                    let guard = info.read_guard();
577                    let actor_info = guard.iter_over_fragments().find_map(|(_, fragment_info)| fragment_info.actors.get(&upstream_actor_id));
578                    actor_info.map(|info| info.splits.clone())
579                }
580                SplitAlignmentContext::Pending(existing_assignment) => {
581                    existing_assignment.get(&upstream_actor_id).cloned()
582                }
583            }) else {
584                return Err(anyhow::anyhow!("upstream assignment not found, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_actor_id: {upstream_actor_id:?}"));
585            };
586
587            Ok((
588                actor_id,
589                splits,
590            ))
591        })
592        .collect()
593}
594
595/// Note: the `PartialEq` and `Ord` impl just compares the number of splits.
596#[derive(Debug)]
597struct ActorSplitsAssignment<T: SplitMetaData> {
598    actor_id: ActorId,
599    splits: Vec<T>,
600}
601
602impl<T: SplitMetaData + Clone> Eq for ActorSplitsAssignment<T> {}
603
604impl<T: SplitMetaData + Clone> PartialEq<Self> for ActorSplitsAssignment<T> {
605    fn eq(&self, other: &Self) -> bool {
606        self.splits.len() == other.splits.len()
607    }
608}
609
610impl<T: SplitMetaData + Clone> PartialOrd<Self> for ActorSplitsAssignment<T> {
611    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
612        Some(self.cmp(other))
613    }
614}
615
616impl<T: SplitMetaData + Clone> Ord for ActorSplitsAssignment<T> {
617    fn cmp(&self, other: &Self) -> Ordering {
618        // Note: this is reversed order, to make BinaryHeap a min heap.
619        other
620            .splits
621            .len()
622            .cmp(&self.splits.len())
623            .then(self.actor_id.cmp(&other.actor_id))
624    }
625}
626
/// Switches controlling how [`reassign_splits`] treats splits that are no longer discovered.
#[derive(Debug)]
pub struct SplitDiffOptions {
    /// Whether splits that disappeared from the discovered set may be removed from the
    /// assignment.
    pub enable_scale_in: bool,

    /// For most connectors, this should be false. When enabled, RisingWave will not track any progress.
    pub enable_adaptive: bool,
}

// Spelled out on purpose, so that the default of each switch is visible at a glance.
#[allow(clippy::derivable_impls)]
impl Default for SplitDiffOptions {
    /// Both switches are off by default.
    fn default() -> Self {
        Self {
            enable_scale_in: false,
            enable_adaptive: false,
        }
    }
}
644
645#[cfg(test)]
646mod tests {
647    use std::collections::{BTreeMap, HashMap, HashSet};
648
649    use risingwave_common::types::JsonbVal;
650    use risingwave_connector::error::ConnectorResult;
651    use risingwave_connector::source::{SplitId, SplitMetaData};
652    use serde::{Deserialize, Serialize};
653
654    use super::*;
655    use crate::model::{ActorId, FragmentId};
656
657    #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
658    struct TestSplit {
659        id: u32,
660    }
661
662    impl SplitMetaData for TestSplit {
663        fn id(&self) -> SplitId {
664            format!("{}", self.id).into()
665        }
666
667        fn encode_to_json(&self) -> JsonbVal {
668            serde_json::to_value(*self).unwrap().into()
669        }
670
671        fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
672            serde_json::from_value(value.take()).map_err(Into::into)
673        }
674
675        fn update_offset(&mut self, _last_read_offset: String) -> ConnectorResult<()> {
676            Ok(())
677        }
678    }
679
680    fn check_all_splits(
681        discovered_splits: &BTreeMap<SplitId, TestSplit>,
682        diff: &HashMap<ActorId, Vec<TestSplit>>,
683    ) {
684        let mut split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
685
686        for splits in diff.values() {
687            for split in splits {
688                assert!(split_ids.remove(&split.id()))
689            }
690        }
691
692        assert!(split_ids.is_empty());
693    }
694
695    #[test]
696    fn test_drop_splits() {
697        let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
698        actor_splits.insert(0, vec![TestSplit { id: 0 }, TestSplit { id: 1 }]);
699        actor_splits.insert(1, vec![TestSplit { id: 2 }, TestSplit { id: 3 }]);
700        actor_splits.insert(2, vec![TestSplit { id: 4 }, TestSplit { id: 5 }]);
701
702        let mut prev_split_to_actor = HashMap::new();
703        for (actor_id, splits) in &actor_splits {
704            for split in splits {
705                prev_split_to_actor.insert(split.id(), *actor_id);
706            }
707        }
708
709        let discovered_splits: BTreeMap<SplitId, TestSplit> = (1..5)
710            .map(|i| {
711                let split = TestSplit { id: i };
712                (split.id(), split)
713            })
714            .collect();
715
716        let opts = SplitDiffOptions {
717            enable_scale_in: true,
718            enable_adaptive: false,
719        };
720
721        let prev_split_ids: HashSet<_> = actor_splits
722            .values()
723            .flat_map(|splits| splits.iter().map(|split| split.id()))
724            .collect();
725
726        let diff = reassign_splits(
727            FragmentId::default(),
728            actor_splits,
729            &discovered_splits,
730            opts,
731        )
732        .unwrap();
733        check_all_splits(&discovered_splits, &diff);
734
735        let mut after_split_to_actor = HashMap::new();
736        for (actor_id, splits) in &diff {
737            for split in splits {
738                after_split_to_actor.insert(split.id(), *actor_id);
739            }
740        }
741
742        let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
743
744        let retained_split_ids: HashSet<_> =
745            prev_split_ids.intersection(&discovered_split_ids).collect();
746
747        for retained_split_id in retained_split_ids {
748            assert_eq!(
749                prev_split_to_actor.get(retained_split_id),
750                after_split_to_actor.get(retained_split_id)
751            )
752        }
753    }
754
755    #[test]
756    fn test_drop_splits_to_empty() {
757        let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
758        actor_splits.insert(0, vec![TestSplit { id: 0 }]);
759
760        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
761
762        let opts = SplitDiffOptions {
763            enable_scale_in: true,
764            enable_adaptive: false,
765        };
766
767        let diff = reassign_splits(
768            FragmentId::default(),
769            actor_splits,
770            &discovered_splits,
771            opts,
772        )
773        .unwrap();
774
775        assert!(!diff.is_empty())
776    }
777
778    #[test]
779    fn test_reassign_splits() {
780        let actor_splits = HashMap::new();
781        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
782        assert!(
783            reassign_splits(
784                FragmentId::default(),
785                actor_splits,
786                &discovered_splits,
787                Default::default()
788            )
789            .is_none()
790        );
791
792        let actor_splits = (0..3).map(|i| (i, vec![])).collect();
793        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
794        let diff = reassign_splits(
795            FragmentId::default(),
796            actor_splits,
797            &discovered_splits,
798            Default::default(),
799        )
800        .unwrap();
801        assert_eq!(diff.len(), 3);
802        for splits in diff.values() {
803            assert!(splits.is_empty())
804        }
805
806        let actor_splits = (0..3).map(|i| (i, vec![])).collect();
807        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..3)
808            .map(|i| {
809                let split = TestSplit { id: i };
810                (split.id(), split)
811            })
812            .collect();
813
814        let diff = reassign_splits(
815            FragmentId::default(),
816            actor_splits,
817            &discovered_splits,
818            Default::default(),
819        )
820        .unwrap();
821        assert_eq!(diff.len(), 3);
822        for splits in diff.values() {
823            assert_eq!(splits.len(), 1);
824        }
825
826        check_all_splits(&discovered_splits, &diff);
827
828        let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
829        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
830            .map(|i| {
831                let split = TestSplit { id: i };
832                (split.id(), split)
833            })
834            .collect();
835
836        let diff = reassign_splits(
837            FragmentId::default(),
838            actor_splits,
839            &discovered_splits,
840            Default::default(),
841        )
842        .unwrap();
843        assert_eq!(diff.len(), 3);
844        for splits in diff.values() {
845            let len = splits.len();
846            assert!(len == 1 || len == 2);
847        }
848
849        check_all_splits(&discovered_splits, &diff);
850
851        let mut actor_splits: HashMap<ActorId, Vec<TestSplit>> =
852            (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
853        actor_splits.insert(3, vec![]);
854        actor_splits.insert(4, vec![]);
855
856        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
857            .map(|i| {
858                let split = TestSplit { id: i };
859                (split.id(), split)
860            })
861            .collect();
862
863        let diff = reassign_splits(
864            FragmentId::default(),
865            actor_splits,
866            &discovered_splits,
867            Default::default(),
868        )
869        .unwrap();
870        assert_eq!(diff.len(), 5);
871        for splits in diff.values() {
872            assert_eq!(splits.len(), 1);
873        }
874
875        check_all_splits(&discovered_splits, &diff);
876    }
877}