risingwave_meta/controller/
scale.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
23use risingwave_common::id::JobId;
24use risingwave_common::system_param::AdaptiveParallelismStrategy;
25use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
26use risingwave_connector::source::{SplitImpl, SplitMetaData};
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::prelude::{
29    Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
30};
31use risingwave_meta_model::{
32    CreateType, DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism,
33    WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink, source,
34    streaming_job, table,
35};
36use risingwave_meta_model_migration::Condition;
37use sea_orm::{
38    ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
39    RelationTrait,
40};
41
42use crate::MetaResult;
43use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
44use crate::manager::ActiveStreamingWorkerNodes;
45use crate::model::{ActorId, StreamActor, StreamingJobModelContextExt};
46use crate::stream::{AssignerBuilder, SplitDiffOptions};
47
48pub(crate) async fn resolve_streaming_job_definition<C>(
49    txn: &C,
50    job_ids: &HashSet<JobId>,
51) -> MetaResult<HashMap<JobId, String>>
52where
53    C: ConnectionTrait,
54{
55    let job_ids = job_ids.iter().cloned().collect_vec();
56
57    // including table, materialized view, index
58    let common_job_definitions: Vec<(JobId, String)> = Table::find()
59        .select_only()
60        .columns([
61            table::Column::TableId,
62            #[cfg(not(debug_assertions))]
63            table::Column::Name,
64            #[cfg(debug_assertions)]
65            table::Column::Definition,
66        ])
67        .filter(table::Column::TableId.is_in(job_ids.clone()))
68        .into_tuple()
69        .all(txn)
70        .await?;
71
72    let sink_definitions: Vec<(JobId, String)> = Sink::find()
73        .select_only()
74        .columns([
75            sink::Column::SinkId,
76            #[cfg(not(debug_assertions))]
77            sink::Column::Name,
78            #[cfg(debug_assertions)]
79            sink::Column::Definition,
80        ])
81        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
82        .into_tuple()
83        .all(txn)
84        .await?;
85
86    let source_definitions: Vec<(JobId, String)> = Source::find()
87        .select_only()
88        .columns([
89            source::Column::SourceId,
90            #[cfg(not(debug_assertions))]
91            source::Column::Name,
92            #[cfg(debug_assertions)]
93            source::Column::Definition,
94        ])
95        .filter(source::Column::SourceId.is_in(job_ids.clone()))
96        .into_tuple()
97        .all(txn)
98        .await?;
99
100    let definitions: HashMap<JobId, String> = common_job_definitions
101        .into_iter()
102        .chain(sink_definitions.into_iter())
103        .chain(source_definitions.into_iter())
104        .collect();
105
106    Ok(definitions)
107}
108
109pub async fn load_fragment_info<C>(
110    txn: &C,
111    actor_id_counter: &AtomicU32,
112    database_id: Option<DatabaseId>,
113    worker_nodes: &ActiveStreamingWorkerNodes,
114    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
115) -> MetaResult<FragmentRenderMap>
116where
117    C: ConnectionTrait,
118{
119    let mut query = StreamingJob::find()
120        .select_only()
121        .column(streaming_job::Column::JobId);
122
123    if let Some(database_id) = database_id {
124        query = query
125            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
126            .filter(object::Column::DatabaseId.eq(database_id));
127    }
128
129    let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
130
131    if jobs.is_empty() {
132        return Ok(HashMap::new());
133    }
134
135    let jobs: HashSet<JobId> = jobs.into_iter().collect();
136
137    let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
138
139    if loaded.is_empty() {
140        return Ok(HashMap::new());
141    }
142
143    let available_workers: BTreeMap<_, _> = worker_nodes
144        .current()
145        .values()
146        .filter(|worker| worker.is_streaming_schedulable())
147        .map(|worker| {
148            (
149                worker.id,
150                WorkerInfo {
151                    parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
152                    resource_group: worker.resource_group(),
153                },
154            )
155        })
156        .collect();
157
158    let RenderedGraph { fragments, .. } = render_actor_assignments(
159        actor_id_counter,
160        &available_workers,
161        adaptive_parallelism_strategy,
162        &loaded,
163    )?;
164
165    Ok(fragments)
166}
167
/// Desired placement policy for a streaming job when (re)scheduling: the
/// resource group its actors should run in and the parallelism to apply.
#[derive(Debug)]
pub struct TargetResourcePolicy {
    // `None` presumably means "inherit the owning database's resource group",
    // mirroring the fallback in `render_actors` — TODO confirm at call sites.
    pub resource_group: Option<String>,
    pub parallelism: StreamingParallelism,
}
173
/// Scheduling-relevant view of a streaming compute worker.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    // Number of parallel units the worker offers; guaranteed non-zero.
    pub parallelism: NonZeroUsize,
    // The worker's resource group; `None` falls back to `DEFAULT_RESOURCE_GROUP`
    // when matching workers to jobs in `render_actors`.
    pub resource_group: Option<String>,
}
179
/// Rendered fragment infos grouped by database, then by streaming job.
pub type FragmentRenderMap =
    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;
182
/// Output of the rendering pipeline: the rendered fragments plus the
/// no-shuffle ensembles they were derived from.
#[derive(Default)]
pub struct RenderedGraph {
    pub fragments: FragmentRenderMap,
    pub ensembles: Vec<NoShuffleEnsemble>,
}

impl RenderedGraph {
    /// An empty graph, returned when there is nothing to render.
    pub fn empty() -> Self {
        Self::default()
    }
}
194
/// Context loaded asynchronously from database, containing all metadata
/// required to render actor assignments. This separates async I/O from
/// sync rendering logic.
#[derive(Default)]
pub struct LoadedFragmentContext {
    // No-shuffle ensembles (entry fragments plus their full components).
    pub ensembles: Vec<NoShuffleEnsemble>,
    // Fragment models for every fragment appearing in `ensembles`.
    pub fragment_map: HashMap<FragmentId, fragment::Model>,
    // Streaming-job models for every job owning a fragment in `fragment_map`.
    pub job_map: HashMap<JobId, streaming_job::Model>,
    // Job id -> owning database id.
    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
    // Database models referenced by `streaming_job_databases`.
    pub database_map: HashMap<DatabaseId, database::Model>,
    // Source-reading fragment id -> id of the source it consumes.
    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
    // Persisted split assignment per source fragment.
    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
}

