risingwave_meta/stream/source_manager/
split_assignment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use itertools::Itertools;
17use risingwave_connector::source::fill_adaptive_split;
18
19use super::*;
20use crate::model::{ActorNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments};
21
/// Per-database result of one split (re-)assignment round.
///
/// Produced by [`SourceManagerCore::reassign_splits`], which groups the new
/// fragment-level assignments by database before handing them off
/// (presumably to the barrier worker that injects them via a mutation
/// barrier — confirm against the caller).
#[derive(Debug, Clone)]
pub struct SplitState {
    /// fragment_id -> actor_id -> splits assigned to that actor.
    pub split_assignment: SplitAssignment,
}
26
27impl SourceManager {
28    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
29    pub fn migrate_splits_for_backfill_actors(
30        &self,
31        fragment_id: FragmentId,
32        upstream_source_fragment_id: FragmentId,
33        curr_actor_ids: &[ActorId],
34        fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
35        no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
36    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
37        // align splits for backfill fragments with its upstream source fragment
38        let actors = no_shuffle_upstream_actor_map
39            .iter()
40            .filter(|(id, _)| curr_actor_ids.contains(id))
41            .map(|(id, upstream_fragment_actors)| {
42                (
43                    *id,
44                    *upstream_fragment_actors
45                        .get(&upstream_source_fragment_id)
46                        .unwrap(),
47                )
48            });
49        let upstream_assignment = fragment_actor_splits
50            .get(&upstream_source_fragment_id)
51            .unwrap();
52        tracing::info!(
53            %fragment_id,
54            %upstream_source_fragment_id,
55            ?upstream_assignment,
56            "migrate_splits_for_backfill_actors"
57        );
58        Ok(align_splits(
59            actors,
60            |upstream_actor_id| upstream_assignment.get(&upstream_actor_id).cloned(),
61            fragment_id,
62            upstream_source_fragment_id,
63        )?)
64    }
65
66    /// Discovers splits for a newly created source executor.
67    /// Returns fragment-level split information. Actor-level assignment
68    /// will happen during Phase 2 inside the barrier worker.
69    #[await_tree::instrument]
70    pub async fn discover_splits(
71        &self,
72        table_fragments: &StreamJobFragments,
73    ) -> MetaResult<SourceSplitAssignment> {
74        let core = self.core.lock().await;
75
76        let source_fragments = table_fragments.stream_source_fragments();
77
78        let mut assigned = HashMap::new();
79
80        for (source_id, _fragments) in source_fragments {
81            let handle = core
82                .managed_sources
83                .get(&source_id)
84                .with_context(|| format!("could not find source {}", source_id))?;
85
86            if handle.splits.lock().await.splits.is_none() {
87                handle.force_tick().await?;
88            }
89
90            let Some(discovered) = handle.discovered_splits(source_id).await? else {
91                tracing::warn!(%source_id, "no splits detected (not ready)");
92                continue;
93            };
94            if let DiscoveredSplits::Fixed(ref splits) = discovered
95                && splits.is_empty()
96            {
97                tracing::warn!(%source_id, "no splits detected");
98                continue;
99            }
100            // Store the discovered splits enum at source level.
101            // The enum is preserved until the barrier worker resolves it
102            // to concrete per-fragment, per-actor splits during actor rendering.
103            assigned.insert(source_id, discovered);
104        }
105
106        Ok(assigned)
107    }
108
109    /// Resolve a [`SourceSplitAssignment`] (source-level, containing [`DiscoveredSplits`])
110    /// into an actor-level [`SplitAssignment`] using the source→fragment mapping and
111    /// the rendered actor IDs for each fragment.
112    ///
113    /// `fragment_actor_ids` provides the actor IDs assigned to each fragment from rendering.
114    /// For `Fixed` splits, splits are distributed across actors via [`reassign_splits`].
115    /// For `Adaptive` splits, `fill_adaptive_split` expands the template per actor count.
116    pub fn resolve_fragment_to_actor_splits(
117        table_fragments: &StreamJobFragments,
118        source_assignment: &SourceSplitAssignment,
119        fragment_actor_ids: &HashMap<FragmentId, Vec<ActorId>>,
120    ) -> MetaResult<SplitAssignment> {
121        let source_fragments = table_fragments.stream_source_fragments();
122        let mut result: SplitAssignment = HashMap::new();
123        for (source_id, discovered) in source_assignment {
124            let fragment_ids = source_fragments.get(source_id).unwrap_or_else(|| {
125                panic!("source {} not found in stream job fragments", source_id)
126            });
127            for fragment_id in fragment_ids {
128                let actor_ids = fragment_actor_ids.get(fragment_id).unwrap_or_else(|| {
129                    panic!("fragment {} not found in rendered actor IDs", fragment_id)
130                });
131                let actor_count = actor_ids.len();
132                match discovered {
133                    DiscoveredSplits::Fixed(splits) => {
134                        let empty_actor_splits: HashMap<ActorId, Vec<SplitImpl>> = actor_ids
135                            .iter()
136                            .map(|actor_id| (*actor_id, vec![]))
137                            .collect();
138                        if let Some(diff) = reassign_splits(
139                            *fragment_id,
140                            empty_actor_splits,
141                            splits,
142                            SplitDiffOptions::default(),
143                        ) {
144                            result.insert(*fragment_id, diff);
145                        }
146                    }
147                    DiscoveredSplits::Adaptive(template) => {
148                        let expanded = fill_adaptive_split(template, actor_count)?;
149                        let expanded_splits: Vec<SplitImpl> = expanded.into_values().collect();
150                        let mut actor_splits = HashMap::new();
151                        for (i, actor_id) in actor_ids.iter().enumerate() {
152                            actor_splits.insert(*actor_id, vec![expanded_splits[i].clone()]);
153                        }
154                        result.insert(*fragment_id, actor_splits);
155                    }
156                }
157            }
158        }
159        Ok(result)
160    }
161
162    /// Phase 1 for replace source: gathers fragment-level split information.
163    ///
164    /// When there are no existing downstream (`upstream_updates` is empty), we can
165    /// re-allocate splits fresh by delegating to [`Self::discover_splits`].
166    /// When there are existing downstream, returns `None` — the actual
167    /// split alignment happens in Phase 2 inside the barrier worker via
168    /// [`Self::resolve_replace_source_splits`].
169    pub async fn discover_splits_for_replace_source(
170        &self,
171        table_fragments: &StreamJobFragments,
172        upstream_updates: &FragmentReplaceUpstream,
173    ) -> MetaResult<Option<SourceSplitAssignment>> {
174        if upstream_updates.is_empty() {
175            // No existing downstream. We can just re-allocate splits arbitrarily.
176            return Ok(Some(self.discover_splits(table_fragments).await?));
177        }
178        // Splits will be aligned with the previous fragment in Phase 2
179        // using new_no_shuffle and inflight database info inside the barrier worker.
180        Ok(None)
181    }
182
183    /// Phase 2 resolve for replace source: align the new source fragment's actor splits
184    /// with the previous source fragment using the `new_no_shuffle` actor mapping and
185    /// the existing split assignment looked up via the provided closure.
186    pub fn resolve_replace_source_splits(
187        table_fragments: &StreamJobFragments,
188        upstream_updates: &FragmentReplaceUpstream,
189        // actor_no_shuffle:
190        //     upstream fragment_id ->
191        //     downstream fragment_id ->
192        //     upstream actor_id ->
193        //     downstream actor_id
194        actor_no_shuffle: &ActorNewNoShuffle,
195        get_upstream_actor_splits: impl Fn(FragmentId, ActorId) -> Option<Vec<SplitImpl>>,
196    ) -> MetaResult<SplitAssignment> {
197        tracing::debug!(?upstream_updates, "allocate_splits_for_replace_source");
198        assert!(!upstream_updates.is_empty());
199
200        let source_fragments = table_fragments.stream_source_fragments();
201        assert_eq!(
202            source_fragments.len(),
203            1,
204            "replace source job should only have one source"
205        );
206        let (_source_id, fragments) = source_fragments.into_iter().next().unwrap();
207        assert_eq!(
208            fragments.len(),
209            1,
210            "replace source job should only have one fragment"
211        );
212        let fragment_id = fragments.into_iter().next().unwrap();
213
214        debug_assert!(
215            upstream_updates.values().flatten().next().is_some()
216                && upstream_updates
217                    .values()
218                    .flatten()
219                    .all(|(_, new_upstream_fragment_id)| {
220                        *new_upstream_fragment_id == fragment_id
221                    })
222                && upstream_updates
223                    .values()
224                    .flatten()
225                    .map(|(upstream_fragment_id, _)| upstream_fragment_id)
226                    .all_equal(),
227            "upstream update should only replace one fragment: {:?}",
228            upstream_updates
229        );
230        let prev_fragment_id = upstream_updates
231            .values()
232            .flatten()
233            .next()
234            .map(|(upstream_fragment_id, _)| *upstream_fragment_id)
235            .expect("non-empty");
236        // Here we align the new source executor to backfill executors
237        //
238        // old_source => new_source            backfill_1
239        // actor_x1   => actor_y1 -----┬------>actor_a1
240        // actor_x2   => actor_y2 -----┼-┬---->actor_a2
241        //                             │ │
242        //                             │ │     backfill_2
243        //                             └─┼---->actor_b1
244        //                               └---->actor_b2
245        //
246        // Note: we can choose any backfill actor to align here.
247        // We use `HashMap` to dedup.
248        let aligned_actors: HashMap<ActorId, ActorId> = actor_no_shuffle
249            .get(&fragment_id)
250            .map(HashMap::values)
251            .into_iter()
252            .flatten()
253            .flatten()
254            .map(|(upstream_actor_id, actor_id)| (*upstream_actor_id, *actor_id))
255            .collect();
256        let assignment = align_splits(
257            aligned_actors.into_iter(),
258            |actor_id| get_upstream_actor_splits(prev_fragment_id, actor_id),
259            fragment_id,
260            prev_fragment_id,
261        )?;
262        Ok(HashMap::from([(fragment_id, assignment)]))
263    }
264
265    /// Phase 2 resolve for source backfill: align each backfill fragment's actor splits
266    /// with its upstream source fragment using the `new_no_shuffle` actor mapping and
267    /// the existing split assignment looked up via the provided closure.
268    pub fn resolve_backfill_splits(
269        table_fragments: &StreamJobFragments,
270        actor_no_shuffle: &ActorNewNoShuffle,
271        get_upstream_actor_splits: impl Fn(FragmentId, ActorId) -> Option<Vec<SplitImpl>>,
272    ) -> MetaResult<SplitAssignment> {
273        let source_backfill_fragments = table_fragments.source_backfill_fragments();
274
275        let mut assigned = HashMap::new();
276
277        for (_source_id, fragments) in source_backfill_fragments {
278            for (fragment_id, upstream_source_fragment_id) in fragments {
279                // Get upstream actors from actor_no_shuffle mapping
280                let upstream_actors: HashSet<ActorId> = actor_no_shuffle
281                    .get(&upstream_source_fragment_id)
282                    .and_then(|m| m.get(&fragment_id))
283                    .ok_or_else(|| {
284                        anyhow!(
285                            "no upstream actors found from fragment {} to upstream source fragment {}",
286                            fragment_id,
287                            upstream_source_fragment_id
288                        )
289                    })?.keys().copied().collect();
290
291                let mut backfill_actors = vec![];
292                let Some(source_new_no_shuffle) = actor_no_shuffle
293                    .get(&upstream_source_fragment_id)
294                    .and_then(|source_upstream_actor_no_shuffle| {
295                        source_upstream_actor_no_shuffle.get(&fragment_id)
296                    })
297                else {
298                    return Err(anyhow::anyhow!(
299                            "source backfill fragment's upstream fragment should have one-on-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_no_shuffle: {actor_no_shuffle:?}",
300                            fragment_id = fragment_id,
301                            upstream_source_fragment_id = upstream_source_fragment_id,
302                            actor_no_shuffle = actor_no_shuffle,
303                        ).into());
304                };
305                for upstream_actor in &upstream_actors {
306                    let Some(no_shuffle_backfill_actor) = source_new_no_shuffle.get(upstream_actor)
307                    else {
308                        return Err(anyhow::anyhow!(
309                            "source backfill fragment's upstream fragment should have one-on-one no_shuffle mapping, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, source_new_no_shuffle: {source_new_no_shuffle:?}",
310                            fragment_id = fragment_id,
311                            upstream_source_fragment_id = upstream_source_fragment_id,
312                            upstream_actor = upstream_actor,
313                            source_new_no_shuffle = source_new_no_shuffle
314                        ).into());
315                    };
316                    backfill_actors.push((*no_shuffle_backfill_actor, *upstream_actor));
317                }
318                assigned.insert(
319                    fragment_id,
320                    align_splits(
321                        backfill_actors,
322                        |actor_id| get_upstream_actor_splits(upstream_source_fragment_id, actor_id),
323                        fragment_id,
324                        upstream_source_fragment_id,
325                    )?,
326                );
327            }
328        }
329
330        Ok(assigned)
331    }
332}
333
impl SourceManagerCore {
    /// Checks whether the external source metadata has changed,
    /// and re-assigns splits if there's a diff.
    ///
    /// `self.actor_splits` will not be updated. It will be updated by `Self::apply_source_change`,
    /// after the mutation barrier has been collected.
    ///
    /// Returns the new assignments grouped per database; fragments whose
    /// assignment is unchanged are simply absent from the result.
    pub async fn reassign_splits(&self) -> MetaResult<HashMap<DatabaseId, SplitState>> {
        let mut split_assignment: SplitAssignment = HashMap::new();

        // `continue 'loop_source` skips the rest of this source entirely (including
        // its backfill alignment below); `continue 'loop_fragment` only skips the
        // current source fragment.
        'loop_source: for (source_id, handle) in &self.managed_sources {
            // Skip sources that currently have no fragments to assign splits to.
            let source_fragment_ids = match self.source_fragments.get(source_id) {
                Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids,
                _ => {
                    continue;
                }
            };
            let backfill_fragment_ids = self.backfill_fragments.get(source_id);

            'loop_fragment: for &fragment_id in source_fragment_ids {
                // Best-effort: a missing/empty fragment is logged and skipped rather
                // than failing the whole reassignment round.
                let actors = match self
                    .metadata_manager
                    .get_running_actors_of_fragment(fragment_id)
                {
                    Ok(actors) => {
                        if actors.is_empty() {
                            tracing::warn!("No actors found for fragment {}", fragment_id);
                            continue 'loop_fragment;
                        }
                        actors
                    }
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
                        continue 'loop_fragment;
                    }
                };

