risingwave_meta/controller/
scale.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
23#[cfg(debug_assertions)]
24use risingwave_common::hash::VnodeBitmapExt;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
28use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
29use risingwave_meta_model::fragment::DistributionType;
30use risingwave_meta_model::prelude::{
31    Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
32};
33use risingwave_meta_model::{
34    DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism, WorkerId,
35    database, fragment, fragment_relation, fragment_splits, object, sink, source, streaming_job,
36    table,
37};
38use risingwave_meta_model_migration::Condition;
39use risingwave_pb::common::WorkerNode;
40use risingwave_pb::stream_plan::PbStreamNode;
41use sea_orm::{
42    ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
43    RelationTrait,
44};
45
46use crate::MetaResult;
47use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
48use crate::controller::utils::resolve_no_shuffle_actor_mapping;
49use crate::manager::ActiveStreamingWorkerNodes;
50use crate::model::{ActorId, StreamActor, StreamingJobModelContextExt};
51use crate::stream::{AssignerBuilder, SplitDiffOptions};
52
53pub(crate) async fn resolve_streaming_job_definition<C>(
54    txn: &C,
55    job_ids: &HashSet<JobId>,
56) -> MetaResult<HashMap<JobId, String>>
57where
58    C: ConnectionTrait,
59{
60    let job_ids = job_ids.iter().cloned().collect_vec();
61
62    // including table, materialized view, index
63    let common_job_definitions: Vec<(JobId, String)> = Table::find()
64        .select_only()
65        .columns([
66            table::Column::TableId,
67            #[cfg(not(debug_assertions))]
68            table::Column::Name,
69            #[cfg(debug_assertions)]
70            table::Column::Definition,
71        ])
72        .filter(table::Column::TableId.is_in(job_ids.clone()))
73        .into_tuple()
74        .all(txn)
75        .await?;
76
77    let sink_definitions: Vec<(JobId, String)> = Sink::find()
78        .select_only()
79        .columns([
80            sink::Column::SinkId,
81            #[cfg(not(debug_assertions))]
82            sink::Column::Name,
83            #[cfg(debug_assertions)]
84            sink::Column::Definition,
85        ])
86        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
87        .into_tuple()
88        .all(txn)
89        .await?;
90
91    let source_definitions: Vec<(JobId, String)> = Source::find()
92        .select_only()
93        .columns([
94            source::Column::SourceId,
95            #[cfg(not(debug_assertions))]
96            source::Column::Name,
97            #[cfg(debug_assertions)]
98            source::Column::Definition,
99        ])
100        .filter(source::Column::SourceId.is_in(job_ids.clone()))
101        .into_tuple()
102        .all(txn)
103        .await?;
104
105    let definitions: HashMap<JobId, String> = common_job_definitions
106        .into_iter()
107        .chain(sink_definitions.into_iter())
108        .chain(source_definitions.into_iter())
109        .collect();
110
111    Ok(definitions)
112}
113
114pub async fn load_fragment_info<C>(
115    txn: &C,
116    actor_id_counter: &AtomicU32,
117    database_id: Option<DatabaseId>,
118    worker_nodes: &ActiveStreamingWorkerNodes,
119    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
120) -> MetaResult<FragmentRenderMap>
121where
122    C: ConnectionTrait,
123{
124    let mut query = StreamingJob::find()
125        .select_only()
126        .column(streaming_job::Column::JobId);
127
128    if let Some(database_id) = database_id {
129        query = query
130            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
131            .filter(object::Column::DatabaseId.eq(database_id));
132    }
133
134    let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
135
136    if jobs.is_empty() {
137        return Ok(HashMap::new());
138    }
139
140    let jobs: HashSet<JobId> = jobs.into_iter().collect();
141
142    let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
143
144    if loaded.is_empty() {
145        return Ok(HashMap::new());
146    }
147
148    let RenderedGraph { fragments, .. } = render_actor_assignments(
149        actor_id_counter,
150        worker_nodes.current(),
151        adaptive_parallelism_strategy,
152        &loaded,
153    )?;
154
155    Ok(fragments)
156}
157
/// Desired placement for a streaming job when (re)scheduling: which resource
/// group to run in and at what parallelism.
#[derive(Debug)]
pub struct TargetResourcePolicy {
    /// Target resource group; `None` presumably falls back to the default
    /// resource group — confirm at call sites.
    pub resource_group: Option<String>,
    /// Target streaming parallelism for the job.
    pub parallelism: StreamingParallelism,
}
163
/// Scheduling-relevant view of a compute worker.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Number of parallel slots the worker offers (guaranteed non-zero).
    pub parallelism: NonZeroUsize,
    /// Resource group label of the worker, if it has one.
    pub resource_group: Option<String>,
}
169
/// Rendered inflight fragment info, nested by database → streaming job → fragment.
pub type FragmentRenderMap =
    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;
172
/// Output of the render stage: the rendered fragments plus the no-shuffle
/// ensembles the rendering was based on.
#[derive(Default)]
pub struct RenderedGraph {
    /// Rendered fragments keyed by database, job and fragment id.
    pub fragments: FragmentRenderMap,
    /// No-shuffle ensembles covered by `fragments`.
    pub ensembles: Vec<NoShuffleEnsemble>,
}

impl RenderedGraph {
    /// A graph with no fragments and no ensembles.
    pub fn empty() -> Self {
        Self::default()
    }
}
184
/// Fragment metadata loaded asynchronously from the catalog, containing all
/// per-fragment state required to render actor assignments. Keeping it here
/// separates async I/O (load) from the sync rendering logic.
#[derive(Clone, Debug)]
pub struct LoadedFragment {
    pub fragment_id: FragmentId,
    /// Streaming job this fragment belongs to.
    pub job_id: JobId,
    /// Bit mask of `FragmentTypeFlag`s describing the fragment's roles.
    pub fragment_type_mask: FragmentTypeMask,
    /// `Hash` or `Single` distribution.
    pub distribution_type: DistributionType,
    /// Number of virtual nodes of the fragment.
    pub vnode_count: usize,
    /// Decoded stream plan of the fragment.
    pub nodes: PbStreamNode,
    /// Ids of the state tables owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
    /// Fragment-level parallelism override, if any.
    pub parallelism: Option<StreamingParallelism>,
}
199
200impl From<fragment::Model> for LoadedFragment {
201    fn from(model: fragment::Model) -> Self {
202        Self {
203            fragment_id: model.fragment_id,
204            job_id: model.job_id,
205            fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
206            distribution_type: model.distribution_type,
207            vnode_count: model.vnode_count as usize,
208            nodes: model.stream_node.to_protobuf(),
209            state_table_ids: model.state_table_ids.into_inner().into_iter().collect(),
210            parallelism: model.parallelism,
211        }
212    }
213}
214
/// Everything the sync render stage needs, loaded in one async pass.
#[derive(Default, Debug, Clone)]
pub struct LoadedFragmentContext {
    /// Connected groups of fragments linked by no-shuffle edges.
    pub ensembles: Vec<NoShuffleEnsemble>,
    /// Loaded fragments grouped by owning job.
    pub job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    /// Streaming job rows keyed by job id.
    pub job_map: HashMap<JobId, streaming_job::Model>,
    /// Database each streaming job belongs to.
    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
    /// Database rows keyed by database id.
    pub database_map: HashMap<DatabaseId, database::Model>,
    /// Source id per source fragment (as resolved by `resolve_source_fragments`).
    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
    /// Currently-assigned source splits per fragment.
    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
}
225
impl LoadedFragmentContext {
    /// Returns whether nothing was loaded.
    ///
    /// Also checks the invariant that an empty ensemble list implies empty job
    /// fragments — every loaded fragment must belong to some ensemble.
    pub fn is_empty(&self) -> bool {
        if self.ensembles.is_empty() {
            assert!(
                self.job_fragments.is_empty(),
                "non-empty job fragments for empty ensembles: {:?}",
                self.job_fragments
            );
            true
        } else {
            false
        }
    }

    /// Clones the subset of this context that belongs to `database_id`.
    ///
    /// Returns `None` when no loaded job maps to that database. Panics (via
    /// `assert!`) if an ensemble only partially overlaps the database's
    /// fragments, since a no-shuffle ensemble must never span databases.
    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
        // Jobs belonging to the requested database.
        let job_ids: HashSet<JobId> = self
            .streaming_job_databases
            .iter()
            .filter_map(|(job_id, db_id)| (*db_id == database_id).then_some(*job_id))
            .collect();

        if job_ids.is_empty() {
            return None;
        }

        // Indexing is safe by construction: every job in
        // `streaming_job_databases` has an entry in `job_fragments`.
        let job_fragments: HashMap<_, _> = job_ids
            .iter()
            .map(|job_id| (*job_id, self.job_fragments[job_id].clone()))
            .collect();

        let fragment_ids: HashSet<_> = job_fragments
            .values()
            .flat_map(|fragments| fragments.keys().copied())
            .collect();

        assert!(
            !fragment_ids.is_empty(),
            "empty fragments for non-empty database {database_id} with jobs {job_ids:?}"
        );

        // Keep only ensembles fully contained in this database; a partial
        // overlap would mean a no-shuffle edge crosses databases.
        let ensembles: Vec<NoShuffleEnsemble> = self
            .ensembles
            .iter()
            .filter(|ensemble| {
                if ensemble
                    .components
                    .iter()
                    .any(|fragment_id| fragment_ids.contains(fragment_id))
                {
                    assert!(
                        ensemble
                            .components
                            .iter()
                            .all(|fragment_id| fragment_ids.contains(fragment_id)),
                        "ensemble {ensemble:?} partially exists in database {database_id} with fragments {job_fragments:?}"
                    );
                    true
                } else {
                    false
                }
            })
            .cloned()
            .collect();

        assert!(
            !ensembles.is_empty(),
            "empty ensembles for non-empty database {database_id} with jobs {job_fragments:?}"
        );

        let job_map = job_ids
            .iter()
            .filter_map(|job_id| self.job_map.get(job_id).map(|job| (*job_id, job.clone())))
            .collect();

        let streaming_job_databases = job_ids
            .iter()
            .filter_map(|job_id| {
                self.streaming_job_databases
                    .get(job_id)
                    .map(|db_id| (*job_id, *db_id))
            })
            .collect();

        // The projected context only carries the single requested database.
        let database_model = self.database_map[&database_id].clone();
        let database_map = HashMap::from([(database_id, database_model)]);