impl LoadedFragmentContext {
    /// True when nothing was loaded. `ensembles` drives all rendering, so an
    /// empty ensemble list means the whole context is empty.
    pub fn is_empty(&self) -> bool {
        self.ensembles.is_empty()
    }
}
214
215/// Fragment-scoped rendering entry point used by operational tooling.
216/// It validates that the requested fragments are roots of their no-shuffle ensembles,
217/// resolves only the metadata required for those components, and then reuses the shared
218/// rendering pipeline to materialize actor assignments.
219pub async fn render_fragments<C>(
220    txn: &C,
221    actor_id_counter: &AtomicU32,
222    ensembles: Vec<NoShuffleEnsemble>,
223    workers: BTreeMap<WorkerId, WorkerInfo>,
224    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
225) -> MetaResult<RenderedGraph>
226where
227    C: ConnectionTrait,
228{
229    let loaded = load_fragment_context(txn, ensembles).await?;
230
231    if loaded.is_empty() {
232        return Ok(RenderedGraph::empty());
233    }
234
235    render_actor_assignments(
236        actor_id_counter,
237        &workers,
238        adaptive_parallelism_strategy,
239        &loaded,
240    )
241}
242
/// Async load stage for fragment-scoped rendering. It resolves all metadata required to later
/// render actor assignments with arbitrary worker sets.
///
/// Fails if any fragment referenced by `ensembles`, or any streaming job owning
/// such a fragment, is missing from the catalog.
pub async fn load_fragment_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Every fragment of every ensemble must exist in the catalog.
    let required_fragment_ids: HashSet<_> = ensembles
        .iter()
        .flat_map(|ensemble| ensemble.components.iter().copied())
        .collect();

    let fragment_models = Fragment::find()
        .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
        .all(txn)
        .await?;

    let found_fragment_ids: HashSet<_> = fragment_models
        .iter()
        .map(|fragment| fragment.fragment_id)
        .collect();

    // Report all missing fragments at once rather than failing on the first.
    if found_fragment_ids.len() != required_fragment_ids.len() {
        let missing = required_fragment_ids
            .difference(&found_fragment_ids)
            .copied()
            .collect_vec();
        return Err(anyhow!("fragments {:?} not found", missing).into());
    }

    let fragment_map: HashMap<_, _> = fragment_models
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    let job_ids: HashSet<_> = fragment_map
        .values()
        .map(|fragment| fragment.job_id)
        .collect();

    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    // Same completeness check for the owning streaming jobs.
    let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
    if found_job_ids.len() != job_ids.len() {
        let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
        return Err(anyhow!("streaming jobs {:?} not found", missing).into());
    }

    build_loaded_context(txn, ensembles, fragment_map, jobs).await
}
309
310/// Job-scoped rendering entry point that walks every no-shuffle root belonging to the
311/// provided streaming jobs before delegating to the shared rendering backend.
312pub async fn render_jobs<C>(
313    txn: &C,
314    actor_id_counter: &AtomicU32,
315    job_ids: HashSet<JobId>,
316    workers: BTreeMap<WorkerId, WorkerInfo>,
317    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
318) -> MetaResult<RenderedGraph>
319where
320    C: ConnectionTrait,
321{
322    let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
323
324    if loaded.is_empty() {
325        return Ok(RenderedGraph::empty());
326    }
327
328    render_actor_assignments(
329        actor_id_counter,
330        &workers,
331        adaptive_parallelism_strategy,
332        &loaded,
333    )
334}
335
/// Async load stage for job-scoped rendering. It collects all no-shuffle ensembles and the
/// metadata required to render actor assignments later with a provided worker set.
pub async fn load_fragment_context_for_jobs<C>(
    txn: &C,
    job_ids: HashSet<JobId>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Fragments that are the *target* of a no-shuffle edge are interior nodes of
    // an ensemble; excluding them leaves only the no-shuffle roots of these jobs.
    let excluded_fragments_query = FragmentRelation::find()
        .select_only()
        .column(fragment_relation::Column::TargetFragmentId)
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_query();

    let condition = Condition::all()
        .add(fragment::Column::JobId.is_in(job_ids.clone()))
        .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));

    let fragments: Vec<FragmentId> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .filter(condition)
        .into_tuple()
        .all(txn)
        .await?;

    // Expand each root into its full no-shuffle ensemble. The components may pull
    // in fragments of jobs outside the requested `job_ids`.
    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;

    let fragments = Fragment::find()
        .filter(
            fragment::Column::FragmentId.is_in(
                ensembles
                    .iter()
                    .flat_map(|graph| graph.components.iter())
                    .cloned()
                    .collect_vec(),
            ),
        )
        .all(txn)
        .await?;

    let fragment_map: HashMap<_, _> = fragments
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    // Re-derive the owning jobs from the expanded fragment set; this can be a
    // superset of the `job_ids` argument (see note above).
    let job_ids = fragment_map
        .values()
        .map(|fragment| fragment.job_id)
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect_vec();

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    build_loaded_context(txn, ensembles, fragment_map, jobs).await
}
404
405/// Sync render stage: uses loaded fragment context and current worker info
406/// to produce actor-to-worker assignments and vnode bitmaps.
407pub(crate) fn render_actor_assignments(
408    actor_id_counter: &AtomicU32,
409    worker_map: &BTreeMap<WorkerId, WorkerInfo>,
410    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
411    loaded: &LoadedFragmentContext,
412) -> MetaResult<RenderedGraph> {
413    if loaded.is_empty() {
414        return Ok(RenderedGraph::empty());
415    }
416
417    let backfill_jobs: HashSet<JobId> = loaded
418        .job_map
419        .iter()
420        .filter(|(_, job)| {
421            job.create_type == CreateType::Background && job.job_status == JobStatus::Creating
422        })
423        .map(|(id, _)| *id)
424        .collect();
425
426    let render_context = RenderActorsContext {
427        fragment_source_ids: &loaded.fragment_source_ids,
428        fragment_splits: &loaded.fragment_splits,
429        streaming_job_databases: &loaded.streaming_job_databases,
430        database_map: &loaded.database_map,
431        backfill_jobs: &backfill_jobs,
432    };
433
434    let fragments = render_actors(
435        actor_id_counter,
436        &loaded.ensembles,
437        &loaded.fragment_map,
438        &loaded.job_map,
439        worker_map,
440        adaptive_parallelism_strategy,
441        render_context,
442    )?;
443
444    Ok(RenderedGraph {
445        fragments,
446        ensembles: loaded.ensembles.clone(),
447    })
448}
449
/// Shared tail of both load stages: runs debug-only sanity checks, resolves
/// source fragments and their splits, and maps each job to its database before
/// packing everything into a `LoadedFragmentContext`.
async fn build_loaded_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
    fragment_map: HashMap<FragmentId, fragment::Model>,
    job_map: HashMap<JobId, streaming_job::Model>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Validate cross-references between ensembles, fragments and jobs early;
    // release builds skip this extra pass entirely.
    #[cfg(debug_assertions)]
    {
        debug_sanity_check(&ensembles, &fragment_map, &job_map);
    }

    let (fragment_source_ids, fragment_splits) =
        resolve_source_fragments(txn, &fragment_map).await?;

    let job_ids = job_map.keys().copied().collect_vec();

    // Job id -> database id via each job's object-catalog row.
    // NOTE(review): a LEFT JOIN decoded into a non-optional tuple — a job without
    // an object row would presumably be dropped or fail to decode; confirm.
    let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .collect();

    let database_map: HashMap<_, _> = Database::find()
        .filter(
            database::Column::DatabaseId
                .is_in(streaming_job_databases.values().copied().collect_vec()),
        )
        .all(txn)
        .await?
        .into_iter()
        .map(|db| (db.database_id, db))
        .collect();

    Ok(LoadedFragmentContext {
        ensembles,
        fragment_map,
        job_map,
        streaming_job_databases,
        database_map,
        fragment_source_ids,
        fragment_splits,
    })
}
506
// Only metadata resolved asynchronously lives here so the renderer stays synchronous
// and the call site keeps the runtime dependencies (maps, strategy, actor counter, etc.) explicit.
struct RenderActorsContext<'a> {
    // Source-reading fragment id -> id of the source it consumes.
    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
    // Persisted split assignment per source fragment.
    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
    // Job id -> owning database id.
    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
    // Database models, used for the resource-group fallback in `render_actors`.
    database_map: &'a HashMap<DatabaseId, database::Model>,
    // Background jobs still creating; these may use `backfill_parallelism`.
    backfill_jobs: &'a HashSet<JobId>,
}
516
/// Renders every no-shuffle ensemble into concrete actors: resolves the effective
/// parallelism per ensemble, assigns actors (and vnodes) to workers, attaches
/// source splits where applicable, and groups the resulting fragment infos by
/// database and job.
fn render_actors(
    actor_id_counter: &AtomicU32,
    ensembles: &[NoShuffleEnsemble],
    fragment_map: &HashMap<FragmentId, fragment::Model>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &BTreeMap<WorkerId, WorkerInfo>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    let RenderActorsContext {
        fragment_source_ids,
        fragment_splits: fragment_splits_map,
        streaming_job_databases,
        database_map,
        backfill_jobs,
    } = context;

    let mut all_fragments: FragmentRenderMap = HashMap::new();

    for NoShuffleEnsemble {
        entries,
        components,
    } in ensembles
    {
        tracing::debug!("rendering ensemble entries {:?}", entries);

        // `unwrap` is safe by the loader's contract: every component fragment is
        // present in `fragment_map` (checked in `load_fragment_context` and, in
        // debug builds, `debug_sanity_check`).
        let entry_fragments = entries
            .iter()
            .map(|fragment_id| fragment_map.get(fragment_id).unwrap())
            .collect_vec();

        // All entry fragments of one ensemble must agree on their parallelism
        // override; `dedup` + `exactly_one` enforces that.
        let entry_fragment_parallelism = entry_fragments
            .iter()
            .map(|fragment| fragment.parallelism.clone())
            .dedup()
            .exactly_one()
            .map_err(|_| {
                anyhow!(
                    "entry fragments {:?} have inconsistent parallelism settings",
                    entries.iter().copied().collect_vec()
                )
            })?;

        // Likewise, all entry fragments must belong to one job and share a vnode count.
        let (job_id, vnode_count) = entry_fragments
            .iter()
            .map(|f| (f.job_id, f.vnode_count as usize))
            .dedup()
            .exactly_one()
            .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;

        let job = job_map
            .get(&job_id)
            .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;

        // A per-job strategy overrides the system-wide adaptive strategy.
        let job_strategy = job
            .stream_context()
            .adaptive_parallelism_strategy
            .unwrap_or(adaptive_parallelism_strategy);

        // Resource group: job-specific if set, otherwise the owning database's.
        let resource_group = match &job.specific_resource_group {
            None => {
                let database = streaming_job_databases
                    .get(&job_id)
                    .and_then(|database_id| database_map.get(database_id))
                    .unwrap();
                database.resource_group.clone()
            }
            Some(resource_group) => resource_group.clone(),
        };

        // Restrict scheduling to workers in the job's resource group; workers
        // without an explicit group belong to the default group.
        let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
            .iter()
            .filter_map(|(worker_id, worker)| {
                if worker
                    .resource_group
                    .as_deref()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP)
                    == resource_group.as_str()
                {
                    Some((*worker_id, worker.parallelism))
                } else {
                    None
                }
            })
            .collect();

        let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();

        // Backfilling jobs may carry a dedicated backfill parallelism.
        let effective_job_parallelism = if backfill_jobs.contains(&job_id) {
            job.backfill_parallelism
                .as_ref()
                .unwrap_or(&job.parallelism)
        } else {
            &job.parallelism
        };

        // Precedence: fragment-level override > job-level setting. Adaptive/Custom
        // are resolved via the strategy; the result is capped by the vnode count
        // and the job's max parallelism.
        let actual_parallelism = match entry_fragment_parallelism
            .as_ref()
            .unwrap_or(effective_job_parallelism)
        {
            StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
                job_strategy.compute_target_parallelism(total_parallelism)
            }
            StreamingParallelism::Fixed(n) => *n,
        }
        .min(vnode_count)
        .min(job.max_parallelism as usize);

        tracing::debug!(
            "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
            job_id,
            actual_parallelism,
            job.parallelism,
            total_parallelism,
            job.max_parallelism,
            vnode_count,
            entry_fragment_parallelism
        );

        let assigner = AssignerBuilder::new(job_id).build();

        // Actor "indices" 0..parallelism; real actor ids are derived per component
        // fragment below by adding a freshly reserved base offset.
        let actors = (0..(actual_parallelism as u32))
            .map_into::<ActorId>()
            .collect_vec();
        let vnodes = (0..vnode_count).collect_vec();

        let assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;

        // At most one entry fragment reads a shared source (Source but not Dml);
        // a fragment being both Source and SourceScan would be a logic error.
        let source_entry_fragment = entry_fragments.iter().find(|f| {
            let mask = FragmentTypeMask::from(f.fragment_type_mask);
            if mask.contains(FragmentTypeFlag::Source) {
                assert!(!mask.contains(FragmentTypeFlag::SourceScan))
            }
            mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
        });

        // Redistribute the entry fragment's persisted splits over the fresh actor
        // set (all actors start with no splits assigned).
        let (fragment_splits, shared_source_id) = match source_entry_fragment {
            Some(entry_fragment) => {
                let source_id = fragment_source_ids
                    .get(&entry_fragment.fragment_id)
                    .ok_or_else(|| {
                        anyhow!(
                            "missing source id in source fragment {}",
                            entry_fragment.fragment_id
                        )
                    })?;

                let entry_fragment_id = entry_fragment.fragment_id;

                let empty_actor_splits: HashMap<_, _> =
                    actors.iter().map(|actor_id| (*actor_id, vec![])).collect();

                let splits = fragment_splits_map
                    .get(&entry_fragment_id)
                    .cloned()
                    .unwrap_or_default();

                let splits: BTreeMap<_, _> = splits.into_iter().map(|s| (s.id(), s)).collect();

                let fragment_splits = crate::stream::source_manager::reassign_splits(
                    entry_fragment_id,
                    empty_actor_splits,
                    &splits,
                    SplitDiffOptions::default(),
                )
                .unwrap_or_default();
                (fragment_splits, Some(*source_id))
            }
            None => (HashMap::new(), None),
        };

        // Every component fragment of the ensemble gets the same shape of
        // assignment, but its own freshly allocated actor-id range.
        for component_fragment_id in components {
            let &fragment::Model {
                fragment_id,
                job_id,
                fragment_type_mask,
                distribution_type,
                ref stream_node,
                ref state_table_ids,
                ..
            } = fragment_map.get(component_fragment_id).unwrap();

            // Reserve a contiguous id block for this fragment's actors.
            let actor_count =
                u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
            let actor_id_base = actor_id_counter.fetch_add(actor_count, Ordering::Relaxed);

            let actors: HashMap<ActorId, InflightActorInfo> = assignment
                .iter()
                .flat_map(|(worker_id, actors)| {
                    actors
                        .iter()
                        .map(move |(actor_id, vnodes)| (worker_id, actor_id, vnodes))
                })
                .map(|(&worker_id, &actor_idx, vnodes)| {
                    // Singleton fragments carry no vnode bitmap.
                    let vnode_bitmap = match distribution_type {
                        DistributionType::Single => None,
                        DistributionType::Hash => Some(Bitmap::from_indices(vnode_count, vnodes)),
                    };

                    let actor_id = actor_idx + actor_id_base;

                    // Source-reading fragments in the same ensemble share splits
                    // by actor index; they must all reference the shared source.
                    let splits = if let Some(source_id) = fragment_source_ids.get(&fragment_id) {
                        assert_eq!(shared_source_id, Some(*source_id));

                        fragment_splits
                            .get(&(actor_idx))
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        vec![]
                    };

                    (
                        actor_id,
                        InflightActorInfo {
                            worker_id,
                            vnode_bitmap,
                            splits,
                        },
                    )
                })
                .collect();

            let fragment = InflightFragmentInfo {
                fragment_id,
                distribution_type,
                fragment_type_mask: fragment_type_mask.into(),
                vnode_count,
                nodes: stream_node.to_protobuf(),
                actors,
                state_table_ids: state_table_ids.inner_ref().iter().copied().collect(),
            };

            let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
                anyhow!("streaming job {job_id} not found in streaming_job_databases")
            })?;

            all_fragments
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default()
                .insert(fragment_id, fragment);
        }
    }

    Ok(all_fragments)
}
765
766#[cfg(debug_assertions)]
767fn debug_sanity_check(
768    ensembles: &[NoShuffleEnsemble],
769    fragment_map: &HashMap<FragmentId, fragment::Model>,
770    jobs: &HashMap<JobId, streaming_job::Model>,
771) {
772    // Debug-only assertions to catch inconsistent ensemble metadata early.
773    debug_assert!(
774        ensembles
775            .iter()
776            .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
777        "entries must be subset of components"
778    );
779
780    let mut missing_fragments = BTreeSet::new();
781    let mut missing_jobs = BTreeSet::new();
782
783    for fragment_id in ensembles
784        .iter()
785        .flat_map(|ensemble| ensemble.components.iter())
786    {
787        match fragment_map.get(fragment_id) {
788            Some(fragment) => {
789                if !jobs.contains_key(&fragment.job_id) {
790                    missing_jobs.insert(fragment.job_id);
791                }
792            }
793            None => {
794                missing_fragments.insert(*fragment_id);
795            }
796        }
797    }
798
799    debug_assert!(
800        missing_fragments.is_empty(),
801        "missing fragments in fragment_map: {:?}",
802        missing_fragments
803    );
804
805    debug_assert!(
806        missing_jobs.is_empty(),
807        "missing jobs for fragments' job_id: {:?}",
808        missing_jobs
809    );
810
811    for ensemble in ensembles {
812        let unique_vnode_counts: Vec<_> = ensemble
813            .components
814            .iter()
815            .flat_map(|fragment_id| {
816                fragment_map
817                    .get(fragment_id)
818                    .map(|fragment| fragment.vnode_count)
819            })
820            .unique()
821            .collect();
822
823        debug_assert!(
824            unique_vnode_counts.len() <= 1,
825            "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
826            ensemble.components,
827            unique_vnode_counts
828        );
829    }
830}
831
832async fn resolve_source_fragments<C>(
833    txn: &C,
834    fragment_map: &HashMap<FragmentId, fragment::Model>,
835) -> MetaResult<(
836    HashMap<FragmentId, SourceId>,
837    HashMap<FragmentId, Vec<SplitImpl>>,
838)>
839where
840    C: ConnectionTrait,
841{
842    let mut source_fragment_ids: HashMap<SourceId, _> = HashMap::new();
843    for (fragment_id, fragment) in fragment_map {
844        let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
845        if mask.contains(FragmentTypeFlag::Source)
846            && let Some(source_id) = fragment.stream_node.to_protobuf().find_stream_source()
847        {
848            source_fragment_ids
849                .entry(source_id)
850                .or_insert_with(BTreeSet::new)
851                .insert(fragment_id);
852        }
853
854        if mask.contains(FragmentTypeFlag::SourceScan)
855            && let Some((source_id, _)) = fragment.stream_node.to_protobuf().find_source_backfill()
856        {
857            source_fragment_ids
858                .entry(source_id)
859                .or_insert_with(BTreeSet::new)
860                .insert(fragment_id);
861        }
862    }
863
864    let fragment_source_ids: HashMap<_, _> = source_fragment_ids
865        .iter()
866        .flat_map(|(source_id, fragment_ids)| {
867            fragment_ids
868                .iter()
869                .map(|fragment_id| (**fragment_id, *source_id as SourceId))
870        })
871        .collect();
872
873    let fragment_ids = fragment_source_ids.keys().copied().collect_vec();
874
875    let fragment_splits: Vec<_> = FragmentSplits::find()
876        .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
877        .all(txn)
878        .await?;
879
880    let fragment_splits: HashMap<_, _> = fragment_splits
881        .into_iter()
882        .flat_map(|model| {
883            model.splits.map(|splits| {
884                (
885                    model.fragment_id,
886                    splits
887                        .to_protobuf()
888                        .splits
889                        .iter()
890                        .flat_map(SplitImpl::try_from)
891                        .collect_vec(),
892                )
893            })
894        })
895        .collect();
896
897    Ok((fragment_source_ids, fragment_splits))
898}
899
/// Helper struct bundling a set of fragments (with their actors) and the worker
/// location of every actor, so functions can take one parameter instead of two.
#[derive(Debug)]
pub struct ActorGraph<'a> {
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    pub locations: &'a HashMap<ActorId, WorkerId>,
}
906
/// A connected group of fragments linked by no-shuffle dispatchers.
/// `entries` are the roots (fragments with no incoming no-shuffle edge) and are
/// always a subset of `components`, which holds the full closure.
#[derive(Debug, Clone)]
pub struct NoShuffleEnsemble {
    entries: HashSet<FragmentId>,
    components: HashSet<FragmentId>,
}
912
913impl NoShuffleEnsemble {
914    pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
915        self.components.iter().cloned()
916    }
917
918    pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
919        self.entries.iter().copied()
920    }
921
922    pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
923        self.components.iter().copied()
924    }
925
926    pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
927        self.entries.contains(fragment_id)
928    }
929}
930
931pub async fn find_fragment_no_shuffle_dags_detailed(
932    db: &impl ConnectionTrait,
933    initial_fragment_ids: &[FragmentId],
934) -> MetaResult<Vec<NoShuffleEnsemble>> {
935    let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
936        .columns([
937            fragment_relation::Column::SourceFragmentId,
938            fragment_relation::Column::TargetFragmentId,
939        ])
940        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
941        .into_tuple()
942        .all(db)
943        .await?;
944
945    let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
946    let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
947
948    for (src, dst) in all_no_shuffle_relations {
949        forward_edges.entry(src).or_default().push(dst);
950        backward_edges.entry(dst).or_default().push(src);
951    }
952
953    find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
954}
955
956fn find_no_shuffle_graphs(
957    initial_fragment_ids: &[impl Into<FragmentId> + Copy],
958    forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
959    backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
960) -> MetaResult<Vec<NoShuffleEnsemble>> {
961    let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
962    let mut globally_visited: HashSet<FragmentId> = HashSet::new();
963
964    for &init_id in initial_fragment_ids {
965        let init_id = init_id.into();
966        if globally_visited.contains(&init_id) {
967            continue;
968        }
969
970        // Found a new component. Traverse it to find all its nodes.
971        let mut components = HashSet::new();
972        let mut queue: VecDeque<FragmentId> = VecDeque::new();
973
974        queue.push_back(init_id);
975        globally_visited.insert(init_id);
976
977        while let Some(current_id) = queue.pop_front() {
978            components.insert(current_id);
979            let neighbors = forward_edges
980                .get(&current_id)
981                .into_iter()
982                .flatten()
983                .chain(backward_edges.get(&current_id).into_iter().flatten());
984
985            for &neighbor_id in neighbors {
986                if globally_visited.insert(neighbor_id) {
987                    queue.push_back(neighbor_id);
988                }
989            }
990        }
991
992        // For the newly found component, identify its roots.
993        let mut entries = HashSet::new();
994        for &node_id in &components {
995            let is_root = match backward_edges.get(&node_id) {
996                Some(parents) => parents.iter().all(|p| !components.contains(p)),
997                None => true,
998            };
999            if is_root {
1000                entries.insert(node_id);
1001            }
1002        }
1003
1004        // Store the detailed DAG structure (roots, all nodes in this DAG).
1005        if !entries.is_empty() {
1006            graphs.push(NoShuffleEnsemble {
1007                entries,
1008                components,
1009            });
1010        }
1011    }
1012
1013    Ok(graphs)
1014}
1015
1016#[cfg(test)]
1017mod tests {
1018    use std::collections::{BTreeSet, HashMap, HashSet};
1019    use std::sync::Arc;
1020
1021    use risingwave_connector::source::SplitImpl;
1022    use risingwave_connector::source::test_source::TestSourceSplit;
1023    use risingwave_meta_model::{CreateType, I32Array, JobStatus, StreamNode, TableIdArray};
1024    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1025
1026    use super::*;
1027
    /// Helper alias for the `(forward, backward)` adjacency-map pair consumed by
    /// `find_no_shuffle_graphs`, using the module's actual `FragmentId` type.
    type Edges = (
        HashMap<FragmentId, Vec<FragmentId>>,
        HashMap<FragmentId, Vec<FragmentId>>,
    );
1034
1035    /// A helper function to build forward and backward edge maps from a simple list of tuples.
1036    /// This reduces boilerplate in each test.
1037    fn build_edges(relations: &[(u32, u32)]) -> Edges {
1038        let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1039        let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1040        for &(src, dst) in relations {
1041            forward_edges
1042                .entry(src.into())
1043                .or_default()
1044                .push(dst.into());
1045            backward_edges
1046                .entry(dst.into())
1047                .or_default()
1048                .push(src.into());
1049        }
1050        (forward_edges, backward_edges)
1051    }
1052
1053    /// Helper function to create a `HashSet` from a slice easily.
1054    fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1055        ids.iter().map(|id| (*id).into()).collect()
1056    }
1057
    // Builds a minimal `fragment::Model` for tests: only the fields exercised by
    // rendering (distribution, vnode count, parallelism) vary; the stream node,
    // state table ids, and the deprecated upstream ids stay at defaults.
    #[allow(deprecated)]
    fn build_fragment(
        fragment_id: FragmentId,
        job_id: JobId,
        fragment_type_mask: i32,
        distribution_type: DistributionType,
        vnode_count: i32,
        parallelism: StreamingParallelism,
    ) -> fragment::Model {
        fragment::Model {
            fragment_id,
            job_id,
            fragment_type_mask,
            distribution_type,
            // Empty plan body; tests in this module never inspect the stream node.
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            // Deprecated model field — the reason for `#[allow(deprecated)]` above.
            upstream_fragment_id: I32Array::default(),
            vnode_count,
            parallelism: Some(parallelism),
        }
    }