                let Some(discovered) = handle.discovered_splits(*source_id).await? else {
                    // The discover loop for this source is not ready yet; we'll wait for the next run
                    continue 'loop_source;
                };
                let discovered_splits = match discovered {
                    DiscoveredSplits::Fixed(splits) => {
                        if splits.is_empty() {
                            // Nothing discovered — treat like "not ready" and skip the source.
                            continue 'loop_source;
                        }
                        splits
                    }
                    DiscoveredSplits::Adaptive(template) => {
                        // Expand the adaptive template to one split per running actor.
                        fill_adaptive_split(&template, actors.len())?
                    }
                };

                // Snapshot the current actor -> splits mapping from the shared actor
                // info; defaults to empty if the fragment is not present there.
                let prev_actor_splits = {
                    let guard = self.env.shared_actor_infos().read_guard();

                    guard
                        .get_fragment(fragment_id)
                        .and_then(|info| {
                            info.actors
                                .iter()
                                .map(|(actor_id, actor_info)| {
                                    (*actor_id, actor_info.splits.clone())
                                })
                                .collect::<HashMap<_, _>>()
                                .into()
                        })
                        .unwrap_or_default()
                };

                // `reassign_splits` returns None when nothing changed.
                if let Some(new_assignment) = reassign_splits(
                    fragment_id,
                    prev_actor_splits,
                    &discovered_splits,
                    SplitDiffOptions {
                        enable_scale_in: handle.enable_drop_split,
                        enable_adaptive: handle.enable_adaptive_splits,
                    },
                ) {
                    split_assignment.insert(fragment_id, new_assignment);
                }
            }

            if let Some(backfill_fragment_ids) = backfill_fragment_ids {
                // align splits for backfill fragments with its upstream source fragment
                for (fragment_id, upstream_fragment_id) in backfill_fragment_ids {
                    let Some(upstream_assignment): Option<&HashMap<ActorId, Vec<SplitImpl>>> =
                        split_assignment.get(upstream_fragment_id)
                    else {
                        // upstream fragment unchanged, do not update backfill fragment too
                        continue;
                    };
                    let actors = match self
                        .metadata_manager
                        .get_running_actors_for_source_backfill(*fragment_id, *upstream_fragment_id)
                        .await
                    {
                        Ok(actors) => {
                            if actors.is_empty() {
                                tracing::warn!("No actors found for fragment {}", fragment_id);
                                continue;
                            }
                            actors
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(),"Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
                            continue;
                        }
                    };
                    // Each backfill actor copies the splits of its paired upstream actor.
                    split_assignment.insert(
                        *fragment_id,
                        align_splits(
                            actors,
                            |upstream_actor_id| {
                                upstream_assignment.get(&upstream_actor_id).cloned()
                            },
                            *fragment_id,
                            *upstream_fragment_id,
                        )?,
                    );
                }
            }
        }

        // Group the fragment-level assignments by owning database for the caller.
        let assignments = self
            .metadata_manager
            .split_fragment_map_by_database(split_assignment)
            .await?;

        let mut result = HashMap::new();
        for (database_id, assignment) in assignments {
            result.insert(
                database_id,
                SplitState {
                    split_assignment: assignment,
                },
            );
        }

        Ok(result)
    }
}
475
476/// Reassigns splits if there are new splits or dropped splits,
477/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled.
478///
479/// The existing splits will remain unmoved in their currently assigned actor.
480///
481/// If an actor has an upstream actor, it should be a backfill executor,
482/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case.
483/// Use [`align_splits`] instead.**
484///
485/// - `fragment_id`: just for logging
486///
487/// ## Different connectors' behavior of split change
488///
489/// ### Kafka and Pulsar
490/// They only support increasing the number of splits via adding new empty splits.
491/// Old data is not moved.
492///
493/// ### Kinesis
494/// It supports *pairwise* shard split and merge.
495///
496/// In both cases, old data remain in the old shard(s) and the old shard is still available.
497/// New data are routed to the new shard(s).
498/// After the retention period has expired, the old shard will become `EXPIRED` and isn't
499/// listed any more. In other words, the total number of shards will first increase and then decrease.
500///
501/// See also:
502/// - [Kinesis resharding doc](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html#kinesis-using-sdk-java-resharding-data-routing)
503/// - An example of how the shards can be like: <https://stackoverflow.com/questions/72272034/list-shard-show-more-shards-than-provisioned>
504pub fn reassign_splits<I, T>(
505    fragment_id: FragmentId,
506    actor_splits: HashMap<I, Vec<T>>,
507    discovered_splits: &BTreeMap<SplitId, T>,
508    opts: SplitDiffOptions,
509) -> Option<HashMap<I, Vec<T>>>
510where
511    I: Ord + std::hash::Hash + Eq,
512    T: SplitMetaData + Clone,
513{
514    // if no actors, return
515    if actor_splits.is_empty() {
516        return None;
517    }
518
519    let prev_split_ids: HashSet<_> = actor_splits
520        .values()
521        .flat_map(|splits| splits.iter().map(SplitMetaData::id))
522        .collect();
523
524    tracing::trace!(%fragment_id, prev_split_ids = ?prev_split_ids, "previous splits");
525    tracing::trace!(%fragment_id, prev_split_ids = ?discovered_splits.keys(), "discovered splits");
526
527    let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
528
529    let dropped_splits: HashSet<_> = prev_split_ids
530        .difference(&discovered_split_ids)
531        .cloned()
532        .collect();
533
534    if !dropped_splits.is_empty() {
535        if opts.enable_scale_in {
536            tracing::info!(%fragment_id, dropped_spltis = ?dropped_splits, "new dropped splits");
537        } else {
538            tracing::warn!(%fragment_id, dropped_spltis = ?dropped_splits, "split dropping happened, but it is not allowed");
539        }
540    }
541
542    let new_discovered_splits: BTreeSet<_> = discovered_split_ids
543        .into_iter()
544        .filter(|split_id| !prev_split_ids.contains(split_id))
545        .collect();
546
547    if opts.enable_scale_in || opts.enable_adaptive {
548        // if we support scale in, no more splits are discovered, and no splits are dropped, return
549        // we need to check if discovered_split_ids is empty, because if it is empty, we need to
550        // handle the case of scale in to zero (like deleting all objects from s3)
551        if dropped_splits.is_empty()
552            && new_discovered_splits.is_empty()
553            && !discovered_splits.is_empty()
554        {
555            return None;
556        }
557    } else {
558        // if we do not support scale in, and no more splits are discovered, return
559        if new_discovered_splits.is_empty() && !discovered_splits.is_empty() {
560            return None;
561        }
562    }
563
564    tracing::info!(%fragment_id, new_discovered_splits = ?new_discovered_splits, "new discovered splits");
565
566    let mut heap = BinaryHeap::with_capacity(actor_splits.len());
567
568    for (actor_id, mut splits) in actor_splits {
569        if opts.enable_scale_in || opts.enable_adaptive {
570            splits.retain(|split| !dropped_splits.contains(&split.id()));
571        }
572
573        heap.push(SplitsAssignment { actor_id, splits })
574    }
575
576    for split_id in new_discovered_splits {
577        // SplitsAssignment's Ord is reversed, so this is min heap, i.e.,
578        // we get the assignment with the least splits here.
579
580        // Note: If multiple actors have the same number of splits, it will be randomly picked.
581        // When the number of source actors is larger than the number of splits,
582        // It's possible that the assignment is uneven.
583        // e.g., https://github.com/risingwavelabs/risingwave/issues/14324#issuecomment-1875033158
584        // TODO: We should make the assignment rack-aware to make sure it's even.
585        let mut peek_ref = heap.peek_mut().unwrap();
586        peek_ref
587            .splits
588            .push(discovered_splits.get(&split_id).cloned().unwrap());
589    }
590
591    Some(
592        heap.into_iter()
593            .map(|SplitsAssignment { actor_id, splits }| (actor_id, splits))
594            .collect(),
595    )
596}
597
598/// Assign splits to a new set of actors, according to existing assignment.
599///
600/// The `get_upstream_actor_splits` closure looks up the current splits for a given
601/// upstream actor ID. How exactly this lookup works depends on the caller:
602/// - Inside the barrier worker, it reads from `InflightDatabaseInfo`.
603/// - During reassignment, it reads from the pending upstream assignment.
604///
605/// illustration:
606/// ```text
607/// upstream                               new
608/// actor x1 [split 1, split2]      ->     actor y1 [split 1, split2]
609/// actor x2 [split 3]              ->     actor y2 [split 3]
610/// ...
611/// ```
612pub fn align_splits(
613    // (actor_id, upstream_actor_id)
614    aligned_actors: impl IntoIterator<Item = (ActorId, ActorId)>,
615    get_upstream_actor_splits: impl Fn(ActorId) -> Option<Vec<SplitImpl>>,
616    fragment_id: FragmentId,
617    upstream_source_fragment_id: FragmentId,
618) -> anyhow::Result<HashMap<ActorId, Vec<SplitImpl>>> {
619    aligned_actors
620        .into_iter()
621        .map(|(actor_id, upstream_actor_id)| {
622            let Some(splits) = get_upstream_actor_splits(upstream_actor_id) else {
623                return Err(anyhow::anyhow!("upstream assignment not found, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_actor_id: {upstream_actor_id:?}"));
624            };
625
626            Ok((
627                actor_id,
628                splits,
629            ))
630        })
631        .collect()
632}
633
/// Note: the `PartialEq` and `Ord` impl just compares the number of splits.
#[derive(Debug)]
struct SplitsAssignment<I, T: SplitMetaData> {
    // The actor owning `splits`; also used as the deterministic tiebreak key in `Ord`.
    actor_id: I,
    splits: Vec<T>,
}