        let fragment_source_ids = self
            .fragment_source_ids
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, source_id)| (*fragment_id, *source_id))
            .collect();

        let fragment_splits = self
            .fragment_splits
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, splits)| (*fragment_id, splits.clone()))
            .collect();

        Some(Self {
            ensembles,
            job_fragments,
            job_map,
            streaming_job_databases,
            database_map,
            fragment_source_ids,
            fragment_splits,
        })
    }

    /// Split this loaded context by database in a single ownership pass to avoid cloning large
    /// fragment payloads (for example `stream_node` in `LoadedFragment`).
    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
        let Self {
            ensembles,
            mut job_fragments,
            mut job_map,
            streaming_job_databases,
            mut database_map,
            mut fragment_source_ids,
            mut fragment_splits,
        } = self;

        let mut contexts = HashMap::<DatabaseId, Self>::new();
        // Reverse index: which database each fragment ended up in, used below
        // to route ensembles.
        let mut fragment_databases = HashMap::<FragmentId, DatabaseId>::new();
        let mut unresolved_ensembles = 0usize;
        let mut unresolved_ensemble_sample: Option<Vec<FragmentId>> = None;

        // First pass: move each job (and its fragments, source ids and splits)
        // into the per-database context.
        for (job_id, database_id) in streaming_job_databases {
            let context = contexts.entry(database_id).or_insert_with(|| {
                let database_model = database_map
                    .remove(&database_id)
                    .expect("database should exist for streaming job");
                Self {
                    ensembles: Vec::new(),
                    job_fragments: HashMap::new(),
                    job_map: HashMap::new(),
                    streaming_job_databases: HashMap::new(),
                    database_map: HashMap::from([(database_id, database_model)]),
                    fragment_source_ids: HashMap::new(),
                    fragment_splits: HashMap::new(),
                }
            });

            let fragments = job_fragments
                .remove(&job_id)
                .expect("job fragments should exist for streaming job");
            for fragment_id in fragments.keys().copied() {
                fragment_databases.insert(fragment_id, database_id);
                if let Some(source_id) = fragment_source_ids.remove(&fragment_id) {
                    context.fragment_source_ids.insert(fragment_id, source_id);
                }
                if let Some(splits) = fragment_splits.remove(&fragment_id) {
                    context.fragment_splits.insert(fragment_id, splits);
                }
            }

            assert!(
                context
                    .job_map
                    .insert(
                        job_id,
                        job_map
                            .remove(&job_id)
                            .expect("streaming job should exist for loaded context"),
                    )
                    .is_none(),
                "duplicated streaming job"
            );
            assert!(
                context.job_fragments.insert(job_id, fragments).is_none(),
                "duplicated job fragments"
            );
            assert!(
                context
                    .streaming_job_databases
                    .insert(job_id, database_id)
                    .is_none(),
                "duplicated job database mapping"
            );
        }

        // Second pass: attach each ensemble to the database any of its
        // fragments belongs to (they must all agree).
        for ensemble in ensembles {
            let Some(database_id) = ensemble
                .components
                .iter()
                .find_map(|fragment_id| fragment_databases.get(fragment_id).copied())
            else {
                // No fragment of this ensemble was routed above; count and
                // sample it for the warning below instead of panicking in
                // release builds.
                unresolved_ensembles += 1;
                if unresolved_ensemble_sample.is_none() {
                    unresolved_ensemble_sample =
                        Some(ensemble.components.iter().copied().collect());
                }
                continue;
            };

            debug_assert!(
                ensemble
                    .components
                    .iter()
                    .all(|fragment_id| fragment_databases.get(fragment_id) == Some(&database_id)),
                "ensemble {ensemble:?} should belong to a single database"
            );

            contexts
                .get_mut(&database_id)
                .expect("database context should exist for ensemble")
                .ensembles
                .push(ensemble);
        }

        if unresolved_ensembles > 0 {
            tracing::warn!(
                unresolved_ensembles,
                ?unresolved_ensemble_sample,
                known_fragments = fragment_databases.len(),
                "skip ensembles without resolved database while splitting loaded context"
            );
        }
        debug_assert_eq!(
            unresolved_ensembles, 0,
            "all ensembles should be mappable to a database"
        );

        contexts
    }
}
454
455/// Async load stage for fragment-scoped rendering. It resolves all metadata required to later
456/// render actor assignments with arbitrary worker sets.
457pub async fn load_fragment_context<C>(
458    txn: &C,
459    ensembles: Vec<NoShuffleEnsemble>,
460) -> MetaResult<LoadedFragmentContext>
461where
462    C: ConnectionTrait,
463{
464    if ensembles.is_empty() {
465        return Ok(LoadedFragmentContext::default());
466    }
467
468    let required_fragment_ids: HashSet<_> = ensembles
469        .iter()
470        .flat_map(|ensemble| ensemble.components.iter().copied())
471        .collect();
472
473    let fragment_models = Fragment::find()
474        .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
475        .all(txn)
476        .await?;
477
478    let found_fragment_ids: HashSet<_> = fragment_models
479        .iter()
480        .map(|fragment| fragment.fragment_id)
481        .collect();
482
483    if found_fragment_ids.len() != required_fragment_ids.len() {
484        let missing = required_fragment_ids
485            .difference(&found_fragment_ids)
486            .copied()
487            .collect_vec();
488        return Err(anyhow!("fragments {:?} not found", missing).into());
489    }
490
491    let fragment_models: HashMap<_, _> = fragment_models
492        .into_iter()
493        .map(|fragment| (fragment.fragment_id, fragment))
494        .collect();
495
496    let job_ids: HashSet<_> = fragment_models
497        .values()
498        .map(|fragment| fragment.job_id)
499        .collect();
500
501    if job_ids.is_empty() {
502        return Ok(LoadedFragmentContext::default());
503    }
504
505    let jobs: HashMap<_, _> = StreamingJob::find()
506        .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
507        .all(txn)
508        .await?
509        .into_iter()
510        .map(|job| (job.job_id, job))
511        .collect();
512
513    let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
514    if found_job_ids.len() != job_ids.len() {
515        let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
516        return Err(anyhow!("streaming jobs {:?} not found", missing).into());
517    }
518
519    build_loaded_context(txn, ensembles, fragment_models, jobs).await
520}
521
/// Async load stage for job-scoped rendering. It collects all no-shuffle ensembles and the
/// metadata required to render actor assignments later with a provided worker set.
///
/// Note that the resulting context may cover jobs beyond `job_ids`: a
/// no-shuffle ensemble rooted in one of the requested jobs can reach
/// downstream fragments of other jobs, and those are loaded too.
pub async fn load_fragment_context_for_jobs<C>(
    txn: &C,
    job_ids: HashSet<JobId>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Fragments that are the *target* of a no-shuffle dispatch are interior
    // nodes of an ensemble; excluding them leaves only the entry fragments.
    let excluded_fragments_query = FragmentRelation::find()
        .select_only()
        .column(fragment_relation::Column::TargetFragmentId)
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_query();

    let condition = Condition::all()
        .add(fragment::Column::JobId.is_in(job_ids.clone()))
        .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));

    // Entry fragments of the requested jobs.
    let fragments: Vec<FragmentId> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .filter(condition)
        .into_tuple()
        .all(txn)
        .await?;

    // Expand each entry fragment into its full no-shuffle DAG (ensemble).
    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;

    // Reload the full rows for every fragment reached by any ensemble.
    let fragments = Fragment::find()
        .filter(
            fragment::Column::FragmentId.is_in(
                ensembles
                    .iter()
                    .flat_map(|graph| graph.components.iter())
                    .cloned()
                    .collect_vec(),
            ),
        )
        .all(txn)
        .await?;

    let fragment_map: HashMap<_, _> = fragments
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    // Recompute the job set from the loaded fragments: ensembles may have
    // pulled in jobs outside the original `job_ids`. BTreeSet keeps the query
    // argument order deterministic.
    let job_ids = fragment_map
        .values()
        .map(|fragment| fragment.job_id)
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect_vec();

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    build_loaded_context(txn, ensembles, fragment_map, jobs).await
}
590
591/// Sync render stage: uses loaded fragment context and current worker info
592/// to produce actor-to-worker assignments and vnode bitmaps.
593pub(crate) fn render_actor_assignments(
594    actor_id_counter: &AtomicU32,
595    worker_map: &HashMap<WorkerId, WorkerNode>,
596    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
597    loaded: &LoadedFragmentContext,
598) -> MetaResult<RenderedGraph> {
599    let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
600    render_actor_assignments_with_allocator(
601        &mut actor_id_allocator,
602        worker_map,
603        adaptive_parallelism_strategy,
604        loaded,
605    )
606}
607
608/// Render a graph with preview-only actor ids so callers can compare layouts
609/// without consuming the global actor id generator.
610pub(crate) fn preview_actor_assignments(
611    worker_map: &HashMap<WorkerId, WorkerNode>,
612    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
613    loaded: &LoadedFragmentContext,
614) -> MetaResult<RenderedGraph> {
615    let mut actor_id_allocator = RenderActorIdAllocator::Preview { next_actor_id: 0 };
616    render_actor_assignments_with_allocator(
617        &mut actor_id_allocator,
618        worker_map,
619        adaptive_parallelism_strategy,
620        loaded,
621    )
622}
623
/// Replace preview actor ids with real actor ids after the caller has decided
/// the rendered layout is not a no-op.
///
/// Fragments are renumbered in preview-allocation order (see below) so that
/// fragments of the same no-shuffle ensemble keep their relative slot
/// alignment; in debug builds this is verified against the preview layout.
pub(crate) fn materialize_actor_assignments(
    actor_id_counter: &AtomicU32,
    rendered: RenderedGraph,
) -> RenderedGraph {
    let RenderedGraph {
        fragments,
        ensembles,
    } = rendered;
    // Snapshot the id-independent slot layout before renumbering, for the
    // debug check at the end.
    #[cfg(debug_assertions)]
    let preview_slots = collect_relative_actor_slots_by_fragment(&fragments);

    // Flatten the nested map into (database, job, fragment, info) tuples so we
    // can sort fragments globally.
    let mut ordered_fragments = fragments
        .into_iter()
        .flat_map(|(database_id, jobs)| {
            jobs.into_iter().flat_map(move |(job_id, fragments)| {
                fragments.into_iter().map(move |(fragment_id, fragment)| {
                    (database_id, job_id, fragment_id, fragment)
                })
            })
        })
        .collect_vec();
    // Preserve preview order when remapping ids so fragments from the same
    // no-shuffle ensemble keep their relative slot alignment.
    //
    // This works because the preview allocator assigns actor ids with a single
    // monotonically-increasing counter, so `min(actor_id)` per fragment faithfully
    // reproduces the original allocation order. If the preview allocator ever
    // changes to a non-monotonic scheme this sort key must be revisited.
    //
    // Empty fragments should not appear, but fragment_id keeps the ordering
    // deterministic if they do.
    ordered_fragments.sort_by_key(|(_, _, fragment_id, fragment)| {
        (
            fragment
                .actors
                .keys()
                .copied()
                .min()
                .map(|actor_id| actor_id.as_raw_id())
                .unwrap_or(u32::MAX),
            *fragment_id,
        )
    });

    // Renumber each fragment's actors from the persistent counter, rebuilding
    // the nested map shape.
    let mut materialized = FragmentRenderMap::new();
    for (database_id, job_id, fragment_id, fragment) in ordered_fragments {
        materialized
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                materialize_fragment(fragment, actor_id_counter),
            );
    }

    #[cfg(debug_assertions)]
    assert_materialization_preserves_preview_slots(&preview_slots, &materialized);

    RenderedGraph {
        fragments: materialized,
        ensembles,
    }
}
691
/// Id-independent identity of an actor slot, used to compare preview and
/// materialized layouts: (offset from the fragment's smallest actor id,
/// worker, vnode indices, sorted split ids).
#[cfg(debug_assertions)]
type RelativeActorSlot = (u32, WorkerId, Option<Vec<usize>>, Vec<SplitId>);
694
695#[cfg(debug_assertions)]
696fn collect_relative_actor_slots(fragment: &InflightFragmentInfo) -> Vec<RelativeActorSlot> {
697    let base = fragment
698        .actors
699        .keys()
700        .copied()
701        .min()
702        .map(|actor_id| actor_id.as_raw_id())
703        .unwrap_or_default();
704
705    let mut entries = fragment
706        .actors
707        .iter()
708        .map(|(&actor_id, info)| {
709            let idx = actor_id.as_raw_id() - base;
710            let vnode_bitmap = info.vnode_bitmap.as_ref().map(|bitmap| {
711                bitmap
712                    .iter_vnodes()
713                    .map(|vnode| vnode.to_index())
714                    .collect_vec()
715            });
716            let mut splits = info.splits.iter().map(SplitMetaData::id).collect_vec();
717            splits.sort_unstable();
718            (idx, info.worker_id, vnode_bitmap, splits)
719        })
720        .collect_vec();
721    entries.sort_unstable_by_key(|(idx, ..)| *idx);
722    entries
723}
724
725#[cfg(debug_assertions)]
726fn collect_relative_actor_slots_by_fragment(
727    fragments: &FragmentRenderMap,
728) -> HashMap<FragmentId, Vec<RelativeActorSlot>> {
729    fragments
730        .values()
731        .flat_map(|jobs| jobs.values())
732        .flatten()
733        .map(|(fragment_id, fragment)| (*fragment_id, collect_relative_actor_slots(fragment)))
734        .collect()
735}
736
737#[cfg(debug_assertions)]
738fn assert_materialization_preserves_preview_slots(
739    preview_slots: &HashMap<FragmentId, Vec<RelativeActorSlot>>,
740    materialized_fragments: &FragmentRenderMap,
741) {
742    for (database_id, jobs) in materialized_fragments {
743        for (job_id, fragments) in jobs {
744            for (fragment_id, fragment) in fragments {
745                let expected_slots = preview_slots.get(fragment_id).unwrap_or_else(|| {
746                    panic!(
747                        "preview fragment {fragment_id} for database {database_id}, job {job_id} not found"
748                    )
749                });
750
751                assert_eq!(
752                    collect_relative_actor_slots(fragment),
753                    *expected_slots,
754                    "materialization changed preview slot ordering for database {database_id}, job {job_id}, fragment {fragment_id}"
755                );
756            }
757        }
758    }
759}
760
/// Shared implementation behind the persistent and preview render entry
/// points: bundles the loaded lookup tables into a `RenderActorsContext` and
/// delegates to `render_actors_with_allocator`.
fn render_actor_assignments_with_allocator(
    actor_id_allocator: &mut RenderActorIdAllocator<'_>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    loaded: &LoadedFragmentContext,
) -> MetaResult<RenderedGraph> {
    if loaded.is_empty() {
        return Ok(RenderedGraph::empty());
    }

    let render_context = RenderActorsContext {
        fragment_source_ids: &loaded.fragment_source_ids,
        fragment_splits: &loaded.fragment_splits,
        streaming_job_databases: &loaded.streaming_job_databases,
        database_map: &loaded.database_map,
    };

    let fragments = render_actors_with_allocator(
        actor_id_allocator,
        &loaded.ensembles,
        &loaded.job_fragments,
        &loaded.job_map,
        worker_map,
        adaptive_parallelism_strategy,
        render_context,
    )?;

    Ok(RenderedGraph {
        fragments,
        // The rendered graph echoes back the ensembles it was rendered from.
        ensembles: loaded.ensembles.clone(),
    })
}
793
794fn materialize_fragment(
795    mut fragment: InflightFragmentInfo,
796    actor_id_counter: &AtomicU32,
797) -> InflightFragmentInfo {
798    let actor_count = u32::try_from(fragment.actors.len()).expect("actor count exceeds u32::MAX");
799    let actor_id_base: ActorId = actor_id_counter
800        .fetch_add(actor_count, Ordering::Relaxed)
801        .into();
802
803    let mut actors = fragment.actors.into_iter().collect_vec();
804    actors.sort_by_key(|(actor_id, _)| *actor_id);
805
806    fragment.actors = actors
807        .into_iter()
808        .enumerate()
809        .map(|(idx, (_, info))| {
810            let actor_offset = u32::try_from(idx).expect("actor index exceeds u32::MAX");
811            (actor_id_base + actor_offset, info)
812        })
813        .collect();
814
815    fragment
816}
817
/// Assembles a `LoadedFragmentContext` from already-fetched fragments/jobs by
/// loading the remaining pieces: source splits, job→database mapping and the
/// database rows themselves.
async fn build_loaded_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
    fragment_models: HashMap<FragmentId, fragment::Model>,
    job_map: HashMap<JobId, streaming_job::Model>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Group the loaded fragments by their owning job.
    let mut job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>> = HashMap::new();
    for (fragment_id, model) in fragment_models {
        job_fragments
            .entry(model.job_id)
            .or_default()
            .try_insert(fragment_id, LoadedFragment::from(model))
            .expect("duplicate fragment id for job");
    }

    #[cfg(debug_assertions)]
    {
        debug_sanity_check(&ensembles, &job_fragments, &job_map);
    }

    let (fragment_source_ids, fragment_splits) =
        resolve_source_fragments(txn, &job_fragments).await?;

    let job_ids = job_map.keys().copied().collect_vec();

    // Map each job to its database via the object table.
    // NOTE(review): this is a LEFT join, so a job without an object row would
    // yield a NULL database id and presumably fail tuple decoding — confirm
    // every streaming job always has an object row.
    let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .collect();

    let database_map: HashMap<_, _> = Database::find()
        .filter(
            database::Column::DatabaseId
                .is_in(streaming_job_databases.values().copied().collect_vec()),
        )
        .all(txn)
        .await?
        .into_iter()
        .map(|db| (db.database_id, db))
        .collect();

    Ok(LoadedFragmentContext {
        ensembles,
        job_fragments,
        job_map,
        streaming_job_databases,
        database_map,
        fragment_source_ids,
        fragment_splits,
    })
}
883
// Only metadata resolved asynchronously lives here so the renderer stays synchronous
// and the call site keeps the runtime dependencies (maps, strategy, actor counter, etc.) explicit.
struct RenderActorsContext<'a> {
    /// Source id per source fragment.
    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
    /// Currently-assigned source splits per fragment.
    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
    /// Database each streaming job belongs to.
    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
    /// Database rows keyed by database id.
    database_map: &'a HashMap<DatabaseId, database::Model>,
}
892
/// How actor ids are handed out during rendering.
enum RenderActorIdAllocator<'a> {
    /// Draws ids from the shared atomic counter, so allocations are visible
    /// to every caller holding the same counter.
    Persistent(&'a AtomicU32),
    /// Local cursor not backed by the shared counter (used for preview renders).
    Preview { next_actor_id: u32 },
}
897
898impl RenderActorIdAllocator<'_> {
899    fn allocate_block(&mut self, actor_count: u32) -> ActorId {
900        match self {
901            Self::Persistent(actor_id_counter) => {
902                let actor_id_base: ActorId = actor_id_counter
903                    .fetch_add(actor_count, Ordering::Relaxed)
904                    .into();
905                actor_id_base
906            }
907            Self::Preview { next_actor_id } => {
908                let actor_id_base = *next_actor_id;
909                *next_actor_id = next_actor_id
910                    .checked_add(actor_count)
911                    .expect("preview actor id overflow");
912                let actor_id_base: ActorId = actor_id_base.into();
913                actor_id_base
914            }
915        }
916    }
917}
918
#[cfg(test)]
/// Test-only convenience wrapper: renders actors using the shared persistent
/// actor-id counter instead of a caller-provided allocator.
fn render_actors(
    actor_id_counter: &AtomicU32,
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    render_actors_with_allocator(
        &mut RenderActorIdAllocator::Persistent(actor_id_counter),
        ensembles,
        job_fragments,
        job_map,
        worker_map,
        adaptive_parallelism_strategy,
        context,
    )
}
940
/// Renders concrete actors for every no-shuffle ensemble.
///
/// Each ensemble gets one [`EnsembleActorTemplate`] derived from its entry
/// fragments; every component fragment is then materialized from that template,
/// so fragments connected by no-shuffle edges keep identical worker placement
/// and vnode bitmaps. Results are grouped as database -> job -> fragment.
fn render_actors_with_allocator(
    actor_id_allocator: &mut RenderActorIdAllocator<'_>,
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    let RenderActorsContext {
        fragment_source_ids,
        fragment_splits: fragment_splits_map,
        streaming_job_databases,
        database_map,
    } = context;

    let mut all_fragments: FragmentRenderMap = HashMap::new();
    // Flat fragment-id lookup across all jobs.
    let fragment_lookup: HashMap<FragmentId, &LoadedFragment> = job_fragments
        .values()
        .flat_map(|fragments| fragments.iter())
        .map(|(fragment_id, fragment)| (*fragment_id, fragment))
        .collect();

    for NoShuffleEnsemble {
        entries,
        components,
    } in ensembles
    {
        tracing::debug!("rendering ensemble entries {:?}", entries);

        let entry_fragments = entries
            .iter()
            .map(|fragment_id| fragment_lookup.get(fragment_id).unwrap())
            .collect_vec();

        // All entry fragments must carry the same parallelism override.
        let entry_fragment_parallelism = entry_fragments
            .iter()
            .map(|fragment| fragment.parallelism.clone())
            .dedup()
            .exactly_one()
            .map_err(|_| {
                anyhow!(
                    "entry fragments {:?} have inconsistent parallelism settings",
                    entries.iter().copied().collect_vec()
                )
            })?;

        // ... and must also agree on owning job, distribution type and vnode count.
        let (job_id, distribution_type, vnode_count) = entry_fragments
            .iter()
            .map(|f| (f.job_id, f.distribution_type, f.vnode_count))
            .dedup()
            .exactly_one()
            .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;

        let job = job_map
            .get(&job_id)
            .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;

        // Resource group of the job's database; a job-level override (if any)
        // is applied later inside `EnsembleActorTemplate::render_new`.
        let database_resource_group = streaming_job_databases
            .get(&job_id)
            .and_then(|database_id| database_map.get(database_id))
            .unwrap()
            .resource_group
            .clone();

        // Find the entry fragment (if any) that reads a shared source, excluding
        // DML fragments; its split assignment seeds the whole ensemble.
        let source_entry_fragment = entry_fragments.iter().find(|f| {
            let mask = f.fragment_type_mask;
            // A fragment must not be both a source and a source-backfill.
            if mask.contains(FragmentTypeFlag::Source) {
                assert!(!mask.contains(FragmentTypeFlag::SourceScan))
            }
            mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
        });

        let actor_template = EnsembleActorTemplate::render_new(
            job,
            worker_map,
            adaptive_parallelism_strategy,
            entry_fragment_parallelism,
            database_resource_group,
            distribution_type,
            vnode_count,
        )?;

        // Pre-compute the (template-index-keyed) split assignment for the source entry.
        let source_splits = match source_entry_fragment {
            Some(entry_fragment) => {
                let source_id = fragment_source_ids
                    .get(&entry_fragment.fragment_id)
                    .ok_or_else(|| {
                        anyhow!(
                            "missing source id in source fragment {}",
                            entry_fragment.fragment_id
                        )
                    })?;

                let entry_fragment_id = entry_fragment.fragment_id;

                let splits = fragment_splits_map
                    .get(&entry_fragment_id)
                    .cloned()
                    .unwrap_or_default();

                let splits: std::collections::BTreeMap<_, _> =
                    splits.into_iter().map(|s| (s.id(), s)).collect();
                let splits = actor_template.assign_splits(entry_fragment_id, splits);
                Some((splits, *source_id))
            }
            None => None,
        };

        // Materialize every component fragment from the shared template.
        for component_fragment_id in components {
            let fragment = fragment_lookup.get(component_fragment_id).unwrap();
            let fragment_id = fragment.fragment_id;
            let job_id = fragment.job_id;
            let fragment_type_mask = fragment.fragment_type_mask;
            let distribution_type = fragment.distribution_type;
            let stream_node = &fragment.nodes;
            let state_table_ids = &fragment.state_table_ids;
            let vnode_count = fragment.vnode_count;
            let source_id = fragment_source_ids.get(&fragment_id).cloned();

            // Reserve a fresh actor-id block and mirror the template's placement.
            let aligner = ComponentFragmentAligner::new(&actor_template, actor_id_allocator);
            let actors = aligner.align_component_actor(distribution_type);
            // Source-consuming fragments inherit the entry's splits, re-keyed
            // onto this fragment's freshly allocated actor ids.
            let mut splits = source_id
                .map(|source_id| {
                    let (fragment_splits, shared_source_id) = source_splits.as_ref().unwrap();
                    assert_eq!(*shared_source_id, source_id);
                    aligner.align_component_splits(fragment_splits)
                })
                .unwrap_or_default();

            let actors: HashMap<ActorId, InflightActorInfo> = actors
                .into_iter()
                .map(|(actor_id, (worker_id, vnode_bitmap))| {
                    (
                        actor_id,
                        InflightActorInfo {
                            worker_id,
                            vnode_bitmap,
                            splits: splits.remove(&actor_id).unwrap_or_default(),
                        },
                    )
                })
                .collect();

            let fragment = InflightFragmentInfo {
                fragment_id,
                distribution_type,
                fragment_type_mask,
                vnode_count,
                nodes: stream_node.clone(),
                actors,
                state_table_ids: state_table_ids.clone(),
            };

            let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
                anyhow!("streaming job {job_id} not found in streaming_job_databases")
            })?;

            all_fragments
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default()
                .insert(fragment_id, fragment);
        }
    }

    Ok(all_fragments)
}
1110
/// Worker/vnode placement shared by every fragment in a no-shuffle ensemble.
///
/// Actors are identified by a template-relative index (`0..actor_count`); each
/// component fragment later maps these indices onto its own actor-id block.
pub(crate) struct EnsembleActorTemplate {
    /// worker -> (actor index -> vnode bitmap); the bitmap is `None` for
    /// single (non-hash) distribution.
    assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>>,
    distribution_type: DistributionType,
    /// Total number of actors across all workers.
    actor_count: u32,
}
1116
1117impl EnsembleActorTemplate {
1118    pub(crate) fn render_new(
1119        job: &streaming_job::Model,
1120        worker_map: &HashMap<WorkerId, WorkerNode>,
1121        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
1122        entry_fragment_parallelism: Option<StreamingParallelism>,
1123        database_resource_group: String,
1124        distribution_type: DistributionType,
1125        vnode_count: usize,
1126    ) -> MetaResult<Self> {
1127        let job_id = job.job_id;
1128        let job_strategy = job
1129            .stream_context()
1130            .adaptive_parallelism_strategy
1131            .unwrap_or(adaptive_parallelism_strategy);
1132
1133        let resource_group = match &job.specific_resource_group {
1134            None => database_resource_group,
1135            Some(resource_group) => resource_group.clone(),
1136        };
1137
1138        let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
1139            .iter()
1140            .filter_map(|(worker_id, worker)| {
1141                if worker
1142                    .resource_group()
1143                    .as_deref()
1144                    .unwrap_or(DEFAULT_RESOURCE_GROUP)
1145                    == resource_group.as_str()
1146                {
1147                    Some((
1148                        *worker_id,
1149                        worker
1150                            .parallelism()
1151                            .expect("should have parallelism for compute node")
1152                            .try_into()
1153                            .expect("parallelism for compute node"),
1154                    ))
1155                } else {
1156                    None
1157                }
1158            })
1159            .collect();
1160
1161        let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();
1162
1163        let effective_job_parallelism = if job.job_status != JobStatus::Created {
1164            job.backfill_parallelism
1165                .as_ref()
1166                .unwrap_or(&job.parallelism)
1167        } else {
1168            &job.parallelism
1169        };
1170
1171        let actual_parallelism = match entry_fragment_parallelism
1172            .as_ref()
1173            .unwrap_or(effective_job_parallelism)
1174        {
1175            StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
1176                job_strategy.compute_target_parallelism(total_parallelism)
1177            }
1178            StreamingParallelism::Fixed(n) => *n,
1179        }
1180        .min(vnode_count)
1181        .min(job.max_parallelism as usize);
1182
1183        tracing::debug!(
1184            "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
1185            job_id,
1186            actual_parallelism,
1187            job.parallelism,
1188            total_parallelism,
1189            job.max_parallelism,
1190            vnode_count,
1191            entry_fragment_parallelism
1192        );
1193
1194        let assigner = AssignerBuilder::new(job_id).build();
1195
1196        let actors = (0..(actual_parallelism as u32)).collect_vec();
1197        let vnodes = (0..vnode_count).collect_vec();
1198
1199        let raw_assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;
1200
1201        let assignment = raw_assignment
1202            .into_iter()
1203            .map(|(worker_id, actors)| {
1204                let actors = actors
1205                    .into_iter()
1206                    .map(|(actor_idx, vnodes)| {
1207                        let bitmap = match distribution_type {
1208                            DistributionType::Single => None,
1209                            DistributionType::Hash => {
1210                                Some(Bitmap::from_indices(vnode_count, &vnodes))
1211                            }
1212                        };
1213                        (actor_idx, bitmap)
1214                    })
1215                    .collect();
1216                (worker_id, actors)
1217            })
1218            .collect();
1219
1220        let actor_count = u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
1221
1222        Ok(Self {
1223            assignment,
1224            distribution_type,
1225            actor_count,
1226        })
1227    }
1228
1229    pub(crate) fn from_existing_inflight_fragment(fragment: &InflightFragmentInfo) -> Self {
1230        if fragment.actors.is_empty() {
1231            return Self {
1232                assignment: BTreeMap::new(),
1233                distribution_type: fragment.distribution_type,
1234                actor_count: 0,
1235            };
1236        }
1237
1238        let actor_count = fragment.actors.len() as u32;
1239
1240        let mut assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>> = BTreeMap::new();
1241        // Enumerate actors starting from 0 index, instead of deriving from actor_id.
1242        for (actor_idx, (&_actor_id, actor_info)) in fragment.actors.iter().enumerate() {
1243            let actor_idx = actor_idx as u32;
1244            assignment
1245                .entry(actor_info.worker_id)
1246                .or_default()
1247                .insert(actor_idx, actor_info.vnode_bitmap.clone());
1248        }
1249
1250        Self {
1251            assignment,
1252            distribution_type: fragment.distribution_type,
1253            actor_count,
1254        }
1255    }
1256
1257    /// Assert that two `EnsembleActorTemplate` are aligned: same distribution type,
1258    /// same actor count, and same vnode bitmap / worker placement. Used to verify that
1259    /// multiple existing fragments within the same no-shuffle ensemble are consistent.
1260    ///
1261    /// Internally calls [`resolve_no_shuffle_actor_mapping`] which asserts distribution
1262    /// type, count, and bitmap equality. Then additionally verifies worker placement.
1263    pub(crate) fn assert_aligned_with(
1264        &self,
1265        other: &Self,
1266        self_fragment_id: FragmentId,
1267        other_fragment_id: FragmentId,
1268    ) {
1269        let mapping = resolve_no_shuffle_actor_mapping(
1270            self.distribution_type,
1271            self.assignment
1272                .iter()
1273                .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
1274            other.distribution_type,
1275            other
1276                .assignment
1277                .iter()
1278                .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
1279        );
1280
1281        for ((self_worker, _self_idx), (other_worker, _other_idx)) in &mapping {
1282            assert_eq!(
1283                self_worker, other_worker,
1284                "fragments {} and {} disagree on worker placement: {:?}",
1285                self_fragment_id, other_fragment_id, mapping,
1286            );
1287        }
1288    }
1289
1290    fn assign_splits(
1291        &self,
1292        entry_fragment_id: FragmentId,
1293        splits: BTreeMap<SplitId, SplitImpl>,
1294    ) -> HashMap<u32, Vec<SplitImpl>> {
1295        {
1296            {
1297                let empty_actor_splits: HashMap<_, _> = self
1298                    .assignment
1299                    .values()
1300                    .flat_map(|actors| actors.keys())
1301                    .map(|actor_id| (*actor_id, vec![]))
1302                    .collect();
1303
1304                crate::stream::source_manager::reassign_splits(
1305                    entry_fragment_id,
1306                    empty_actor_splits,
1307                    &splits,
1308                    SplitDiffOptions::default(),
1309                )
1310                .unwrap_or_default()
1311            }
1312        }
1313    }
1314}
1315
/// Maps the ensemble-wide [`EnsembleActorTemplate`] onto one component
/// fragment, giving that fragment its own contiguous block of actor ids.
pub(crate) struct ComponentFragmentAligner<'a> {
    /// Shared placement template for every fragment in the ensemble.
    actor_template: &'a EnsembleActorTemplate,
    /// First actor id of the block reserved for this component fragment.
    actor_id_base: ActorId,
}
1320
1321impl<'a> ComponentFragmentAligner<'a> {
1322    fn new(
1323        actor_template: &'a EnsembleActorTemplate,
1324        actor_id_allocator: &mut RenderActorIdAllocator<'_>,
1325    ) -> Self {
1326        let actor_id_base = actor_id_allocator.allocate_block(actor_template.actor_count);
1327        Self {
1328            actor_template,
1329            actor_id_base,
1330        }
1331    }
1332
1333    pub(crate) fn new_persistent(
1334        actor_template: &'a EnsembleActorTemplate,
1335        actor_id_counter: &AtomicU32,
1336    ) -> Self {
1337        let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
1338        Self::new(actor_template, &mut actor_id_allocator)
1339    }
1340
1341    pub(crate) fn align_component_actor(
1342        &self,
1343        distribution_type: DistributionType,
1344    ) -> HashMap<ActorId, (WorkerId, Option<Bitmap>)> {
1345        let EnsembleActorTemplate {
1346            assignment,
1347            actor_count,
1348            distribution_type: _,
1349        } = &self.actor_template;
1350        let actor_id_base = self.actor_id_base;
1351        {
1352            assignment
1353                .iter()
1354                .flat_map(|(worker_id, actors)| {
1355                    actors
1356                        .iter()
1357                        .map(move |(actor_idx, bitmap)| (worker_id, actor_idx, bitmap))
1358                })
1359                .map(|(&worker_id, &actor_idx, bitmap)| {
1360                    if distribution_type == DistributionType::Single {
1361                        assert_eq!(*actor_count, 1);
1362                    }
1363
1364                    let actor_id = actor_id_base + actor_idx;
1365
1366                    (actor_id, (worker_id, bitmap.clone()))
1367                })
1368                .collect()
1369        }
1370    }
1371
1372    pub(crate) fn align_component_splits(
1373        &self,
1374        split_assignment: &HashMap<u32, Vec<SplitImpl>>,
1375    ) -> HashMap<ActorId, Vec<SplitImpl>> {
1376        (0..self.actor_template.actor_count)
1377            .filter_map(|actor_idx| {
1378                split_assignment
1379                    .get(&actor_idx)
1380                    .map(|splits| ((self.actor_id_base + actor_idx), splits.clone()))
1381            })
1382            .collect()
1383    }
1384}
1385
1386#[cfg(debug_assertions)]
1387fn debug_sanity_check(
1388    ensembles: &[NoShuffleEnsemble],
1389    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1390    jobs: &HashMap<JobId, streaming_job::Model>,
1391) {
1392    let fragment_lookup: HashMap<FragmentId, (&LoadedFragment, JobId)> = job_fragments
1393        .iter()
1394        .flat_map(|(job_id, fragments)| {
1395            fragments
1396                .iter()
1397                .map(move |(fragment_id, fragment)| (*fragment_id, (fragment, *job_id)))
1398        })
1399        .collect();
1400
1401    // Debug-only assertions to catch inconsistent ensemble metadata early.
1402    debug_assert!(
1403        ensembles
1404            .iter()
1405            .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
1406        "entries must be subset of components"
1407    );
1408
1409    let mut missing_fragments = BTreeSet::new();
1410    let mut missing_jobs = BTreeSet::new();
1411
1412    for fragment_id in ensembles
1413        .iter()
1414        .flat_map(|ensemble| ensemble.components.iter())
1415    {
1416        match fragment_lookup.get(fragment_id) {
1417            Some((fragment, job_id)) => {
1418                if !jobs.contains_key(&fragment.job_id) {
1419                    missing_jobs.insert(*job_id);
1420                }
1421            }
1422            None => {
1423                missing_fragments.insert(*fragment_id);
1424            }
1425        }
1426    }
1427
1428    debug_assert!(
1429        missing_fragments.is_empty(),
1430        "missing fragments in fragment_map: {:?}",
1431        missing_fragments
1432    );
1433
1434    debug_assert!(
1435        missing_jobs.is_empty(),
1436        "missing jobs for fragments' job_id: {:?}",
1437        missing_jobs
1438    );
1439
1440    for ensemble in ensembles {
1441        let unique_vnode_counts: Vec<_> = ensemble
1442            .components
1443            .iter()
1444            .flat_map(|fragment_id| {
1445                fragment_lookup
1446                    .get(fragment_id)
1447                    .map(|(fragment, _)| fragment.vnode_count)
1448            })
1449            .unique()
1450            .collect();
1451
1452        debug_assert!(
1453            unique_vnode_counts.len() <= 1,
1454            "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
1455            ensemble.components,
1456            unique_vnode_counts
1457        );
1458    }
1459}
1460
/// For the given jobs' fragments, determines which fragments consume which
/// source and loads their persisted split assignments.
///
/// Returns `(fragment_id -> source_id, fragment_id -> splits)`. Fragments
/// without a persisted split row (or with a NULL payload) are absent from the
/// second map.
async fn resolve_source_fragments<C>(
    txn: &C,
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
) -> MetaResult<(
    HashMap<FragmentId, SourceId>,
    HashMap<FragmentId, Vec<SplitImpl>>,
)>
where
    C: ConnectionTrait,
{
    // Group fragment ids by the source they consume: plain source executors
    // and source-backfill (SourceScan) executors both count.
    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
    for (fragment_id, fragment) in job_fragments.values().flatten() {
        let mask = fragment.fragment_type_mask;
        if mask.contains(FragmentTypeFlag::Source)
            && let Some(source_id) = fragment.nodes.find_stream_source()
        {
            source_fragment_ids
                .entry(source_id)
                .or_default()
                .insert(*fragment_id);
        }

        if mask.contains(FragmentTypeFlag::SourceScan)
            && let Some((source_id, _)) = fragment.nodes.find_source_backfill()
        {
            source_fragment_ids
                .entry(source_id)
                .or_default()
                .insert(*fragment_id);
        }
    }

    // Invert the grouping into fragment -> source.
    let fragment_source_ids: HashMap<_, _> = source_fragment_ids
        .iter()
        .flat_map(|(source_id, fragment_ids)| {
            fragment_ids
                .iter()
                .map(|fragment_id| (*fragment_id, *source_id))
        })
        .collect();

    let fragment_ids = fragment_source_ids.keys().copied().collect_vec();

    let fragment_splits: Vec<_> = FragmentSplits::find()
        .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
        .all(txn)
        .await?;

    // Decode the protobuf split payloads; rows with NULL splits are skipped.
    // NOTE(review): `flat_map(SplitImpl::try_from)` silently drops splits that
    // fail to decode — confirm this best-effort behavior is intended.
    let fragment_splits: HashMap<_, _> = fragment_splits
        .into_iter()
        .flat_map(|model| {
            model.splits.map(|splits| {
                (
                    model.fragment_id,
                    splits
                        .to_protobuf()
                        .splits
                        .iter()
                        .flat_map(SplitImpl::try_from)
                        .collect_vec(),
                )
            })
        })
        .collect();

    Ok((fragment_source_ids, fragment_splits))
}
1528
// Helper struct to make the function signature cleaner and to properly bundle the required data.
#[derive(Debug)]
pub struct ActorGraph<'a> {
    /// Fragment models together with their rendered actors, keyed by fragment id.
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    /// Worker each actor is placed on.
    pub locations: &'a HashMap<ActorId, WorkerId>,
}
1535
/// A connected component of fragments linked by `NoShuffle` dispatcher edges.
#[derive(Debug, Clone)]
pub struct NoShuffleEnsemble {
    /// Root fragments: members whose upstreams all lie outside the component.
    entries: HashSet<FragmentId>,
    /// Every fragment in the component (always a superset of `entries`).
    components: HashSet<FragmentId>,
}
1541
1542impl NoShuffleEnsemble {
1543    /// Create a single-fragment ensemble (for standalone fragments with no `NoShuffle` edges).
1544    pub(crate) fn singleton(fragment_id: FragmentId) -> Self {
1545        Self {
1546            entries: HashSet::from_iter([fragment_id]),
1547            components: HashSet::from_iter([fragment_id]),
1548        }
1549    }
1550
1551    #[cfg(test)]
1552    pub fn for_test(
1553        entries: impl IntoIterator<Item = FragmentId>,
1554        components: impl IntoIterator<Item = FragmentId>,
1555    ) -> Self {
1556        let entries = entries.into_iter().collect();
1557        let components = components.into_iter().collect();
1558        Self {
1559            entries,
1560            components,
1561        }
1562    }
1563
1564    pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1565        self.components.iter().cloned()
1566    }
1567
1568    pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1569        self.entries.iter().copied()
1570    }
1571
1572    pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1573        self.components.iter().copied()
1574    }
1575
1576    pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
1577        self.entries.contains(fragment_id)
1578    }
1579}
1580
/// Groups the given fragments (plus everything reachable through `NoShuffle`
/// edges) into connected-component ensembles.
///
/// Note: this fetches *all* `NoShuffle` relations from the catalog, not only
/// those touching `initial_fragment_ids`; the traversal itself then restricts
/// the result to components containing the initial fragments.
pub async fn find_fragment_no_shuffle_dags_detailed(
    db: &impl ConnectionTrait,
    initial_fragment_ids: &[FragmentId],
) -> MetaResult<Vec<NoShuffleEnsemble>> {
    let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
        .columns([
            fragment_relation::Column::SourceFragmentId,
            fragment_relation::Column::TargetFragmentId,
        ])
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_tuple()
        .all(db)
        .await?;

    let (forward_edges, backward_edges) =
        build_no_shuffle_fragment_graph_edges(all_no_shuffle_relations);

    find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
}
1600
1601pub(crate) fn build_no_shuffle_fragment_graph_edges(
1602    relations: impl IntoIterator<Item = (FragmentId, FragmentId)>,
1603) -> (
1604    HashMap<FragmentId, Vec<FragmentId>>,
1605    HashMap<FragmentId, Vec<FragmentId>>,
1606) {
1607    let mut forward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1608    let mut backward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1609
1610    for (src, dst) in relations {
1611        forward_edges.entry(src).or_default().insert(dst);
1612        backward_edges.entry(dst).or_default().insert(src);
1613    }
1614
1615    let forward_edges = forward_edges
1616        .into_iter()
1617        .map(|(src, dst_set)| (src, dst_set.into_iter().collect()))
1618        .collect();
1619    let backward_edges = backward_edges
1620        .into_iter()
1621        .map(|(dst, src_set)| (dst, src_set.into_iter().collect()))
1622        .collect();
1623
1624    (forward_edges, backward_edges)
1625}
1626
1627pub(crate) fn find_no_shuffle_graphs(
1628    initial_fragment_ids: &[impl Into<FragmentId> + Copy],
1629    forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1630    backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1631) -> MetaResult<Vec<NoShuffleEnsemble>> {
1632    let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
1633    let mut globally_visited: HashSet<FragmentId> = HashSet::new();
1634
1635    for &init_id in initial_fragment_ids {
1636        let init_id = init_id.into();
1637        if globally_visited.contains(&init_id) {
1638            continue;
1639        }
1640
1641        // Found a new component. Traverse it to find all its nodes.
1642        let mut components = HashSet::new();
1643        let mut queue: VecDeque<FragmentId> = VecDeque::new();
1644
1645        queue.push_back(init_id);
1646        globally_visited.insert(init_id);
1647
1648        while let Some(current_id) = queue.pop_front() {
1649            components.insert(current_id);
1650            let neighbors = forward_edges
1651                .get(&current_id)
1652                .into_iter()
1653                .flatten()
1654                .chain(backward_edges.get(&current_id).into_iter().flatten());
1655
1656            for &neighbor_id in neighbors {
1657                if globally_visited.insert(neighbor_id) {
1658                    queue.push_back(neighbor_id);
1659                }
1660            }
1661        }
1662
1663        // For the newly found component, identify its roots.
1664        let mut entries = HashSet::new();
1665        for &node_id in &components {
1666            let is_root = match backward_edges.get(&node_id) {
1667                Some(parents) => parents.iter().all(|p| !components.contains(p)),
1668                None => true,
1669            };
1670            if is_root {
1671                entries.insert(node_id);
1672            }
1673        }
1674
1675        // Store the detailed DAG structure (roots, all nodes in this DAG).
1676        if !entries.is_empty() {
1677            graphs.push(NoShuffleEnsemble {
1678                entries,
1679                components,
1680            });
1681        }
1682    }
1683
1684    Ok(graphs)
1685}
1686
1687#[cfg(test)]
1688mod tests {
1689    use std::collections::{BTreeSet, HashMap, HashSet};
1690    use std::sync::Arc;
1691
1692    use risingwave_connector::source::SplitImpl;
1693    use risingwave_connector::source::test_source::TestSourceSplit;
1694    use risingwave_meta_model::{CreateType, JobStatus};
1695    use risingwave_pb::common::WorkerType;
1696    use risingwave_pb::common::worker_node::Property as WorkerProperty;
1697    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1698
1699    use super::*;
1700
1701    // Helper type aliases for cleaner test code
1702    // Using the actual FragmentId type from the module
1703    type Edges = (
1704        HashMap<FragmentId, Vec<FragmentId>>,
1705        HashMap<FragmentId, Vec<FragmentId>>,
1706    );
1707
1708    /// A helper function to build forward and backward edge maps from a simple list of tuples.
1709    /// This reduces boilerplate in each test.
1710    fn build_edges(relations: &[(u32, u32)]) -> Edges {
1711        let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1712        let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1713        for &(src, dst) in relations {
1714            forward_edges
1715                .entry(src.into())
1716                .or_default()
1717                .push(dst.into());
1718            backward_edges
1719                .entry(dst.into())
1720                .or_default()
1721                .push(src.into());
1722        }
1723        (forward_edges, backward_edges)
1724    }
1725
1726    /// Helper function to create a `HashSet` from a slice easily.
1727    fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1728        ids.iter().map(|id| (*id).into()).collect()
1729    }
1730
1731    fn build_fragment(
1732        fragment_id: FragmentId,
1733        job_id: JobId,
1734        fragment_type_mask: i32,
1735        distribution_type: DistributionType,
1736        vnode_count: i32,
1737        parallelism: StreamingParallelism,
1738    ) -> LoadedFragment {
1739        LoadedFragment {
1740            fragment_id,
1741            job_id,
1742            fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
1743            distribution_type,
1744            vnode_count: vnode_count as usize,
1745            nodes: PbStreamNode::default(),
1746            state_table_ids: HashSet::new(),
1747            parallelism: Some(parallelism),
1748        }
1749    }
1750
1751    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1752
1753    fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1754        let base = fragment.actors.keys().copied().min().unwrap_or_default();
1755
1756        let mut entries: Vec<_> = fragment
1757            .actors
1758            .iter()
1759            .map(|(&actor_id, info)| {
1760                let idx = actor_id.as_raw_id() - base.as_raw_id();
1761                let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1762                    bitmap
1763                        .iter()
1764                        .enumerate()
1765                        .filter_map(|(pos, is_set)| is_set.then_some(pos))
1766                        .collect::<Vec<_>>()
1767                });
1768                let splits = info
1769                    .splits
1770                    .iter()
1771                    .map(|split| split.id().to_string())
1772                    .collect::<Vec<_>>();
1773                (idx.into(), info.worker_id, vnode_indices, splits)
1774            })
1775            .collect();
1776
1777        entries.sort_by_key(|(idx, _, _, _)| *idx);
1778        entries
1779    }
1780
1781    fn build_worker_node(
1782        id: impl Into<WorkerId>,
1783        parallelism: usize,
1784        resource_group: &str,
1785    ) -> WorkerNode {
1786        WorkerNode {
1787            id: id.into(),
1788            r#type: WorkerType::ComputeNode as i32,
1789            property: Some(WorkerProperty {
1790                is_streaming: true,
1791                parallelism: u32::try_from(parallelism).expect("parallelism fits into u32"),
1792                resource_group: Some(resource_group.to_owned()),
1793                ..Default::default()
1794            }),
1795            ..Default::default()
1796        }
1797    }
1798
1799    #[test]
1800    fn test_single_linear_chain() {
1801        // Scenario: A simple linear graph 1 -> 2 -> 3.
1802        // We start from the middle node (2).
1803        let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1804        let initial_ids = &[2];
1805
1806        // Act
1807        let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1808
1809        // Assert
1810        assert!(result.is_ok());
1811        let graphs = result.unwrap();
1812
1813        assert_eq!(graphs.len(), 1);
1814        let graph = &graphs[0];
1815        assert_eq!(graph.entries, to_hashset(&[1]));
1816        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1817    }
1818
1819    #[test]
1820    fn test_two_disconnected_graphs() {
1821        // Scenario: Two separate graphs: 1->2 and 10->11.
1822        // We start with one node from each graph.
1823        let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1824        let initial_ids = &[2, 10];
1825
1826        // Act
1827        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1828
1829        // Assert
1830        assert_eq!(graphs.len(), 2);
1831
1832        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1833        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1834
1835        // Graph 1
1836        assert_eq!(graphs[0].entries, to_hashset(&[1]));
1837        assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1838
1839        // Graph 2
1840        assert_eq!(graphs[1].entries, to_hashset(&[10]));
1841        assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1842    }
1843
1844    #[test]
1845    fn test_multiple_entries_in_one_graph() {
1846        // Scenario: A graph with two roots feeding into one node: 1->3, 2->3.
1847        let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1848        let initial_ids = &[3];
1849
1850        // Act
1851        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1852
1853        // Assert
1854        assert_eq!(graphs.len(), 1);
1855        let graph = &graphs[0];
1856        assert_eq!(graph.entries, to_hashset(&[1, 2]));
1857        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1858    }
1859
1860    #[test]
1861    fn test_diamond_shape_graph() {
1862        // Scenario: A diamond shape: 1->2, 1->3, 2->4, 3->4
1863        let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1864        let initial_ids = &[4];
1865
1866        // Act
1867        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1868
1869        // Assert
1870        assert_eq!(graphs.len(), 1);
1871        let graph = &graphs[0];
1872        assert_eq!(graph.entries, to_hashset(&[1]));
1873        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1874    }
1875
1876    #[test]
1877    fn test_starting_with_multiple_nodes_in_same_graph() {
1878        // Scenario: Start with two different nodes (2 and 4) from the same component.
1879        // Should only identify one graph, not two.
1880        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1881        let initial_ids = &[2, 4];
1882
1883        // Act
1884        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1885
1886        // Assert
1887        assert_eq!(graphs.len(), 1);
1888        let graph = &graphs[0];
1889        assert_eq!(graph.entries, to_hashset(&[1]));
1890        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1891    }
1892
1893    #[test]
1894    fn test_empty_initial_ids() {
1895        // Scenario: The initial ID list is empty.
1896        let (forward, backward) = build_edges(&[(1, 2)]);
1897        let initial_ids: &[u32] = &[];
1898
1899        // Act
1900        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1901
1902        // Assert
1903        assert!(graphs.is_empty());
1904    }
1905
1906    #[test]
1907    fn test_isolated_node_as_input() {
1908        // Scenario: Start with an ID that has no relations.
1909        let (forward, backward) = build_edges(&[(1, 2)]);
1910        let initial_ids = &[100];
1911
1912        // Act
1913        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1914
1915        // Assert
1916        assert_eq!(graphs.len(), 1);
1917        let graph = &graphs[0];
1918        assert_eq!(graph.entries, to_hashset(&[100]));
1919        assert_eq!(graph.components, to_hashset(&[100]));
1920    }
1921
1922    #[test]
1923    fn test_graph_with_a_cycle() {
1924        // Scenario: A graph with a cycle: 1 -> 2 -> 3 -> 1.
1925        // The algorithm should correctly identify all nodes in the component.
1926        // Crucially, NO node is a root because every node has a parent *within the component*.
1927        // Therefore, the `entries` set should be empty, and the graph should not be included in the results.
1928        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1929        let initial_ids = &[2];
1930
1931        // Act
1932        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1933
1934        // Assert
1935        assert!(
1936            graphs.is_empty(),
1937            "A graph with no entries should not be returned"
1938        );
1939    }
1940    #[test]
1941    fn test_custom_complex() {
1942        let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1943        let initial_ids = &[1, 2, 4, 6];
1944
1945        // Act
1946        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1947
1948        // Assert
1949        assert_eq!(graphs.len(), 2);
1950        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1951        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1952
1953        // Graph 1
1954        assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1955        assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1956
1957        // Graph 2
1958        assert_eq!(graphs[1].entries, to_hashset(&[6]));
1959        assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1960    }
1961
1962    #[test]
1963    fn render_actors_increments_actor_counter() {
1964        let actor_id_counter = AtomicU32::new(100);
1965        let fragment_id: FragmentId = 1.into();
1966        let job_id: JobId = 10.into();
1967        let database_id: DatabaseId = DatabaseId::new(3);
1968
1969        let fragment_model = build_fragment(
1970            fragment_id,
1971            job_id,
1972            0,
1973            DistributionType::Single,
1974            1,
1975            StreamingParallelism::Fixed(1),
1976        );
1977
1978        let job_model = streaming_job::Model {
1979            job_id,
1980            job_status: JobStatus::Created,
1981            create_type: CreateType::Foreground,
1982            timezone: None,
1983            config_override: None,
1984            adaptive_parallelism_strategy: None,
1985            parallelism: StreamingParallelism::Fixed(1),
1986            backfill_parallelism: None,
1987            backfill_orders: None,
1988            max_parallelism: 1,
1989            specific_resource_group: None,
1990            is_serverless_backfill: false,
1991        };
1992
1993        let database_model = database::Model {
1994            database_id,
1995            name: "test_db".into(),
1996            resource_group: "rg-a".into(),
1997            barrier_interval_ms: None,
1998            checkpoint_frequency: None,
1999        };
2000
2001        let ensembles = vec![NoShuffleEnsemble {
2002            entries: HashSet::from([fragment_id]),
2003            components: HashSet::from([fragment_id]),
2004        }];
2005
2006        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
2007        let job_fragments = HashMap::from([(job_id, fragment_map)]);
2008        let job_map = HashMap::from([(job_id, job_model)]);
2009
2010        let worker_map: HashMap<WorkerId, WorkerNode> =
2011            HashMap::from([(1.into(), build_worker_node(1, 1, "rg-a"))]);
2012
2013        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2014        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2015        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2016        let database_map = HashMap::from([(database_id, database_model)]);
2017
2018        let context = RenderActorsContext {
2019            fragment_source_ids: &fragment_source_ids,
2020            fragment_splits: &fragment_splits,
2021            streaming_job_databases: &streaming_job_databases,
2022            database_map: &database_map,
2023        };
2024
2025        let result = render_actors(
2026            &actor_id_counter,
2027            &ensembles,
2028            &job_fragments,
2029            &job_map,
2030            &worker_map,
2031            AdaptiveParallelismStrategy::Auto,
2032            context,
2033        )
2034        .expect("actor rendering succeeds");
2035
2036        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2037        assert_eq!(state.len(), 1);
2038        assert!(
2039            state[0].2.is_none(),
2040            "single distribution should not assign vnode bitmaps"
2041        );
2042        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
2043    }
2044
    /// Two hash-distributed fragments in the same no-shuffle ensemble must get
    /// identical actor-to-worker placement, and their vnode bitmaps must jointly
    /// cover every vnode of the fragment.
    #[test]
    fn render_actors_aligns_hash_vnode_bitmaps() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 1.into();
        let downstream_fragment_id: FragmentId = 2.into();
        let job_id: JobId = 20.into();
        let database_id: DatabaseId = DatabaseId::new(5);

        // Two hash fragments, 4 vnodes each, fixed parallelism of 2.
        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db_hash".into(),
            resource_group: "rg-hash".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Both fragments belong to one ensemble entered via the entry fragment,
        // so the downstream is expected to mirror the entry's actor layout.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Two single-slot workers in the matching resource group.
        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-hash")),
            (2.into(), build_worker_node(2, 1, "rg-hash")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // No-shuffle alignment: downstream actors mirror the entry's
        // placement and bitmaps exactly (states are counter-rebased).
        assert_eq!(entry_state.len(), 2);
        assert_eq!(entry_state, downstream_state);

        // Together the two actors must own all 4 vnodes.
        let assigned_vnodes: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, vnodes, _)| {
                vnodes
                    .as_ref()
                    .expect("hash distribution should populate vnode bitmap")
                    .iter()
                    .copied()
            })
            .collect();
        assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
        // 2 fragments x 2 actors => 4 ids consumed from the counter.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