1079
    // (actor index relative to the fragment's smallest actor id, assigned worker,
    //  vnode positions if a bitmap is present, owned split ids)
    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1081
1082    fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1083        let base = fragment.actors.keys().copied().min().unwrap_or_default();
1084
1085        let mut entries: Vec<_> = fragment
1086            .actors
1087            .iter()
1088            .map(|(&actor_id, info)| {
1089                let idx = actor_id.as_raw_id() - base.as_raw_id();
1090                let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1091                    bitmap
1092                        .iter()
1093                        .enumerate()
1094                        .filter_map(|(pos, is_set)| is_set.then_some(pos))
1095                        .collect::<Vec<_>>()
1096                });
1097                let splits = info
1098                    .splits
1099                    .iter()
1100                    .map(|split| split.id().to_string())
1101                    .collect::<Vec<_>>();
1102                (idx.into(), info.worker_id, vnode_indices, splits)
1103            })
1104            .collect();
1105
1106        entries.sort_by_key(|(idx, _, _, _)| *idx);
1107        entries
1108    }
1109
1110    #[test]
1111    fn test_single_linear_chain() {
1112        // Scenario: A simple linear graph 1 -> 2 -> 3.
1113        // We start from the middle node (2).
1114        let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1115        let initial_ids = &[2];
1116
1117        // Act
1118        let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1119
1120        // Assert
1121        assert!(result.is_ok());
1122        let graphs = result.unwrap();
1123
1124        assert_eq!(graphs.len(), 1);
1125        let graph = &graphs[0];
1126        assert_eq!(graph.entries, to_hashset(&[1]));
1127        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1128    }
1129
1130    #[test]
1131    fn test_two_disconnected_graphs() {
1132        // Scenario: Two separate graphs: 1->2 and 10->11.
1133        // We start with one node from each graph.
1134        let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1135        let initial_ids = &[2, 10];
1136
1137        // Act
1138        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1139
1140        // Assert
1141        assert_eq!(graphs.len(), 2);
1142
1143        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1144        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1145
1146        // Graph 1
1147        assert_eq!(graphs[0].entries, to_hashset(&[1]));
1148        assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1149
1150        // Graph 2
1151        assert_eq!(graphs[1].entries, to_hashset(&[10]));
1152        assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1153    }
1154
1155    #[test]
1156    fn test_multiple_entries_in_one_graph() {
1157        // Scenario: A graph with two roots feeding into one node: 1->3, 2->3.
1158        let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1159        let initial_ids = &[3];
1160
1161        // Act
1162        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1163
1164        // Assert
1165        assert_eq!(graphs.len(), 1);
1166        let graph = &graphs[0];
1167        assert_eq!(graph.entries, to_hashset(&[1, 2]));
1168        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1169    }
1170
1171    #[test]
1172    fn test_diamond_shape_graph() {
1173        // Scenario: A diamond shape: 1->2, 1->3, 2->4, 3->4
1174        let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1175        let initial_ids = &[4];
1176
1177        // Act
1178        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1179
1180        // Assert
1181        assert_eq!(graphs.len(), 1);
1182        let graph = &graphs[0];
1183        assert_eq!(graph.entries, to_hashset(&[1]));
1184        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1185    }
1186
1187    #[test]
1188    fn test_starting_with_multiple_nodes_in_same_graph() {
1189        // Scenario: Start with two different nodes (2 and 4) from the same component.
1190        // Should only identify one graph, not two.
1191        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1192        let initial_ids = &[2, 4];
1193
1194        // Act
1195        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1196
1197        // Assert
1198        assert_eq!(graphs.len(), 1);
1199        let graph = &graphs[0];
1200        assert_eq!(graph.entries, to_hashset(&[1]));
1201        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1202    }
1203
1204    #[test]
1205    fn test_empty_initial_ids() {
1206        // Scenario: The initial ID list is empty.
1207        let (forward, backward) = build_edges(&[(1, 2)]);
1208        let initial_ids: &[u32] = &[];
1209
1210        // Act
1211        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1212
1213        // Assert
1214        assert!(graphs.is_empty());
1215    }
1216
1217    #[test]
1218    fn test_isolated_node_as_input() {
1219        // Scenario: Start with an ID that has no relations.
1220        let (forward, backward) = build_edges(&[(1, 2)]);
1221        let initial_ids = &[100];
1222
1223        // Act
1224        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1225
1226        // Assert
1227        assert_eq!(graphs.len(), 1);
1228        let graph = &graphs[0];
1229        assert_eq!(graph.entries, to_hashset(&[100]));
1230        assert_eq!(graph.components, to_hashset(&[100]));
1231    }
1232
1233    #[test]
1234    fn test_graph_with_a_cycle() {
1235        // Scenario: A graph with a cycle: 1 -> 2 -> 3 -> 1.
1236        // The algorithm should correctly identify all nodes in the component.
1237        // Crucially, NO node is a root because every node has a parent *within the component*.
1238        // Therefore, the `entries` set should be empty, and the graph should not be included in the results.
1239        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1240        let initial_ids = &[2];
1241
1242        // Act
1243        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1244
1245        // Assert
1246        assert!(
1247            graphs.is_empty(),
1248            "A graph with no entries should not be returned"
1249        );
1250    }
1251    #[test]
1252    fn test_custom_complex() {
1253        let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1254        let initial_ids = &[1, 2, 4, 6];
1255
1256        // Act
1257        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1258
1259        // Assert
1260        assert_eq!(graphs.len(), 2);
1261        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1262        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1263
1264        // Graph 1
1265        assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1266        assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1267
1268        // Graph 2
1269        assert_eq!(graphs[1].entries, to_hashset(&[6]));
1270        assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1271    }
1272
    // A single SINGLE-distribution fragment with fixed parallelism 1: rendering
    // should produce exactly one actor, no vnode bitmap, and advance the shared
    // actor id counter by one (100 -> 101).
    #[test]
    fn render_actors_increments_actor_counter() {
        // Non-zero start so the post-render counter value proves the increment.
        let actor_id_counter = AtomicU32::new(100);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 10.into();
        let database_id: DatabaseId = DatabaseId::new(3);

        // Single distribution, vnode_count = 1, fixed parallelism 1.
        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Single,
            1,
            StreamingParallelism::Fixed(1),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(1),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 1,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        // Database resource group matches the worker's, so the worker is eligible.
        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "rg-a".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Trivial ensemble: the lone fragment is both entry and component.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // One worker with parallelism 1 in the matching resource group.
        let worker_map = BTreeMap::from([(
            1.into(),
            WorkerInfo {
                parallelism: NonZeroUsize::new(1).unwrap(),
                resource_group: Some("rg-a".into()),
            },
        )]);

        // No sources, splits, or backfill jobs involved in this scenario.
        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(state.len(), 1);
        assert!(
            state[0].2.is_none(),
            "single distribution should not assign vnode bitmaps"
        );
        // Exactly one id was drawn from the counter.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
    }