impl<I, T: SplitMetaData + Clone> Eq for SplitsAssignment<I, T> {}

impl<I, T: SplitMetaData + Clone> PartialEq<Self> for SplitsAssignment<I, T> {
    fn eq(&self, other: &Self) -> bool {
        // Equality is purely by split count — contents and actor_id are ignored.
        self.splits.len() == other.splits.len()
    }
}

impl<I: Ord, T: SplitMetaData + Clone> PartialOrd<Self> for SplitsAssignment<I, T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<I: Ord, T: SplitMetaData + Clone> Ord for SplitsAssignment<I, T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Note: this is reversed order, to make BinaryHeap a min heap.
        // NOTE(review): `cmp` tiebreaks by `actor_id` while `eq` ignores it, so
        // `a == b` does not imply `a.cmp(&b) == Ordering::Equal`. This is harmless
        // for `BinaryHeap` (which only uses `Ord`), but don't rely on `==` for
        // anything beyond comparing split counts.
        other
            .splits
            .len()
            .cmp(&self.splits.len())
            .then(self.actor_id.cmp(&other.actor_id))
    }
}
665
/// Options controlling how [`reassign_splits`] reacts to a diff between the
/// previous and the freshly discovered split sets.
///
/// Both flags default to `false` via the derived [`Default`] impl, matching the
/// boolean default.
#[derive(Debug, Default)]
pub struct SplitDiffOptions {
    pub enable_scale_in: bool,