2154
    /// Splits assigned to the `Source` entry fragment must be propagated to the
    /// aligned `SourceScan` fragment of the same no-shuffle ensemble, even though
    /// `fragment_splits` only lists the entry fragment.
    #[test]
    fn render_actors_propagates_source_splits() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 11.into();
        let downstream_fragment_id: FragmentId = 12.into();
        let job_id: JobId = 30.into();
        let database_id: DatabaseId = DatabaseId::new(7);
        let source_id: SourceId = 99.into();

        // Type masks marking the entry as a Source and the downstream as a
        // SourceScan fragment.
        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            source_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            source_scan_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "split_db".into(),
            resource_group: "rg-source".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Both fragments form one ensemble entered via the source fragment.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-source")),
            (2.into(), build_worker_node(2, 1, "rg-source")),
        ]);

        // Two test splits that should be distributed across the actors.
        let split_a = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-a"),
            properties: HashMap::new(),
            offset: "0".into(),
        });
        let split_b = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-b"),
            properties: HashMap::new(),
            offset: "0".into(),
        });

        // Both fragments reference the same source, but only the entry
        // fragment carries splits — propagation must supply the downstream.
        let fragment_source_ids = HashMap::from([
            (entry_fragment_id, source_id),
            (downstream_fragment_id, source_id),
        ]);
        let fragment_splits =
            HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // Identical states imply the downstream received the same splits.
        assert_eq!(entry_state, downstream_state);

        // Across all entry actors, both splits must appear.
        let split_ids: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, _, splits)| splits.iter().cloned())
            .collect();
        assert_eq!(
            split_ids,
            BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
        );
        // 2 fragments x 2 actors => 4 ids consumed from the counter.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