1361
    // Two hash-distributed fragments in one no-shuffle ensemble: the downstream
    // fragment must get actor placements and vnode bitmaps identical to its
    // entry, and the bitmaps together must cover all 4 vnodes.
    #[test]
    fn render_actors_aligns_hash_vnode_bitmaps() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 1.into();
        let downstream_fragment_id: FragmentId = 2.into();
        let job_id: JobId = 20.into();
        let database_id: DatabaseId = DatabaseId::new(5);

        // Both fragments: hash distribution, 4 vnodes, fixed parallelism 2.
        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db_hash".into(),
            resource_group: "rg-hash".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Single ensemble: entry fragment 1 feeds downstream fragment 2 via no-shuffle.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Two workers, one slot each, both in the database's resource group.
        let worker_map = BTreeMap::from([
            (
                1.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("rg-hash".into()),
                },
            ),
            (
                2.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("rg-hash".into()),
                },
            ),
        ]);

        // No sources or splits in this scenario.
        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // Downstream actors mirror the entry's workers and vnode bitmaps exactly.
        assert_eq!(entry_state.len(), 2);
        assert_eq!(entry_state, downstream_state);

        // The two actors' bitmaps must together cover every vnode exactly once.
        let assigned_vnodes: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, vnodes, _)| {
                vnodes
                    .as_ref()
                    .expect("hash distribution should populate vnode bitmap")
                    .iter()
                    .copied()
            })
            .collect();
        assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
        // 2 actors per fragment x 2 fragments = 4 ids drawn.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