    /// For most connectors, this should be false. When enabled, RisingWave will not track any progress.
    pub enable_adaptive: bool,
}
683
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use risingwave_common::types::JsonbVal;
    use risingwave_connector::error::ConnectorResult;
    use risingwave_connector::source::{SplitId, SplitMetaData};
    use serde::{Deserialize, Serialize};

    use super::*;
    use crate::model::{ActorId, FragmentId};

    /// Minimal split type for exercising `reassign_splits`: its identity is a bare `u32`.
    #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
    struct TestSplit {
        id: u32,
    }

    impl SplitMetaData for TestSplit {
        fn id(&self) -> SplitId {
            self.id.to_string().into()
        }

        fn encode_to_json(&self) -> JsonbVal {
            serde_json::to_value(*self).unwrap().into()
        }

        fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
            serde_json::from_value(value.take()).map_err(Into::into)
        }

        fn update_offset(&mut self, _last_read_offset: String) -> ConnectorResult<()> {
            Ok(())
        }
    }

    /// Asserts that `diff` hands out every discovered split exactly once.
    fn check_all_splits(
        discovered_splits: &BTreeMap<SplitId, TestSplit>,
        diff: &HashMap<ActorId, Vec<TestSplit>>,
    ) {
        let mut remaining: HashSet<_> = discovered_splits.keys().cloned().collect();

        for split in diff.values().flatten() {
            // A `false` here would mean a duplicate or unknown split in the diff.
            assert!(remaining.remove(&split.id()));
        }

        // Every discovered split must have been assigned somewhere.
        assert!(remaining.is_empty());
    }

    #[test]
    fn test_drop_splits() {
        let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
        actor_splits.insert(0.into(), vec![TestSplit { id: 0 }, TestSplit { id: 1 }]);
        actor_splits.insert(1.into(), vec![TestSplit { id: 2 }, TestSplit { id: 3 }]);
        actor_splits.insert(2.into(), vec![TestSplit { id: 4 }, TestSplit { id: 5 }]);

        // Remember which actor owned each split before the diff.
        let prev_split_to_actor: HashMap<_, _> = actor_splits
            .iter()
            .flat_map(|(actor_id, splits)| {
                splits.iter().map(move |split| (split.id(), *actor_id))
            })
            .collect();

        let prev_split_ids: HashSet<_> = prev_split_to_actor.keys().cloned().collect();

        // Splits 0 and 5 vanish; only 1..=4 are still discovered.
        let discovered_splits: BTreeMap<SplitId, TestSplit> = (1..5)
            .map(|id| {
                let split = TestSplit { id };
                (split.id(), split)
            })
            .collect();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            SplitDiffOptions {
                enable_scale_in: true,
                enable_adaptive: false,
            },
        )
        .unwrap();
        check_all_splits(&discovered_splits, &diff);

        // Map each split to its owner after the diff.
        let after_split_to_actor: HashMap<_, _> = diff
            .iter()
            .flat_map(|(actor_id, splits)| {
                splits.iter().map(move |split| (split.id(), *actor_id))
            })
            .collect();

        let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();

        // Splits that survive the diff must stay on the actor they were on.
        for retained in prev_split_ids.intersection(&discovered_split_ids) {
            assert_eq!(
                prev_split_to_actor.get(retained),
                after_split_to_actor.get(retained)
            );
        }
    }

    #[test]
    fn test_drop_splits_to_empty() {
        let mut actor_splits: HashMap<ActorId, _> = HashMap::new();
        actor_splits.insert(0.into(), vec![TestSplit { id: 0 }]);

        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            SplitDiffOptions {
                enable_scale_in: true,
                enable_adaptive: false,
            },
        )
        .unwrap();

        // Even with every split dropped, the actor keeps an (empty) entry
        // rather than disappearing from the diff.
        assert!(!diff.is_empty());
    }

    #[test]
    fn test_reassign_splits() {
        // No actors at all: there is nothing to build a diff for.
        let actor_splits: HashMap<u32, _> = HashMap::new();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        );
        assert!(diff.is_none());

        // Actors exist but nothing was discovered: everyone ends up empty-handed.
        let actor_splits = (0..3).map(|i| (i, vec![])).collect();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        assert!(diff.values().all(|splits| splits.is_empty()));

        // Three idle actors, three splits: a perfectly even 1:1 distribution.
        let actor_splits = (0..3).map(|i| (i.into(), vec![])).collect();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..3)
            .map(|id| {
                let split = TestSplit { id };
                (split.id(), split)
            })
            .collect();
        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        assert!(diff.values().all(|splits| splits.len() == 1));
        check_all_splits(&discovered_splits, &diff);

        // Three actors holding one split each, five discovered:
        // each actor ends up with one or two splits.
        let actor_splits = (0..3)
            .map(|i| (i.into(), vec![TestSplit { id: i }]))
            .collect();
        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
            .map(|id| {
                let split = TestSplit { id };
                (split.id(), split)
            })
            .collect();
        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 3);
        assert!(diff.values().all(|splits| matches!(splits.len(), 1 | 2)));
        check_all_splits(&discovered_splits, &diff);

        // Five actors (two of them idle), five splits: evens out to 1:1.
        let mut actor_splits: HashMap<ActorId, Vec<TestSplit>> = (0..3)
            .map(|i| (i.into(), vec![TestSplit { id: i }]))
            .collect();
        actor_splits.insert(3.into(), vec![]);
        actor_splits.insert(4.into(), vec![]);

        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
            .map(|id| {
                let split = TestSplit { id };
                (split.id(), split)
            })
            .collect();

        let diff = reassign_splits(
            FragmentId::default(),
            actor_splits,
            &discovered_splits,
            Default::default(),
        )
        .unwrap();
        assert_eq!(diff.len(), 5);
        assert!(diff.values().all(|splits| splits.len() == 1));
        check_all_splits(&discovered_splits, &diff);
    }
}