2279
    /// `preview_actor_assignments` must not touch the actor-id counter; ids are
    /// only consumed when the preview is materialized via
    /// `materialize_actor_assignments`, and materialization must preserve the
    /// previewed (counter-rebased) actor states.
    #[test]
    fn preview_actor_assignments_defers_actor_id_allocation() {
        let actor_id_counter = AtomicU32::new(100);
        let entry_fragment_id: FragmentId = 11.into();
        let downstream_fragment_id: FragmentId = 12.into();
        let job_id: JobId = 30.into();
        let database_id: DatabaseId = DatabaseId::new(7);
        let source_id: SourceId = 99.into();

        // Same Source -> SourceScan ensemble shape as the split-propagation test.
        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            source_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            source_scan_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "preview_db".into(),
            resource_group: "rg-source".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-source")),
            (2.into(), build_worker_node(2, 1, "rg-source")),
        ]);

        let split_a = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-a"),
            properties: HashMap::new(),
            offset: "0".into(),
        });
        let split_b = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-b"),
            properties: HashMap::new(),
            offset: "0".into(),
        });

        let loaded = LoadedFragmentContext {
            ensembles,
            job_fragments,
            job_map,
            streaming_job_databases: HashMap::from([(job_id, database_id)]),
            database_map: HashMap::from([(database_id, database_model)]),
            fragment_source_ids: HashMap::from([
                (entry_fragment_id, source_id),
                (downstream_fragment_id, source_id),
            ]),
            fragment_splits: HashMap::from([(entry_fragment_id, vec![split_a, split_b])]),
        };

        // Preview takes no counter argument and must leave it untouched.
        let preview =
            preview_actor_assignments(&worker_map, AdaptiveParallelismStrategy::Auto, &loaded)
                .expect("preview rendering succeeds");
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 100);

        let preview_entry_state =
            collect_actor_state(&preview.fragments[&database_id][&job_id][&entry_fragment_id]);
        let preview_downstream_state =
            collect_actor_state(&preview.fragments[&database_id][&job_id][&downstream_fragment_id]);
        assert_eq!(preview_entry_state, preview_downstream_state);

        // Materialization is when real actor ids get allocated.
        let materialized = materialize_actor_assignments(&actor_id_counter, preview);

        let materialized_entry_state =
            collect_actor_state(&materialized.fragments[&database_id][&job_id][&entry_fragment_id]);
        let materialized_downstream_state = collect_actor_state(
            &materialized.fragments[&database_id][&job_id][&downstream_fragment_id],
        );

        // Rebased states must be identical before and after materialization.
        assert_eq!(materialized_entry_state, materialized_downstream_state);
        assert_eq!(materialized_entry_state, preview_entry_state);
        assert_eq!(materialized_downstream_state, preview_downstream_state);
        // 2 fragments x 2 actors => counter advanced from 100 to 104.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 104);
    }