1484
    // A Source fragment feeding a SourceScan fragment over no-shuffle: the
    // splits registered for the entry fragment must be distributed to its
    // actors and mirrored onto the downstream fragment's aligned actors.
    #[test]
    fn render_actors_propagates_source_splits() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 11.into();
        let downstream_fragment_id: FragmentId = 12.into();
        let job_id: JobId = 30.into();
        let database_id: DatabaseId = DatabaseId::new(7);
        let source_id: SourceId = 99.into();

        // Type masks marking the entry as a Source and the downstream as a SourceScan.
        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            source_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            source_scan_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "split_db".into(),
            resource_group: "rg-source".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Single ensemble: source fragment feeds the source-scan fragment.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Two single-slot workers in the matching resource group.
        let worker_map = BTreeMap::from([
            (
                1.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("rg-source".into()),
                },
            ),
            (
                2.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("rg-source".into()),
                },
            ),
        ]);

        // Two test splits registered against the entry (Source) fragment only.
        let split_a = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-a"),
            properties: HashMap::new(),
            offset: "0".into(),
        });
        let split_b = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-b"),
            properties: HashMap::new(),
            offset: "0".into(),
        });

        // Both fragments read from the same source id.
        let fragment_source_ids = HashMap::from([
            (entry_fragment_id, source_id),
            (downstream_fragment_id, source_id),
        ]);
        let fragment_splits =
            HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // Downstream actors carry the same worker/vnode/split assignment as the entry.
        assert_eq!(entry_state, downstream_state);

        // Across all actors, both splits must have been handed out.
        let split_ids: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, _, splits)| splits.iter().cloned())
            .collect();
        assert_eq!(
            split_ids,
            BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
        );
        // 2 actors per fragment x 2 fragments = 4 ids drawn.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
