risingwave_meta/stream/stream_graph/schedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(
    clippy::collapsible_if,
    clippy::explicit_iter_loop,
    reason = "generated by crepe"
)]

use std::collections::{BTreeMap, HashMap};
use std::num::NonZeroUsize;

use anyhow::Context;
use either::Either;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::hash::{
    ActorAlignmentId, ActorAlignmentMapping, ActorMapping, VnodeCountCompat,
};
use risingwave_common::util::stream_graph_visitor::visit_fragment;
use risingwave_common::{bail, hash};
use risingwave_connector::source::cdc::{CDC_BACKFILL_MAX_PARALLELISM, CdcScanOptions};
use risingwave_meta_model::WorkerId;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use risingwave_pb::stream_plan::DispatcherType::{self, *};

use crate::MetaResult;
use crate::barrier::SharedFragmentInfo;
use crate::model::ActorId;
use crate::stream::AssignerBuilder;
use crate::stream::stream_graph::fragment::CompleteStreamFragmentGraph;
use crate::stream::stream_graph::id::GlobalFragmentId as Id;

type HashMappingId = usize;

/// The internal structure for processing scheduling requirements in the scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Req {
    /// The fragment must be singleton and is scheduled to the given worker id.
    Singleton(WorkerId),
    /// The fragment must be hash-distributed and is scheduled by the given hash mapping.
    Hash(HashMappingId),
    /// The fragment must have the given vnode count, but can be scheduled anywhere.
    /// When the vnode count is 1, it means the fragment must be singleton.
    AnyVnodeCount(usize),
}

impl Req {
    /// Equivalent to `Req::AnyVnodeCount(1)`.
    #[allow(non_upper_case_globals)]
    const AnySingleton: Self = Self::AnyVnodeCount(1);

    /// Merge two requirements. Returns an error if the requirements are incompatible.
    ///
    /// The `mapping_len` function is used to get the vnode count of a hash mapping by its id.
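    ///
    /// For example, `AnySingleton` merged with `Singleton(w)` yields `Singleton(w)`, and
    /// `AnyVnodeCount(n)` merged with `Hash(m)` yields `Hash(m)` if the mapping `m` covers
    /// exactly `n` vnodes. Any other combination, such as two different hash mappings, is
    /// rejected as incompatible.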
    fn merge(a: Self, b: Self, mapping_len: impl Fn(HashMappingId) -> usize) -> MetaResult<Self> {
        // Note that a and b are always different, as they come from a set.
        let merge = |a, b| match (a, b) {
            (Self::AnySingleton, Self::Singleton(id)) => Some(Self::Singleton(id)),
            (Self::AnyVnodeCount(count), Self::Hash(id)) if mapping_len(id) == count => {
                Some(Self::Hash(id))
            }
            _ => None,
        };

        match merge(a, b).or_else(|| merge(b, a)) {
            Some(req) => Ok(req),
            None => bail!("incompatible requirements `{a:?}` and `{b:?}`"),
        }
    }
}

/// Facts as the input of the scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Fact {
    /// An edge in the fragment graph.
    Edge {
        from: Id,
        to: Id,
        dt: DispatcherType,
    },
    /// A scheduling requirement for a fragment.
    Req { id: Id, req: Req },
}

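// The Datalog program below (evaluated by `crepe`) derives scheduling requirements from the
// input facts: the downstream fragment of a `Simple` edge must be a singleton, and
// requirements propagate in both directions along `NoShuffle` edges until a fixpoint is
// reached. The derived `Requirement`s of each fragment are merged later with `Req::merge`.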
crepe::crepe! {
    @input
    struct Input(Fact);

    struct Edge(Id, Id, DispatcherType);
    struct ExternalReq(Id, Req);

    @output
    struct Requirement(Id, Req);

    // Extract facts.
    Edge(from, to, dt) <- Input(f), let Fact::Edge { from, to, dt } = f;
    Requirement(id, req) <- Input(f), let Fact::Req { id, req } = f;

    // The downstream fragment of a `Simple` edge must be singleton.
    Requirement(y, Req::AnySingleton) <- Edge(_, y, Simple);
    // Requirements propagate through `NoShuffle` edges.
    Requirement(x, d) <- Edge(x, y, NoShuffle), Requirement(y, d);
    Requirement(y, d) <- Edge(x, y, NoShuffle), Requirement(x, d);
}

/// The distribution (scheduling result) of a fragment.
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum Distribution {
    /// The fragment is singleton and is scheduled to the given worker.
    Singleton(WorkerId),

    /// The fragment is hash-distributed and is scheduled by the given hash mapping.
    Hash(ActorAlignmentMapping),
}

impl Distribution {
    /// The parallelism required by the distribution.
    pub fn parallelism(&self) -> usize {
        self.actors().count()
    }

    /// All actor alignment ids required by the distribution.
    pub fn actors(&self) -> impl Iterator<Item = ActorAlignmentId> + '_ {
        match self {
            Distribution::Singleton(p) => {
                Either::Left(std::iter::once(ActorAlignmentId::new(*p as _, 0)))
            }
            Distribution::Hash(mapping) => Either::Right(mapping.iter_unique()),
        }
    }

    /// Get the vnode count of the distribution.
    pub fn vnode_count(&self) -> usize {
        match self {
            Distribution::Singleton(_) => 1, // only `SINGLETON_VNODE`
            Distribution::Hash(mapping) => mapping.len(),
        }
    }

    /// Create a distribution from the info of an existing fragment and its actor locations.
    pub fn from_fragment(
        fragment: &SharedFragmentInfo,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> Self {
        match fragment.distribution_type {
            DistributionType::Single => {
                let (actor_id, _) = fragment.actors.iter().exactly_one().unwrap();
                let location = actor_location.get(actor_id).unwrap();
                Distribution::Singleton(*location)
            }
            DistributionType::Hash => {
                let actor_bitmaps: HashMap<_, _> = fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor_info)| {
                        (
                            *actor_id as hash::ActorId,
                            actor_info.vnode_bitmap.clone().unwrap(),
                        )
                    })
                    .collect();

                let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
                let actor_location = actor_location
                    .iter()
                    .map(|(&k, &v)| (k, v as u32))
                    .collect();
                let mapping = actor_mapping.to_actor_alignment(&actor_location);

                Distribution::Hash(mapping)
            }
        }
    }

    /// Convert the distribution to [`PbFragmentDistributionType`].
    pub fn to_distribution_type(&self) -> PbFragmentDistributionType {
        match self {
            Distribution::Singleton(_) => PbFragmentDistributionType::Single,
            Distribution::Hash(_) => PbFragmentDistributionType::Hash,
        }
    }
}

/// [`Scheduler`] schedules the distribution of fragments in a stream graph.
pub(super) struct Scheduler {
    /// The default hash mapping for hash-distributed fragments, if there's no requirement derived.
    default_hash_mapping: ActorAlignmentMapping,

    /// The default worker for singleton fragments, if there's no requirement derived.
    default_singleton_worker: WorkerId,

    /// Used to generate a mapping when a vnode count other than the default is required.
    dynamic_mapping_fn: Box<dyn Fn(usize, Option<usize>) -> anyhow::Result<ActorAlignmentMapping>>,
}

impl Scheduler {
    /// Create a new [`Scheduler`] with the given workers and the default parallelism.
    ///
    /// Each hash-distributed fragment will be scheduled to at most `default_parallelism`
    /// parallel actors, spread across all compute nodes by the assigner.
    ///
    /// For different streaming jobs, we even out possible scheduling skew by using the
    /// streaming job id as the salt for the scheduling algorithm.
    pub fn new(
        streaming_job_id: u32,
        workers: &HashMap<u32, WorkerNode>,
        default_parallelism: NonZeroUsize,
        expected_vnode_count: usize,
    ) -> MetaResult<Self> {
        let parallelism = default_parallelism.get();
        assert!(
            parallelism <= expected_vnode_count,
            "parallelism should be limited by vnode count in previous steps"
        );

        let assigner = AssignerBuilder::new(streaming_job_id).build();

        let worker_weights = workers
            .iter()
            .map(|(worker_id, worker)| {
                (
                    *worker_id,
                    NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                )
            })
            .collect();

        let actor_idxes = (0..parallelism).collect_vec();
        let vnodes = (0..expected_vnode_count).collect_vec();

        let assignment = assigner.assign_hierarchical(&worker_weights, &actor_idxes, &vnodes)?;

        let default_hash_mapping =
            ActorAlignmentMapping::from_assignment(assignment, expected_vnode_count);

        let single_actor_idxes = std::iter::once(0).collect_vec();

        let single_assignment =
            assigner.assign_hierarchical(&worker_weights, &single_actor_idxes, &vnodes)?;

        let default_singleton_worker =
            single_assignment.keys().exactly_one().cloned().unwrap() as _;

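        // Builds a mapping on demand for fragments that require a specific vnode count
        // (`Req::AnyVnodeCount`). The parallelism is capped by that vnode count and may be
        // overridden by a forced parallelism, e.g. for parallelized CDC backfill.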
        let dynamic_mapping_fn = Box::new(
            move |limited_count: usize, force_parallelism: Option<usize>| {
                let parallelism = if let Some(force_parallelism) = force_parallelism {
                    force_parallelism.min(limited_count)
                } else {
                    parallelism.min(limited_count)
                };
                let assignment = assigner.assign_hierarchical(
                    &worker_weights,
                    &(0..parallelism).collect_vec(),
                    &(0..limited_count).collect_vec(),
                )?;

                let mapping = ActorAlignmentMapping::from_assignment(assignment, limited_count);
                Ok(mapping)
            },
        );
        Ok(Self {
            default_hash_mapping,
            default_singleton_worker,
            dynamic_mapping_fn,
        })
    }

    /// Schedule the given complete graph and return the distribution of each **building
    /// fragment**.
    pub fn schedule(
        &self,
        graph: &CompleteStreamFragmentGraph,
    ) -> MetaResult<HashMap<Id, Distribution>> {
        let existing_distribution = graph.existing_distribution();

        // Build an index map for all hash mappings.
        let all_hash_mappings = existing_distribution
            .values()
            .flat_map(|dist| dist.as_hash())
            .cloned()
            .unique()
            .collect_vec();
        let hash_mapping_id: HashMap<_, _> = all_hash_mappings
            .iter()
            .enumerate()
            .map(|(i, m)| (m.clone(), i))
            .collect();

        let mut facts = Vec::new();

        // Singletons.
        for (&id, fragment) in graph.building_fragments() {
            if fragment.requires_singleton {
                facts.push(Fact::Req {
                    id,
                    req: Req::AnySingleton,
                });
            }
        }
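        // Fragments whose parallelism is explicitly forced, currently only parallelized
        // CDC backfill, mapped to the requested parallelism.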
        let mut force_parallelism_fragment_ids: HashMap<_, _> = HashMap::default();
        // Vnode count requirements: if a fragment is going to look up an existing table,
        // it must have the same vnode count as that table.
        for (&id, fragment) in graph.building_fragments() {
            visit_fragment(fragment, |node| {
                use risingwave_pb::stream_plan::stream_node::NodeBody;
                let vnode_count = match node {
                    NodeBody::StreamScan(node) => {
                        if let Some(table) = &node.arrangement_table {
                            table.vnode_count()
                        } else if let Some(table) = &node.table_desc {
                            table.vnode_count()
                        } else {
                            return;
                        }
                    }
                    NodeBody::TemporalJoin(node) => node.get_table_desc().unwrap().vnode_count(),
                    NodeBody::BatchPlan(node) => node.get_table_desc().unwrap().vnode_count(),
                    NodeBody::Lookup(node) => node
                        .get_arrangement_table_info()
                        .unwrap()
                        .get_table_desc()
                        .unwrap()
                        .vnode_count(),
                    NodeBody::StreamCdcScan(node) => {
                        let Some(ref options) = node.options else {
                            return;
                        };
                        let options = CdcScanOptions::from_proto(options);
                        if options.is_parallelized_backfill() {
                            force_parallelism_fragment_ids
                                .insert(id, options.backfill_parallelism as usize);
                            CDC_BACKFILL_MAX_PARALLELISM as usize
                        } else {
                            return;
                        }
                    }
                    _ => return,
                };
                facts.push(Fact::Req {
                    id,
                    req: Req::AnyVnodeCount(vnode_count),
                });
            });
        }
        // Distributions of existing fragments.
        for (id, dist) in existing_distribution {
            let req = match dist {
                Distribution::Singleton(worker_id) => Req::Singleton(worker_id),
                Distribution::Hash(mapping) => Req::Hash(hash_mapping_id[&mapping]),
            };
            facts.push(Fact::Req { id, req });
        }
        // Edges.
        for (from, to, edge) in graph.all_edges() {
            facts.push(Fact::Edge {
                from,
                to,
                dt: edge.dispatch_strategy.r#type(),
            });
        }

        // Run the algorithm to propagate requirements.
        let mut crepe = Crepe::new();
        crepe.extend(facts.into_iter().map(Input));
        let (reqs,) = crepe.run();
        let reqs = reqs
            .into_iter()
            .map(|Requirement(id, req)| (id, req))
            .into_group_map();

        // Derive scheduling result from requirements.
        let mut distributions = HashMap::new();
        for &id in graph.building_fragments().keys() {
            let dist = match reqs.get(&id) {
                // Merge all requirements.
                Some(reqs) => {
                    let req = (reqs.iter().copied())
                        .try_reduce(|a, b| Req::merge(a, b, |id| all_hash_mappings[id].len()))
                        .with_context(|| {
                            format!("cannot fulfill scheduling requirements for fragment {id:?}")
                        })?
                        .unwrap();

                    // Derive distribution from the merged requirement.
                    match req {
                        Req::Singleton(worker_id) => Distribution::Singleton(worker_id),
                        Req::Hash(mapping) => {
                            Distribution::Hash(all_hash_mappings[mapping].clone())
                        }
                        Req::AnySingleton => Distribution::Singleton(self.default_singleton_worker),
                        Req::AnyVnodeCount(vnode_count) => {
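                            // Only the vnode count is constrained: build a mapping with
                            // that count on the fly, honoring any forced parallelism.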
                            let force_parallelism =
                                force_parallelism_fragment_ids.get(&id).copied();
                            let mapping = (self.dynamic_mapping_fn)(vnode_count, force_parallelism)
                                .with_context(|| {
                                    format!(
                                        "failed to build dynamic mapping for fragment {id:?} with vnode count {vnode_count}"
                                    )
                                })?;

                            Distribution::Hash(mapping)
                        }
                    }
                }
                // No requirement, use the default.
                None => Distribution::Hash(self.default_hash_mapping.clone()),
            };

            distributions.insert(id, dist);
        }

        tracing::debug!(?distributions, "schedule fragments");

        Ok(distributions)
    }
}

/// [`Locations`] represents the locations of the actors.
#[cfg_attr(test, derive(Default))]
pub struct Locations {
    /// actor location map.
    pub actor_locations: BTreeMap<ActorId, ActorAlignmentId>,
    /// worker location map.
    pub worker_locations: HashMap<WorkerId, WorkerNode>,
}

impl Locations {
    /// Returns all actors for every worker node.
    pub fn worker_actors(&self) -> HashMap<WorkerId, Vec<ActorId>> {
        self.actor_locations
            .iter()
            .map(|(actor_id, alignment_id)| (alignment_id.worker_id() as WorkerId, *actor_id))
            .into_group_map()
    }

    /// Returns an iterator of `ActorInfo`.
    pub fn actor_infos(&self) -> impl Iterator<Item = ActorInfo> + '_ {
        self.actor_locations
            .iter()
            .map(|(actor_id, alignment_id)| ActorInfo {
                actor_id: *actor_id,
                host: self.worker_locations[&(alignment_id.worker_id() as WorkerId)]
                    .host
                    .clone(),
            })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug)]
    enum Result {
        DefaultHash,
        Required(Req),
    }

    impl Result {
        #[allow(non_upper_case_globals)]
        const DefaultSingleton: Self = Self::Required(Req::AnySingleton);
    }

    fn run_and_merge(
        facts: impl IntoIterator<Item = Fact>,
        mapping_len: impl Fn(HashMappingId) -> usize,
    ) -> MetaResult<HashMap<Id, Req>> {
        let mut crepe = Crepe::new();
        crepe.extend(facts.into_iter().map(Input));
        let (reqs,) = crepe.run();

        let reqs = reqs
            .into_iter()
            .map(|Requirement(id, req)| (id, req))
            .into_group_map();

        let mut merged = HashMap::new();
        for (id, reqs) in reqs {
            let req = (reqs.iter().copied())
                .try_reduce(|a, b| Req::merge(a, b, &mapping_len))
                .with_context(|| {
                    format!("cannot fulfill scheduling requirements for fragment {id:?}")
                })?
                .unwrap();
            merged.insert(id, req);
        }

        Ok(merged)
    }

    fn test_success(facts: impl IntoIterator<Item = Fact>, expected: HashMap<Id, Result>) {
        test_success_with_mapping_len(facts, expected, |_| 0);
    }

    fn test_success_with_mapping_len(
        facts: impl IntoIterator<Item = Fact>,
        expected: HashMap<Id, Result>,
        mapping_len: impl Fn(HashMappingId) -> usize,
    ) {
        let reqs = run_and_merge(facts, mapping_len).unwrap();

        for (id, expected) in expected {
            match (reqs.get(&id), expected) {
                (None, Result::DefaultHash) => {}
                (Some(actual), Result::Required(expected)) if *actual == expected => {}
                (actual, expected) => panic!(
                    "unexpected result for fragment {id:?}\nactual: {actual:?}\nexpected: {expected:?}"
                ),
            }
        }
    }

    fn test_failed(facts: impl IntoIterator<Item = Fact>) {
        run_and_merge(facts, |_| 0).unwrap_err();
    }

    // 101
    #[test]
    fn test_single_fragment_hash() {
        #[rustfmt::skip]
        let facts = [];

        let expected = maplit::hashmap! {
            101.into() => Result::DefaultHash,
        };

        test_success(facts, expected);
    }

    // 101
    #[test]
    fn test_single_fragment_singleton() {
        #[rustfmt::skip]
        let facts = [
            Fact::Req { id: 101.into(), req: Req::AnySingleton },
        ];

        let expected = maplit::hashmap! {
            101.into() => Result::DefaultSingleton,
        };

        test_success(facts, expected);
    }

    // 1 -|-> 101 -->
    //                103 --> 104
    // 2 -|-> 102 -->
    #[test]
    fn test_scheduling_mv_on_mv() {
        #[rustfmt::skip]
        let facts = [
            Fact::Req { id: 1.into(), req: Req::Hash(1) },
            Fact::Req { id: 2.into(), req: Req::Singleton(0) },
            Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
            Fact::Edge { from: 2.into(), to: 102.into(), dt: NoShuffle },
            Fact::Edge { from: 101.into(), to: 103.into(), dt: Hash },
            Fact::Edge { from: 102.into(), to: 103.into(), dt: Hash },
            Fact::Edge { from: 103.into(), to: 104.into(), dt: Simple },
        ];

        let expected = maplit::hashmap! {
            101.into() => Result::Required(Req::Hash(1)),
            102.into() => Result::Required(Req::Singleton(0)),
            103.into() => Result::DefaultHash,
            104.into() => Result::DefaultSingleton,
        };

        test_success(facts, expected);
    }

    // 1 -|-> 101 --> 103 -->
    //             X          105
    // 2 -|-> 102 --> 104 -->
    #[test]
    fn test_delta_join() {
        #[rustfmt::skip]
        let facts = [
            Fact::Req { id: 1.into(), req: Req::Hash(1) },
            Fact::Req { id: 2.into(), req: Req::Hash(2) },
            Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
            Fact::Edge { from: 2.into(), to: 102.into(), dt: NoShuffle },
            Fact::Edge { from: 101.into(), to: 103.into(), dt: NoShuffle },
            Fact::Edge { from: 102.into(), to: 104.into(), dt: NoShuffle },
            Fact::Edge { from: 101.into(), to: 104.into(), dt: Hash },
            Fact::Edge { from: 102.into(), to: 103.into(), dt: Hash },
            Fact::Edge { from: 103.into(), to: 105.into(), dt: Hash },
            Fact::Edge { from: 104.into(), to: 105.into(), dt: Hash },
        ];

        let expected = maplit::hashmap! {
            101.into() => Result::Required(Req::Hash(1)),
            102.into() => Result::Required(Req::Hash(2)),
            103.into() => Result::Required(Req::Hash(1)),
            104.into() => Result::Required(Req::Hash(2)),
            105.into() => Result::DefaultHash,
        };

        test_success(facts, expected);
    }

    // 1 -|-> 101 -->
    //                103
    //        102 -->
    #[test]
    fn test_singleton_leaf() {
        #[rustfmt::skip]
        let facts = [
            Fact::Req { id: 1.into(), req: Req::Hash(1) },
            Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
            Fact::Req { id: 102.into(), req: Req::AnySingleton }, // like `Now`
            Fact::Edge { from: 101.into(), to: 103.into(), dt: Hash },
            Fact::Edge { from: 102.into(), to: 103.into(), dt: Broadcast },
        ];

        let expected = maplit::hashmap! {
            101.into() => Result::Required(Req::Hash(1)),
            102.into() => Result::DefaultSingleton,
            103.into() => Result::DefaultHash,
        };

        test_success(facts, expected);
    }

    // 1 -|->
    //        101
    // 2 -|->
    #[test]
    fn test_upstream_hash_shard_failed() {
        #[rustfmt::skip]
        let facts = [
            Fact::Req { id: 1.into(), req: Req::Hash(1) },
            Fact::Req { id: 2.into(), req: Req::Hash(2) },
            Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
            Fact::Edge { from: 2.into(), to: 101.into(), dt: NoShuffle },
        ];

        test_failed(facts);
    }

    // 1 -|~> 101
    #[test]
    fn test_arrangement_backfill_vnode_count() {
        #[rustfmt::skip]
        let facts = [
            Fact::Req { id: 1.into(), req: Req::Hash(1) },
            Fact::Req { id: 101.into(), req: Req::AnyVnodeCount(128) },
            Fact::Edge { from: 1.into(), to: 101.into(), dt: Hash },
        ];

        let expected = maplit::hashmap! {
            101.into() => Result::Required(Req::AnyVnodeCount(128)),
        };

        test_success(facts, expected);
    }

    // 1 -|~> 101
    #[test]
    fn test_no_shuffle_backfill_vnode_count() {
        #[rustfmt::skip]
        let facts = [
            Fact::Req { id: 1.into(), req: Req::Hash(1) },
            Fact::Req { id: 101.into(), req: Req::AnyVnodeCount(128) },
            Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
        ];

        let expected = maplit::hashmap! {
            101.into() => Result::Required(Req::Hash(1)),
        };

        test_success_with_mapping_len(facts, expected, |id| {
            assert_eq!(id, 1);
            128
        });
    }

    // 1 -|~> 101
    #[test]
    fn test_no_shuffle_backfill_mismatched_vnode_count() {
        #[rustfmt::skip]
        let facts = [
            Fact::Req { id: 1.into(), req: Req::Hash(1) },
            Fact::Req { id: 101.into(), req: Req::AnyVnodeCount(128) },
            Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
        ];

        // Not specifying `mapping_len` should fail.
        test_failed(facts);
    }

    // 1 -|~> 101
    #[test]
    fn test_backfill_singleton_vnode_count() {
        #[rustfmt::skip]
        let facts = [
            Fact::Req { id: 1.into(), req: Req::Singleton(0) },
            Fact::Req { id: 101.into(), req: Req::AnySingleton },
            Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle }, // or `Simple`
        ];

        let expected = maplit::hashmap! {
            101.into() => Result::Required(Req::Singleton(0)),
        };

        test_success(facts, expected);
    }
}
721}