2398
2399    /// Test that job-level strategy overrides global strategy for Adaptive parallelism.
2400    #[test]
2401    fn render_actors_job_strategy_overrides_global() {
2402        let actor_id_counter = AtomicU32::new(0);
2403        let fragment_id: FragmentId = 1.into();
2404        let job_id: JobId = 100.into();
2405        let database_id: DatabaseId = DatabaseId::new(10);
2406
2407        // Fragment with Adaptive parallelism, vnode_count = 8
2408        let fragment_model = build_fragment(
2409            fragment_id,
2410            job_id,
2411            0,
2412            DistributionType::Hash,
2413            8,
2414            StreamingParallelism::Adaptive,
2415        );
2416
2417        // Job has custom strategy: BOUNDED(2)
2418        let job_model = streaming_job::Model {
2419            job_id,
2420            job_status: JobStatus::Created,
2421            create_type: CreateType::Foreground,
2422            timezone: None,
2423            config_override: None,
2424            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2425            parallelism: StreamingParallelism::Adaptive,
2426            backfill_parallelism: None,
2427            backfill_orders: None,
2428            max_parallelism: 8,
2429            specific_resource_group: None,
2430            is_serverless_backfill: false,
2431        };
2432
2433        let database_model = database::Model {
2434            database_id,
2435            name: "test_db".into(),
2436            resource_group: "default".into(),
2437            barrier_interval_ms: None,
2438            checkpoint_frequency: None,
2439        };
2440
2441        let ensembles = vec![NoShuffleEnsemble {
2442            entries: HashSet::from([fragment_id]),
2443            components: HashSet::from([fragment_id]),
2444        }];
2445
2446        let fragment_map =
2447            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2448        let job_map = HashMap::from([(job_id, job_model)]);
2449
2450        // 4 workers with 1 parallelism each = total 4 parallelism
2451        let worker_map = HashMap::from([
2452            (1.into(), build_worker_node(1, 1, "default")),
2453            (2.into(), build_worker_node(2, 1, "default")),
2454            (3.into(), build_worker_node(3, 1, "default")),
2455            (4.into(), build_worker_node(4, 1, "default")),
2456        ]);
2457
2458        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2459        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2460        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2461        let database_map = HashMap::from([(database_id, database_model)]);
2462
2463        let context = RenderActorsContext {
2464            fragment_source_ids: &fragment_source_ids,
2465            fragment_splits: &fragment_splits,
2466            streaming_job_databases: &streaming_job_databases,
2467            database_map: &database_map,
2468        };
2469
2470        // Global strategy is FULL (would give 4 actors), but job strategy is BOUNDED(2)
2471        let result = render_actors(
2472            &actor_id_counter,
2473            &ensembles,
2474            &fragment_map,
2475            &job_map,
2476            &worker_map,
2477            AdaptiveParallelismStrategy::Full,
2478            context,
2479        )
2480        .expect("actor rendering succeeds");
2481
2482        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2483        // Job strategy BOUNDED(2) should limit to 2 actors, not 4 (global FULL)
2484        assert_eq!(
2485            state.len(),
2486            2,
2487            "Job strategy BOUNDED(2) should override global FULL"
2488        );
2489    }
2490
2491    /// Test that global strategy is used when job has no custom strategy.
2492    #[test]
2493    fn render_actors_uses_global_strategy_when_job_has_none() {
2494        let actor_id_counter = AtomicU32::new(0);
2495        let fragment_id: FragmentId = 1.into();
2496        let job_id: JobId = 101.into();
2497        let database_id: DatabaseId = DatabaseId::new(11);
2498
2499        let fragment_model = build_fragment(
2500            fragment_id,
2501            job_id,
2502            0,
2503            DistributionType::Hash,
2504            8,
2505            StreamingParallelism::Adaptive,
2506        );
2507
2508        // Job has NO custom strategy (None)
2509        let job_model = streaming_job::Model {
2510            job_id,
2511            job_status: JobStatus::Created,
2512            create_type: CreateType::Foreground,
2513            timezone: None,
2514            config_override: None,
2515            adaptive_parallelism_strategy: None, // No custom strategy
2516            parallelism: StreamingParallelism::Adaptive,
2517            backfill_parallelism: None,
2518            backfill_orders: None,
2519            max_parallelism: 8,
2520            specific_resource_group: None,
2521            is_serverless_backfill: false,
2522        };
2523
2524        let database_model = database::Model {
2525            database_id,
2526            name: "test_db".into(),
2527            resource_group: "default".into(),
2528            barrier_interval_ms: None,
2529            checkpoint_frequency: None,
2530        };
2531
2532        let ensembles = vec![NoShuffleEnsemble {
2533            entries: HashSet::from([fragment_id]),
2534            components: HashSet::from([fragment_id]),
2535        }];
2536
2537        let fragment_map =
2538            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2539        let job_map = HashMap::from([(job_id, job_model)]);
2540
2541        // 4 workers = total 4 parallelism
2542        let worker_map = HashMap::from([
2543            (1.into(), build_worker_node(1, 1, "default")),
2544            (2.into(), build_worker_node(2, 1, "default")),
2545            (3.into(), build_worker_node(3, 1, "default")),
2546            (4.into(), build_worker_node(4, 1, "default")),
2547        ]);
2548
2549        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2550        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2551        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2552        let database_map = HashMap::from([(database_id, database_model)]);
2553
2554        let context = RenderActorsContext {
2555            fragment_source_ids: &fragment_source_ids,
2556            fragment_splits: &fragment_splits,
2557            streaming_job_databases: &streaming_job_databases,
2558            database_map: &database_map,
2559        };
2560
2561        // Global strategy is BOUNDED(3)
2562        let result = render_actors(
2563            &actor_id_counter,
2564            &ensembles,
2565            &fragment_map,
2566            &job_map,
2567            &worker_map,
2568            AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(3).unwrap()),
2569            context,
2570        )
2571        .expect("actor rendering succeeds");
2572
2573        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2574        // Should use global strategy BOUNDED(3)
2575        assert_eq!(
2576            state.len(),
2577            3,
2578            "Should use global strategy BOUNDED(3) when job has no custom strategy"
2579        );
2580    }
2581
2582    /// Test that Fixed parallelism ignores strategy entirely.
2583    #[test]
2584    fn render_actors_fixed_parallelism_ignores_strategy() {
2585        let actor_id_counter = AtomicU32::new(0);
2586        let fragment_id: FragmentId = 1.into();
2587        let job_id: JobId = 102.into();
2588        let database_id: DatabaseId = DatabaseId::new(12);
2589
2590        // Fragment with FIXED parallelism
2591        let fragment_model = build_fragment(
2592            fragment_id,
2593            job_id,
2594            0,
2595            DistributionType::Hash,
2596            8,
2597            StreamingParallelism::Fixed(5),
2598        );
2599
2600        // Job has custom strategy, but it should be ignored for Fixed parallelism
2601        let job_model = streaming_job::Model {
2602            job_id,
2603            job_status: JobStatus::Created,
2604            create_type: CreateType::Foreground,
2605            timezone: None,
2606            config_override: None,
2607            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2608            parallelism: StreamingParallelism::Fixed(5),
2609            backfill_parallelism: None,
2610            backfill_orders: None,
2611            max_parallelism: 8,
2612            specific_resource_group: None,
2613            is_serverless_backfill: false,
2614        };
2615
2616        let database_model = database::Model {
2617            database_id,
2618            name: "test_db".into(),
2619            resource_group: "default".into(),
2620            barrier_interval_ms: None,
2621            checkpoint_frequency: None,
2622        };
2623
2624        let ensembles = vec![NoShuffleEnsemble {
2625            entries: HashSet::from([fragment_id]),
2626            components: HashSet::from([fragment_id]),
2627        }];
2628
2629        let fragment_map =
2630            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2631        let job_map = HashMap::from([(job_id, job_model)]);
2632
2633        // 6 workers = total 6 parallelism
2634        let worker_map = HashMap::from([
2635            (1.into(), build_worker_node(1, 1, "default")),
2636            (2.into(), build_worker_node(2, 1, "default")),
2637            (3.into(), build_worker_node(3, 1, "default")),
2638            (4.into(), build_worker_node(4, 1, "default")),
2639            (5.into(), build_worker_node(5, 1, "default")),
2640            (6.into(), build_worker_node(6, 1, "default")),
2641        ]);
2642
2643        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2644        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2645        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2646        let database_map = HashMap::from([(database_id, database_model)]);
2647
2648        let context = RenderActorsContext {
2649            fragment_source_ids: &fragment_source_ids,
2650            fragment_splits: &fragment_splits,
2651            streaming_job_databases: &streaming_job_databases,
2652            database_map: &database_map,
2653        };
2654
2655        let result = render_actors(
2656            &actor_id_counter,
2657            &ensembles,
2658            &fragment_map,
2659            &job_map,
2660            &worker_map,
2661            AdaptiveParallelismStrategy::Full,
2662            context,
2663        )
2664        .expect("actor rendering succeeds");
2665
2666        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2667        // Fixed(5) should be used, ignoring both job strategy BOUNDED(2) and global FULL
2668        assert_eq!(
2669            state.len(),
2670            5,
2671            "Fixed parallelism should ignore all strategies"
2672        );
2673    }
2674
2675    /// Test RATIO strategy calculation.
2676    #[test]
2677    fn render_actors_ratio_strategy() {
2678        let actor_id_counter = AtomicU32::new(0);
2679        let fragment_id: FragmentId = 1.into();
2680        let job_id: JobId = 103.into();
2681        let database_id: DatabaseId = DatabaseId::new(13);
2682
2683        let fragment_model = build_fragment(
2684            fragment_id,
2685            job_id,
2686            0,
2687            DistributionType::Hash,
2688            16,
2689            StreamingParallelism::Adaptive,
2690        );
2691
2692        // Job has RATIO(0.5) strategy
2693        let job_model = streaming_job::Model {
2694            job_id,
2695            job_status: JobStatus::Created,
2696            create_type: CreateType::Foreground,
2697            timezone: None,
2698            config_override: None,
2699            adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
2700            parallelism: StreamingParallelism::Adaptive,
2701            backfill_parallelism: None,
2702            backfill_orders: None,
2703            max_parallelism: 16,
2704            specific_resource_group: None,
2705            is_serverless_backfill: false,
2706        };
2707
2708        let database_model = database::Model {
2709            database_id,
2710            name: "test_db".into(),
2711            resource_group: "default".into(),
2712            barrier_interval_ms: None,
2713            checkpoint_frequency: None,
2714        };
2715
2716        let ensembles = vec![NoShuffleEnsemble {
2717            entries: HashSet::from([fragment_id]),
2718            components: HashSet::from([fragment_id]),
2719        }];
2720
2721        let fragment_map =
2722            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2723        let job_map = HashMap::from([(job_id, job_model)]);
2724
2725        // 8 workers = total 8 parallelism
2726        let worker_map = HashMap::from([
2727            (1.into(), build_worker_node(1, 1, "default")),
2728            (2.into(), build_worker_node(2, 1, "default")),
2729            (3.into(), build_worker_node(3, 1, "default")),
2730            (4.into(), build_worker_node(4, 1, "default")),
2731            (5.into(), build_worker_node(5, 1, "default")),
2732            (6.into(), build_worker_node(6, 1, "default")),
2733            (7.into(), build_worker_node(7, 1, "default")),
2734            (8.into(), build_worker_node(8, 1, "default")),
2735        ]);
2736
2737        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2738        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2739        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2740        let database_map = HashMap::from([(database_id, database_model)]);
2741
2742        let context = RenderActorsContext {
2743            fragment_source_ids: &fragment_source_ids,
2744            fragment_splits: &fragment_splits,
2745            streaming_job_databases: &streaming_job_databases,
2746            database_map: &database_map,
2747        };
2748
2749        let result = render_actors(
2750            &actor_id_counter,
2751            &ensembles,
2752            &fragment_map,
2753            &job_map,
2754            &worker_map,
2755            AdaptiveParallelismStrategy::Full,
2756            context,
2757        )
2758        .expect("actor rendering succeeds");
2759
2760        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2761        // RATIO(0.5) of 8 = 4
2762        assert_eq!(
2763            state.len(),
2764            4,
2765            "RATIO(0.5) of 8 workers should give 4 actors"
2766        );
2767    }
2768}