1622
    /// Test that job-level strategy overrides global strategy for Adaptive parallelism.
    #[test]
    fn render_actors_job_strategy_overrides_global() {
        let actor_id_counter = AtomicU32::new(0);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 100.into();
        let database_id: DatabaseId = DatabaseId::new(10);

        // Fragment with Adaptive parallelism, vnode_count = 8
        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            8,
            StreamingParallelism::Adaptive,
        );

        // Job has custom strategy: BOUNDED(2)
        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            // Job-level strategy, parsed from its textual form.
            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
            parallelism: StreamingParallelism::Adaptive,
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 8,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Trivial ensemble: the lone fragment is both entry and component.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // 4 workers with 1 parallelism each = total 4 parallelism
        let worker_map = BTreeMap::from([
            (
                1.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("default".into()),
                },
            ),
            (
                2.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("default".into()),
                },
            ),
            (
                3.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("default".into()),
                },
            ),
            (
                4.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("default".into()),
                },
            ),
        ]);

        // No sources, splits, or backfill jobs involved in this scenario.
        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        // Global strategy is FULL (would give 4 actors), but job strategy is BOUNDED(2)
        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Full,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        // Job strategy BOUNDED(2) should limit to 2 actors, not 4 (global FULL)
        assert_eq!(
            state.len(),
            2,
            "Job strategy BOUNDED(2) should override global FULL"
        );
    }
1739
1740    /// Test that global strategy is used when job has no custom strategy.
1741    #[test]
1742    fn render_actors_uses_global_strategy_when_job_has_none() {
1743        let actor_id_counter = AtomicU32::new(0);
1744        let fragment_id: FragmentId = 1.into();
1745        let job_id: JobId = 101.into();
1746        let database_id: DatabaseId = DatabaseId::new(11);
1747
1748        let fragment_model = build_fragment(
1749            fragment_id,
1750            job_id,
1751            0,
1752            DistributionType::Hash,
1753            8,
1754            StreamingParallelism::Adaptive,
1755        );
1756
1757        // Job has NO custom strategy (None)
1758        let job_model = streaming_job::Model {
1759            job_id,
1760            job_status: JobStatus::Created,
1761            create_type: CreateType::Foreground,
1762            timezone: None,
1763            config_override: None,
1764            adaptive_parallelism_strategy: None, // No custom strategy
1765            parallelism: StreamingParallelism::Adaptive,
1766            backfill_parallelism: None,
1767            backfill_orders: None,
1768            max_parallelism: 8,
1769            specific_resource_group: None,
1770            is_serverless_backfill: false,
1771        };
1772
1773        let database_model = database::Model {
1774            database_id,
1775            name: "test_db".into(),
1776            resource_group: "default".into(),
1777            barrier_interval_ms: None,
1778            checkpoint_frequency: None,
1779        };
1780
1781        let ensembles = vec![NoShuffleEnsemble {
1782            entries: HashSet::from([fragment_id]),
1783            components: HashSet::from([fragment_id]),
1784        }];
1785
1786        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
1787        let job_map = HashMap::from([(job_id, job_model)]);
1788
1789        // 4 workers = total 4 parallelism
1790        let worker_map = BTreeMap::from([
1791            (
1792                1.into(),
1793                WorkerInfo {
1794                    parallelism: NonZeroUsize::new(1).unwrap(),
1795                    resource_group: Some("default".into()),
1796                },
1797            ),
1798            (
1799                2.into(),
1800                WorkerInfo {
1801                    parallelism: NonZeroUsize::new(1).unwrap(),
1802                    resource_group: Some("default".into()),
1803                },
1804            ),
1805            (
1806                3.into(),
1807                WorkerInfo {
1808                    parallelism: NonZeroUsize::new(1).unwrap(),
1809                    resource_group: Some("default".into()),
1810                },
1811            ),
1812            (
1813                4.into(),
1814                WorkerInfo {
1815                    parallelism: NonZeroUsize::new(1).unwrap(),
1816                    resource_group: Some("default".into()),
1817                },
1818            ),
1819        ]);
1820
1821        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1822        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1823        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1824        let database_map = HashMap::from([(database_id, database_model)]);
1825        let backfill_jobs = HashSet::new();
1826
1827        let context = RenderActorsContext {
1828            fragment_source_ids: &fragment_source_ids,
1829            fragment_splits: &fragment_splits,
1830            streaming_job_databases: &streaming_job_databases,
1831            database_map: &database_map,
1832            backfill_jobs: &backfill_jobs,
1833        };
1834
1835        // Global strategy is BOUNDED(3)
1836        let result = render_actors(
1837            &actor_id_counter,
1838            &ensembles,
1839            &fragment_map,
1840            &job_map,
1841            &worker_map,
1842            AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(3).unwrap()),
1843            context,
1844        )
1845        .expect("actor rendering succeeds");
1846
1847        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
1848        // Should use global strategy BOUNDED(3)
1849        assert_eq!(
1850            state.len(),
1851            3,
1852            "Should use global strategy BOUNDED(3) when job has no custom strategy"
1853        );
1854    }
1855
1856    /// Test that Fixed parallelism ignores strategy entirely.
1857    #[test]
1858    fn render_actors_fixed_parallelism_ignores_strategy() {
1859        let actor_id_counter = AtomicU32::new(0);
1860        let fragment_id: FragmentId = 1.into();
1861        let job_id: JobId = 102.into();
1862        let database_id: DatabaseId = DatabaseId::new(12);
1863
1864        // Fragment with FIXED parallelism
1865        let fragment_model = build_fragment(
1866            fragment_id,
1867            job_id,
1868            0,
1869            DistributionType::Hash,
1870            8,
1871            StreamingParallelism::Fixed(5),
1872        );
1873
1874        // Job has custom strategy, but it should be ignored for Fixed parallelism
1875        let job_model = streaming_job::Model {
1876            job_id,
1877            job_status: JobStatus::Created,
1878            create_type: CreateType::Foreground,
1879            timezone: None,
1880            config_override: None,
1881            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
1882            parallelism: StreamingParallelism::Fixed(5),
1883            backfill_parallelism: None,
1884            backfill_orders: None,
1885            max_parallelism: 8,
1886            specific_resource_group: None,
1887            is_serverless_backfill: false,
1888        };
1889
1890        let database_model = database::Model {
1891            database_id,
1892            name: "test_db".into(),
1893            resource_group: "default".into(),
1894            barrier_interval_ms: None,
1895            checkpoint_frequency: None,
1896        };
1897
1898        let ensembles = vec![NoShuffleEnsemble {
1899            entries: HashSet::from([fragment_id]),
1900            components: HashSet::from([fragment_id]),
1901        }];
1902
1903        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
1904        let job_map = HashMap::from([(job_id, job_model)]);
1905
1906        // 6 workers = total 6 parallelism
1907        let worker_map = BTreeMap::from([
1908            (
1909                1.into(),
1910                WorkerInfo {
1911                    parallelism: NonZeroUsize::new(1).unwrap(),
1912                    resource_group: Some("default".into()),
1913                },
1914            ),
1915            (
1916                2.into(),
1917                WorkerInfo {
1918                    parallelism: NonZeroUsize::new(1).unwrap(),
1919                    resource_group: Some("default".into()),
1920                },
1921            ),
1922            (
1923                3.into(),
1924                WorkerInfo {
1925                    parallelism: NonZeroUsize::new(1).unwrap(),
1926                    resource_group: Some("default".into()),
1927                },
1928            ),
1929            (
1930                4.into(),
1931                WorkerInfo {
1932                    parallelism: NonZeroUsize::new(1).unwrap(),
1933                    resource_group: Some("default".into()),
1934                },
1935            ),
1936            (
1937                5.into(),
1938                WorkerInfo {
1939                    parallelism: NonZeroUsize::new(1).unwrap(),
1940                    resource_group: Some("default".into()),
1941                },
1942            ),
1943            (
1944                6.into(),
1945                WorkerInfo {
1946                    parallelism: NonZeroUsize::new(1).unwrap(),
1947                    resource_group: Some("default".into()),
1948                },
1949            ),
1950        ]);
1951
1952        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1953        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1954        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1955        let database_map = HashMap::from([(database_id, database_model)]);
1956        let backfill_jobs = HashSet::new();
1957
1958        let context = RenderActorsContext {
1959            fragment_source_ids: &fragment_source_ids,
1960            fragment_splits: &fragment_splits,
1961            streaming_job_databases: &streaming_job_databases,
1962            database_map: &database_map,
1963            backfill_jobs: &backfill_jobs,
1964        };
1965
1966        let result = render_actors(
1967            &actor_id_counter,
1968            &ensembles,
1969            &fragment_map,
1970            &job_map,
1971            &worker_map,
1972            AdaptiveParallelismStrategy::Full,
1973            context,
1974        )
1975        .expect("actor rendering succeeds");
1976
1977        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
1978        // Fixed(5) should be used, ignoring both job strategy BOUNDED(2) and global FULL
1979        assert_eq!(
1980            state.len(),
1981            5,
1982            "Fixed parallelism should ignore all strategies"
1983        );
1984    }
1985
1986    /// Test RATIO strategy calculation.
1987    #[test]
1988    fn render_actors_ratio_strategy() {
1989        let actor_id_counter = AtomicU32::new(0);
1990        let fragment_id: FragmentId = 1.into();
1991        let job_id: JobId = 103.into();
1992        let database_id: DatabaseId = DatabaseId::new(13);
1993
1994        let fragment_model = build_fragment(
1995            fragment_id,
1996            job_id,
1997            0,
1998            DistributionType::Hash,
1999            16,
2000            StreamingParallelism::Adaptive,
2001        );
2002
2003        // Job has RATIO(0.5) strategy
2004        let job_model = streaming_job::Model {
2005            job_id,
2006            job_status: JobStatus::Created,
2007            create_type: CreateType::Foreground,
2008            timezone: None,
2009            config_override: None,
2010            adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
2011            parallelism: StreamingParallelism::Adaptive,
2012            backfill_parallelism: None,
2013            backfill_orders: None,
2014            max_parallelism: 16,
2015            specific_resource_group: None,
2016            is_serverless_backfill: false,
2017        };
2018
2019        let database_model = database::Model {
2020            database_id,
2021            name: "test_db".into(),
2022            resource_group: "default".into(),
2023            barrier_interval_ms: None,
2024            checkpoint_frequency: None,
2025        };
2026
2027        let ensembles = vec![NoShuffleEnsemble {
2028            entries: HashSet::from([fragment_id]),
2029            components: HashSet::from([fragment_id]),
2030        }];
2031
2032        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
2033        let job_map = HashMap::from([(job_id, job_model)]);
2034
2035        // 8 workers = total 8 parallelism
2036        let worker_map = BTreeMap::from([
2037            (
2038                1.into(),
2039                WorkerInfo {
2040                    parallelism: NonZeroUsize::new(1).unwrap(),
2041                    resource_group: Some("default".into()),
2042                },
2043            ),
2044            (
2045                2.into(),
2046                WorkerInfo {
2047                    parallelism: NonZeroUsize::new(1).unwrap(),
2048                    resource_group: Some("default".into()),
2049                },
2050            ),
2051            (
2052                3.into(),
2053                WorkerInfo {
2054                    parallelism: NonZeroUsize::new(1).unwrap(),
2055                    resource_group: Some("default".into()),
2056                },
2057            ),
2058            (
2059                4.into(),
2060                WorkerInfo {
2061                    parallelism: NonZeroUsize::new(1).unwrap(),
2062                    resource_group: Some("default".into()),
2063                },
2064            ),
2065            (
2066                5.into(),
2067                WorkerInfo {
2068                    parallelism: NonZeroUsize::new(1).unwrap(),
2069                    resource_group: Some("default".into()),
2070                },
2071            ),
2072            (
2073                6.into(),
2074                WorkerInfo {
2075                    parallelism: NonZeroUsize::new(1).unwrap(),
2076                    resource_group: Some("default".into()),
2077                },
2078            ),
2079            (
2080                7.into(),
2081                WorkerInfo {
2082                    parallelism: NonZeroUsize::new(1).unwrap(),
2083                    resource_group: Some("default".into()),
2084                },
2085            ),
2086            (
2087                8.into(),
2088                WorkerInfo {
2089                    parallelism: NonZeroUsize::new(1).unwrap(),
2090                    resource_group: Some("default".into()),
2091                },
2092            ),
2093        ]);
2094
2095        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2096        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2097        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2098        let database_map = HashMap::from([(database_id, database_model)]);
2099        let backfill_jobs = HashSet::new();
2100
2101        let context = RenderActorsContext {
2102            fragment_source_ids: &fragment_source_ids,
2103            fragment_splits: &fragment_splits,
2104            streaming_job_databases: &streaming_job_databases,
2105            database_map: &database_map,
2106            backfill_jobs: &backfill_jobs,
2107        };
2108
2109        let result = render_actors(
2110            &actor_id_counter,
2111            &ensembles,
2112            &fragment_map,
2113            &job_map,
2114            &worker_map,
2115            AdaptiveParallelismStrategy::Full,
2116            context,
2117        )
2118        .expect("actor rendering succeeds");
2119
2120        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2121        // RATIO(0.5) of 8 = 4
2122        assert_eq!(
2123            state.len(),
2124            4,
2125            "RATIO(0.5) of 8 workers should give 4 actors"
2126        );
2127    }
2128}