risingwave_meta/controller/
scale.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::id::JobId;
24use risingwave_common::system_param::AdaptiveParallelismStrategy;
25use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
26use risingwave_connector::source::{SplitImpl, SplitMetaData};
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::prelude::{
29    Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
30};
31use risingwave_meta_model::{
32    CreateType, DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism,
33    WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink, source,
34    streaming_job, table,
35};
36use risingwave_meta_model_migration::Condition;
37use risingwave_pb::common::WorkerNode;
38use risingwave_pb::stream_plan::PbStreamNode;
39use sea_orm::{
40    ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
41    RelationTrait,
42};
43
44use crate::MetaResult;
45use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
46use crate::manager::ActiveStreamingWorkerNodes;
47use crate::model::{ActorId, StreamActor, StreamingJobModelContextExt};
48use crate::stream::{AssignerBuilder, SplitDiffOptions};
49
50pub(crate) async fn resolve_streaming_job_definition<C>(
51    txn: &C,
52    job_ids: &HashSet<JobId>,
53) -> MetaResult<HashMap<JobId, String>>
54where
55    C: ConnectionTrait,
56{
57    let job_ids = job_ids.iter().cloned().collect_vec();
58
59    // including table, materialized view, index
60    let common_job_definitions: Vec<(JobId, String)> = Table::find()
61        .select_only()
62        .columns([
63            table::Column::TableId,
64            #[cfg(not(debug_assertions))]
65            table::Column::Name,
66            #[cfg(debug_assertions)]
67            table::Column::Definition,
68        ])
69        .filter(table::Column::TableId.is_in(job_ids.clone()))
70        .into_tuple()
71        .all(txn)
72        .await?;
73
74    let sink_definitions: Vec<(JobId, String)> = Sink::find()
75        .select_only()
76        .columns([
77            sink::Column::SinkId,
78            #[cfg(not(debug_assertions))]
79            sink::Column::Name,
80            #[cfg(debug_assertions)]
81            sink::Column::Definition,
82        ])
83        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
84        .into_tuple()
85        .all(txn)
86        .await?;
87
88    let source_definitions: Vec<(JobId, String)> = Source::find()
89        .select_only()
90        .columns([
91            source::Column::SourceId,
92            #[cfg(not(debug_assertions))]
93            source::Column::Name,
94            #[cfg(debug_assertions)]
95            source::Column::Definition,
96        ])
97        .filter(source::Column::SourceId.is_in(job_ids.clone()))
98        .into_tuple()
99        .all(txn)
100        .await?;
101
102    let definitions: HashMap<JobId, String> = common_job_definitions
103        .into_iter()
104        .chain(sink_definitions.into_iter())
105        .chain(source_definitions.into_iter())
106        .collect();
107
108    Ok(definitions)
109}
110
111pub async fn load_fragment_info<C>(
112    txn: &C,
113    actor_id_counter: &AtomicU32,
114    database_id: Option<DatabaseId>,
115    worker_nodes: &ActiveStreamingWorkerNodes,
116    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
117) -> MetaResult<FragmentRenderMap>
118where
119    C: ConnectionTrait,
120{
121    let mut query = StreamingJob::find()
122        .select_only()
123        .column(streaming_job::Column::JobId);
124
125    if let Some(database_id) = database_id {
126        query = query
127            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
128            .filter(object::Column::DatabaseId.eq(database_id));
129    }
130
131    let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
132
133    if jobs.is_empty() {
134        return Ok(HashMap::new());
135    }
136
137    let jobs: HashSet<JobId> = jobs.into_iter().collect();
138
139    let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
140
141    if loaded.is_empty() {
142        return Ok(HashMap::new());
143    }
144
145    let RenderedGraph { fragments, .. } = render_actor_assignments(
146        actor_id_counter,
147        worker_nodes.current(),
148        adaptive_parallelism_strategy,
149        &loaded,
150    )?;
151
152    Ok(fragments)
153}
154
/// Desired placement for a streaming job: which resource group to run in and
/// at what parallelism.
// NOTE(review): not referenced within this chunk — presumably consumed by
// scaling callers elsewhere in the crate; confirm at call sites.
#[derive(Debug)]
pub struct TargetResourcePolicy {
    /// Target resource group; `None` presumably defers to a default
    /// (cf. the database-group fallback in `render_actors`) — TODO confirm.
    pub resource_group: Option<String>,
    /// Target parallelism setting for the job.
    pub parallelism: StreamingParallelism,
}
160
/// Scheduling-relevant information about a single worker node.
// NOTE(review): not referenced within this chunk — confirm usage at call sites.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Number of parallel slots on the worker; guaranteed non-zero by the type.
    pub parallelism: NonZeroUsize,
    /// Resource group label; `None` presumably means the default group
    /// (cf. the `DEFAULT_RESOURCE_GROUP` fallback in `render_actors`).
    pub resource_group: Option<String>,
}
166
/// Rendered fragment infos, nested by database id → job id → fragment id
/// (the grouping produced by `render_actors`).
pub type FragmentRenderMap =
    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;
169
/// Output of the rendering pipeline: the rendered fragments plus the
/// no-shuffle ensembles they were derived from.
#[derive(Default)]
pub struct RenderedGraph {
    /// Actor-to-worker assignments per fragment, grouped by database and job.
    pub fragments: FragmentRenderMap,
    /// The no-shuffle ensembles that produced `fragments`.
    pub ensembles: Vec<NoShuffleEnsemble>,
}
175
176impl RenderedGraph {
177    pub fn empty() -> Self {
178        Self::default()
179    }
180}
181
/// A single fragment's metadata loaded from the `fragment` table and decoded
/// into in-memory form (see the `From<fragment::Model>` impl below). This is
/// the per-fragment input to the synchronous rendering stage, keeping async
/// I/O separate from rendering logic.
#[derive(Clone, Debug)]
pub struct LoadedFragment {
    pub fragment_id: FragmentId,
    /// Owning streaming job.
    pub job_id: JobId,
    /// Role flags of the fragment (Source, SourceScan, Dml, …).
    pub fragment_type_mask: FragmentTypeMask,
    /// Single vs. hash distribution; decides whether actors carry vnode bitmaps.
    pub distribution_type: DistributionType,
    /// Number of virtual nodes; upper-bounds the rendered parallelism.
    pub vnode_count: usize,
    /// Decoded stream plan of the fragment.
    pub nodes: PbStreamNode,
    /// State tables owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
    /// Optional per-fragment parallelism override; `None` defers to the job's setting.
    pub parallelism: Option<StreamingParallelism>,
}
196
197impl From<fragment::Model> for LoadedFragment {
198    fn from(model: fragment::Model) -> Self {
199        Self {
200            fragment_id: model.fragment_id,
201            job_id: model.job_id,
202            fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
203            distribution_type: model.distribution_type,
204            vnode_count: model.vnode_count as usize,
205            nodes: model.stream_node.to_protobuf(),
206            state_table_ids: model.state_table_ids.into_inner().into_iter().collect(),
207            parallelism: model.parallelism,
208        }
209    }
210}
211
/// Context loaded asynchronously from the database, containing all metadata
/// required to render actor assignments. Separating this from the synchronous
/// render stage lets callers render against arbitrary worker sets.
#[derive(Default, Debug, Clone)]
pub struct LoadedFragmentContext {
    /// No-shuffle ensembles (entry fragments plus all components) to render.
    pub ensembles: Vec<NoShuffleEnsemble>,
    /// Loaded fragments grouped by owning job.
    pub job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    /// Streaming job rows keyed by job id.
    pub job_map: HashMap<JobId, streaming_job::Model>,
    /// Database each job belongs to.
    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
    /// Database rows keyed by database id.
    pub database_map: HashMap<DatabaseId, database::Model>,
    /// Source id of each source fragment.
    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
    /// Known splits of each source fragment.
    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
}
222
impl LoadedFragmentContext {
    /// Returns `true` when no ensembles were loaded. An empty ensemble list
    /// combined with non-empty job fragments would indicate an inconsistent
    /// load, so that combination is asserted against.
    pub fn is_empty(&self) -> bool {
        if self.ensembles.is_empty() {
            assert!(
                self.job_fragments.is_empty(),
                "non-empty job fragments for empty ensembles: {:?}",
                self.job_fragments
            );
            true
        } else {
            false
        }
    }

    /// Projects this context down to a single database, returning `None` when
    /// no loaded job belongs to `database_id`.
    ///
    /// Panics (via `assert!` or map indexing) if the context is internally
    /// inconsistent — e.g. a job without loaded fragments, an ensemble that
    /// straddles databases, or a missing database row.
    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
        let job_ids: HashSet<JobId> = self
            .streaming_job_databases
            .iter()
            .filter_map(|(job_id, db_id)| (*db_id == database_id).then_some(*job_id))
            .collect();

        if job_ids.is_empty() {
            return None;
        }

        // Indexing is deliberate: every job recorded in
        // `streaming_job_databases` must have its fragments loaded.
        let job_fragments: HashMap<_, _> = job_ids
            .iter()
            .map(|job_id| (*job_id, self.job_fragments[job_id].clone()))
            .collect();

        let fragment_ids: HashSet<_> = job_fragments
            .values()
            .flat_map(|fragments| fragments.keys().copied())
            .collect();

        assert!(
            !fragment_ids.is_empty(),
            "empty fragments for non-empty database {database_id} with jobs {job_ids:?}"
        );

        // Keep only ensembles whose components all live in this database; a
        // partial overlap would mean a no-shuffle ensemble crosses databases,
        // which is treated as a bug.
        let ensembles: Vec<NoShuffleEnsemble> = self
            .ensembles
            .iter()
            .filter(|ensemble| {
                if ensemble
                    .components
                    .iter()
                    .any(|fragment_id| fragment_ids.contains(fragment_id))
                {
                    assert!(
                        ensemble
                            .components
                            .iter()
                            .all(|fragment_id| fragment_ids.contains(fragment_id)),
                        "ensemble {ensemble:?} partially exists in database {database_id} with fragments {job_fragments:?}"
                    );
                    true
                } else {
                    false
                }
            })
            .cloned()
            .collect();

        assert!(
            !ensembles.is_empty(),
            "empty ensembles for non-empty database {database_id} with jobs {job_fragments:?}"
        );

        let job_map = job_ids
            .iter()
            .filter_map(|job_id| self.job_map.get(job_id).map(|job| (*job_id, job.clone())))
            .collect();

        let streaming_job_databases = job_ids
            .iter()
            .filter_map(|job_id| {
                self.streaming_job_databases
                    .get(job_id)
                    .map(|db_id| (*job_id, *db_id))
            })
            .collect();

        // The database row itself must be present (indexing panics otherwise).
        let database_model = self.database_map[&database_id].clone();
        let database_map = HashMap::from([(database_id, database_model)]);

        // Restrict source metadata to the fragments that survive the projection.
        let fragment_source_ids = self
            .fragment_source_ids
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, source_id)| (*fragment_id, *source_id))
            .collect();

        let fragment_splits = self
            .fragment_splits
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, splits)| (*fragment_id, splits.clone()))
            .collect();

        Some(Self {
            ensembles,
            job_fragments,
            job_map,
            streaming_job_databases,
            database_map,
            fragment_source_ids,
            fragment_splits,
        })
    }
}
334
335/// Fragment-scoped rendering entry point used by operational tooling.
336/// It validates that the requested fragments are roots of their no-shuffle ensembles,
337/// resolves only the metadata required for those components, and then reuses the shared
338/// rendering pipeline to materialize actor assignments.
339pub async fn render_fragments<C>(
340    txn: &C,
341    actor_id_counter: &AtomicU32,
342    ensembles: Vec<NoShuffleEnsemble>,
343    workers: &HashMap<WorkerId, WorkerNode>,
344    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
345) -> MetaResult<RenderedGraph>
346where
347    C: ConnectionTrait,
348{
349    let loaded = load_fragment_context(txn, ensembles).await?;
350
351    if loaded.is_empty() {
352        return Ok(RenderedGraph::empty());
353    }
354
355    render_actor_assignments(
356        actor_id_counter,
357        workers,
358        adaptive_parallelism_strategy,
359        &loaded,
360    )
361}
362
363/// Async load stage for fragment-scoped rendering. It resolves all metadata required to later
364/// render actor assignments with arbitrary worker sets.
365pub async fn load_fragment_context<C>(
366    txn: &C,
367    ensembles: Vec<NoShuffleEnsemble>,
368) -> MetaResult<LoadedFragmentContext>
369where
370    C: ConnectionTrait,
371{
372    if ensembles.is_empty() {
373        return Ok(LoadedFragmentContext::default());
374    }
375
376    let required_fragment_ids: HashSet<_> = ensembles
377        .iter()
378        .flat_map(|ensemble| ensemble.components.iter().copied())
379        .collect();
380
381    let fragment_models = Fragment::find()
382        .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
383        .all(txn)
384        .await?;
385
386    let found_fragment_ids: HashSet<_> = fragment_models
387        .iter()
388        .map(|fragment| fragment.fragment_id)
389        .collect();
390
391    if found_fragment_ids.len() != required_fragment_ids.len() {
392        let missing = required_fragment_ids
393            .difference(&found_fragment_ids)
394            .copied()
395            .collect_vec();
396        return Err(anyhow!("fragments {:?} not found", missing).into());
397    }
398
399    let fragment_models: HashMap<_, _> = fragment_models
400        .into_iter()
401        .map(|fragment| (fragment.fragment_id, fragment))
402        .collect();
403
404    let job_ids: HashSet<_> = fragment_models
405        .values()
406        .map(|fragment| fragment.job_id)
407        .collect();
408
409    if job_ids.is_empty() {
410        return Ok(LoadedFragmentContext::default());
411    }
412
413    let jobs: HashMap<_, _> = StreamingJob::find()
414        .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
415        .all(txn)
416        .await?
417        .into_iter()
418        .map(|job| (job.job_id, job))
419        .collect();
420
421    let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
422    if found_job_ids.len() != job_ids.len() {
423        let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
424        return Err(anyhow!("streaming jobs {:?} not found", missing).into());
425    }
426
427    build_loaded_context(txn, ensembles, fragment_models, jobs).await
428}
429
430/// Job-scoped rendering entry point that walks every no-shuffle root belonging to the
431/// provided streaming jobs before delegating to the shared rendering backend.
432pub async fn render_jobs<C>(
433    txn: &C,
434    actor_id_counter: &AtomicU32,
435    job_ids: HashSet<JobId>,
436    workers: &HashMap<WorkerId, WorkerNode>,
437    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
438) -> MetaResult<RenderedGraph>
439where
440    C: ConnectionTrait,
441{
442    let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
443
444    if loaded.is_empty() {
445        return Ok(RenderedGraph::empty());
446    }
447
448    render_actor_assignments(
449        actor_id_counter,
450        workers,
451        adaptive_parallelism_strategy,
452        &loaded,
453    )
454}
455
/// Async load stage for job-scoped rendering. It collects all no-shuffle ensembles and the
/// metadata required to render actor assignments later with a provided worker set.
pub async fn load_fragment_context_for_jobs<C>(
    txn: &C,
    job_ids: HashSet<JobId>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Fragments that are the *target* of a no-shuffle edge are interior nodes
    // of an ensemble; excluding them leaves only the no-shuffle roots.
    let excluded_fragments_query = FragmentRelation::find()
        .select_only()
        .column(fragment_relation::Column::TargetFragmentId)
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_query();

    let condition = Condition::all()
        .add(fragment::Column::JobId.is_in(job_ids.clone()))
        .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));

    // Root fragments of the requested jobs.
    let fragments: Vec<FragmentId> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .filter(condition)
        .into_tuple()
        .all(txn)
        .await?;

    // Expand each root into its full no-shuffle ensemble (entries + components).
    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;

    // Load the model rows for every fragment appearing in any ensemble.
    let fragments = Fragment::find()
        .filter(
            fragment::Column::FragmentId.is_in(
                ensembles
                    .iter()
                    .flat_map(|graph| graph.components.iter())
                    .cloned()
                    .collect_vec(),
            ),
        )
        .all(txn)
        .await?;

    let fragment_map: HashMap<_, _> = fragments
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    // Re-derive the owning job set from the loaded fragments; presumably this
    // can be a superset of the input `job_ids` when ensembles span jobs —
    // TODO confirm against `find_fragment_no_shuffle_dags_detailed`.
    // (BTreeSet gives a deterministic, deduplicated id list.)
    let job_ids = fragment_map
        .values()
        .map(|fragment| fragment.job_id)
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect_vec();

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    build_loaded_context(txn, ensembles, fragment_map, jobs).await
}
524
525/// Sync render stage: uses loaded fragment context and current worker info
526/// to produce actor-to-worker assignments and vnode bitmaps.
527pub(crate) fn render_actor_assignments(
528    actor_id_counter: &AtomicU32,
529    worker_map: &HashMap<WorkerId, WorkerNode>,
530    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
531    loaded: &LoadedFragmentContext,
532) -> MetaResult<RenderedGraph> {
533    if loaded.is_empty() {
534        return Ok(RenderedGraph::empty());
535    }
536
537    let backfill_jobs: HashSet<JobId> = loaded
538        .job_map
539        .iter()
540        .filter(|(_, job)| {
541            job.create_type == CreateType::Background && job.job_status == JobStatus::Creating
542        })
543        .map(|(id, _)| *id)
544        .collect();
545
546    let render_context = RenderActorsContext {
547        fragment_source_ids: &loaded.fragment_source_ids,
548        fragment_splits: &loaded.fragment_splits,
549        streaming_job_databases: &loaded.streaming_job_databases,
550        database_map: &loaded.database_map,
551        backfill_jobs: &backfill_jobs,
552    };
553
554    let fragments = render_actors(
555        actor_id_counter,
556        &loaded.ensembles,
557        &loaded.job_fragments,
558        &loaded.job_map,
559        worker_map,
560        adaptive_parallelism_strategy,
561        render_context,
562    )?;
563
564    Ok(RenderedGraph {
565        fragments,
566        ensembles: loaded.ensembles.clone(),
567    })
568}
569
/// Shared final stage of the async load path: groups fragment rows by job,
/// resolves source ids/splits for source fragments, and maps each job to its
/// database, assembling the full [`LoadedFragmentContext`].
async fn build_loaded_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
    fragment_models: HashMap<FragmentId, fragment::Model>,
    job_map: HashMap<JobId, streaming_job::Model>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Group fragments by owning job; a duplicate fragment id would indicate
    // corrupted input and is treated as a bug.
    let mut job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>> = HashMap::new();
    for (fragment_id, model) in fragment_models {
        job_fragments
            .entry(model.job_id)
            .or_default()
            .try_insert(fragment_id, LoadedFragment::from(model))
            .expect("duplicate fragment id for job");
    }

    // Debug-only consistency checks between ensembles, fragments, and jobs.
    #[cfg(debug_assertions)]
    {
        debug_sanity_check(&ensembles, &job_fragments, &job_map);
    }

    let (fragment_source_ids, fragment_splits) =
        resolve_source_fragments(txn, &job_fragments).await?;

    let job_ids = job_map.keys().copied().collect_vec();

    // Map each job to its database via the object table.
    let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .collect();

    // Load the database rows for every referenced database id.
    let database_map: HashMap<_, _> = Database::find()
        .filter(
            database::Column::DatabaseId
                .is_in(streaming_job_databases.values().copied().collect_vec()),
        )
        .all(txn)
        .await?
        .into_iter()
        .map(|db| (db.database_id, db))
        .collect();

    Ok(LoadedFragmentContext {
        ensembles,
        job_fragments,
        job_map,
        streaming_job_databases,
        database_map,
        fragment_source_ids,
        fragment_splits,
    })
}
635
// Only metadata resolved asynchronously lives here so the renderer stays synchronous
// and the call site keeps the runtime dependencies (maps, strategy, actor counter, etc.) explicit.
struct RenderActorsContext<'a> {
    /// Source id per source fragment (from `resolve_source_fragments`).
    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
    /// Known splits per source fragment.
    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
    /// Database owning each job.
    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
    /// Database rows, used for the resource-group fallback.
    database_map: &'a HashMap<DatabaseId, database::Model>,
    /// Jobs currently backfilling (may use a dedicated backfill parallelism).
    backfill_jobs: &'a HashSet<JobId>,
}
645
/// Core synchronous renderer: for each no-shuffle ensemble, determines the
/// effective parallelism, assigns actors (and vnode ranges) to workers in the
/// matching resource group, distributes source splits, and materializes
/// per-fragment [`InflightFragmentInfo`] grouped by database and job.
///
/// Invariants relied upon (asserted or `unwrap`ped below):
/// - every fragment referenced by an ensemble exists in `job_fragments`;
/// - all entry fragments of an ensemble share one job, vnode count, and
///   (optional) parallelism override;
/// - every job resolves to a database and a resource group.
fn render_actors(
    actor_id_counter: &AtomicU32,
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    let RenderActorsContext {
        fragment_source_ids,
        fragment_splits: fragment_splits_map,
        streaming_job_databases,
        database_map,
        backfill_jobs,
    } = context;

    let mut all_fragments: FragmentRenderMap = HashMap::new();
    // Flat index over all fragments so ensemble components can be resolved by id.
    let fragment_lookup: HashMap<FragmentId, &LoadedFragment> = job_fragments
        .values()
        .flat_map(|fragments| fragments.iter())
        .map(|(fragment_id, fragment)| (*fragment_id, fragment))
        .collect();

    for NoShuffleEnsemble {
        entries,
        components,
    } in ensembles
    {
        tracing::debug!("rendering ensemble entries {:?}", entries);

        let entry_fragments = entries
            .iter()
            .map(|fragment_id| fragment_lookup.get(fragment_id).unwrap())
            .collect_vec();

        // All entry fragments must agree on the (optional) per-fragment
        // parallelism override; `exactly_one` after `dedup` rejects mixed settings.
        let entry_fragment_parallelism = entry_fragments
            .iter()
            .map(|fragment| fragment.parallelism.clone())
            .dedup()
            .exactly_one()
            .map_err(|_| {
                anyhow!(
                    "entry fragments {:?} have inconsistent parallelism settings",
                    entries.iter().copied().collect_vec()
                )
            })?;

        // Likewise the owning job and vnode count must be unique per ensemble.
        let (job_id, vnode_count) = entry_fragments
            .iter()
            .map(|f| (f.job_id, f.vnode_count))
            .dedup()
            .exactly_one()
            .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;

        let job = job_map
            .get(&job_id)
            .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;

        // A job-level adaptive strategy overrides the system-wide one.
        let job_strategy = job
            .stream_context()
            .adaptive_parallelism_strategy
            .unwrap_or(adaptive_parallelism_strategy);

        // Resource group: job-specific when set, otherwise the database default.
        let resource_group = match &job.specific_resource_group {
            None => {
                let database = streaming_job_databases
                    .get(&job_id)
                    .and_then(|database_id| database_map.get(database_id))
                    .unwrap();
                database.resource_group.clone()
            }
            Some(resource_group) => resource_group.clone(),
        };

        // Only workers in the matching resource group may host the actors.
        let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
            .iter()
            .filter_map(|(worker_id, worker)| {
                if worker
                    .resource_group()
                    .as_deref()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP)
                    == resource_group.as_str()
                {
                    Some((
                        *worker_id,
                        worker
                            .parallelism()
                            .expect("should have parallelism for compute node")
                            .try_into()
                            .expect("parallelism for compute node"),
                    ))
                } else {
                    None
                }
            })
            .collect();

        let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();

        // Backfilling jobs may use a dedicated backfill parallelism when configured.
        let effective_job_parallelism = if backfill_jobs.contains(&job_id) {
            job.backfill_parallelism
                .as_ref()
                .unwrap_or(&job.parallelism)
        } else {
            &job.parallelism
        };

        // Precedence: fragment override > job setting; the result is clamped
        // by both the vnode count and the job's max parallelism.
        let actual_parallelism = match entry_fragment_parallelism
            .as_ref()
            .unwrap_or(effective_job_parallelism)
        {
            StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
                job_strategy.compute_target_parallelism(total_parallelism)
            }
            StreamingParallelism::Fixed(n) => *n,
        }
        .min(vnode_count)
        .min(job.max_parallelism as usize);

        tracing::debug!(
            "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
            job_id,
            actual_parallelism,
            job.parallelism,
            total_parallelism,
            job.max_parallelism,
            vnode_count,
            entry_fragment_parallelism
        );

        let assigner = AssignerBuilder::new(job_id).build();

        // Actor ids here are ensemble-local indices; they are rebased per
        // component fragment below using `actor_id_counter`.
        let actors = (0..(actual_parallelism as u32))
            .map_into::<ActorId>()
            .collect_vec();
        let vnodes = (0..vnode_count).collect_vec();

        let assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;

        // A source entry fragment (Source flag set, Dml not set) owns split
        // assignment for the whole ensemble. Source and SourceScan are
        // asserted mutually exclusive on entry fragments.
        let source_entry_fragment = entry_fragments.iter().find(|f| {
            let mask = f.fragment_type_mask;
            if mask.contains(FragmentTypeFlag::Source) {
                assert!(!mask.contains(FragmentTypeFlag::SourceScan))
            }
            mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
        });

        let (fragment_splits, shared_source_id) = match source_entry_fragment {
            Some(entry_fragment) => {
                let source_id = fragment_source_ids
                    .get(&entry_fragment.fragment_id)
                    .ok_or_else(|| {
                        anyhow!(
                            "missing source id in source fragment {}",
                            entry_fragment.fragment_id
                        )
                    })?;

                let entry_fragment_id = entry_fragment.fragment_id;

                // Start from empty per-actor assignments and let the split
                // reassignment logic distribute the known splits.
                let empty_actor_splits: HashMap<_, _> =
                    actors.iter().map(|actor_id| (*actor_id, vec![])).collect();

                let splits = fragment_splits_map
                    .get(&entry_fragment_id)
                    .cloned()
                    .unwrap_or_default();

                let splits: BTreeMap<_, _> = splits.into_iter().map(|s| (s.id(), s)).collect();

                let fragment_splits = crate::stream::source_manager::reassign_splits(
                    entry_fragment_id,
                    empty_actor_splits,
                    &splits,
                    SplitDiffOptions::default(),
                )
                .unwrap_or_default();
                (fragment_splits, Some(*source_id))
            }
            None => (HashMap::new(), None),
        };

        // Materialize every component fragment using the shared assignment.
        for component_fragment_id in components {
            let fragment = fragment_lookup.get(component_fragment_id).unwrap();
            let fragment_id = fragment.fragment_id;
            let job_id = fragment.job_id;
            let fragment_type_mask = fragment.fragment_type_mask;
            let distribution_type = fragment.distribution_type;
            let stream_node = &fragment.nodes;
            let state_table_ids = &fragment.state_table_ids;
            let vnode_count = fragment.vnode_count;

            // Reserve a contiguous global id range for this fragment's actors.
            let actor_count =
                u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
            let actor_id_base = actor_id_counter.fetch_add(actor_count, Ordering::Relaxed);

            let actors: HashMap<ActorId, InflightActorInfo> = assignment
                .iter()
                .flat_map(|(worker_id, actors)| {
                    actors
                        .iter()
                        .map(move |(actor_id, vnodes)| (worker_id, actor_id, vnodes))
                })
                .map(|(&worker_id, &actor_idx, vnodes)| {
                    // Singleton fragments carry no vnode bitmap.
                    let vnode_bitmap = match distribution_type {
                        DistributionType::Single => None,
                        DistributionType::Hash => Some(Bitmap::from_indices(vnode_count, vnodes)),
                    };

                    let actor_id = actor_idx + actor_id_base;

                    // Source fragments inherit the splits computed for the
                    // ensemble's shared source; all other fragments get none.
                    let splits = if let Some(source_id) = fragment_source_ids.get(&fragment_id) {
                        assert_eq!(shared_source_id, Some(*source_id));

                        fragment_splits
                            .get(&(actor_idx))
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        vec![]
                    };

                    (
                        actor_id,
                        InflightActorInfo {
                            worker_id,
                            vnode_bitmap,
                            splits,
                        },
                    )
                })
                .collect();

            let fragment = InflightFragmentInfo {
                fragment_id,
                distribution_type,
                fragment_type_mask,
                vnode_count,
                nodes: stream_node.clone(),
                actors,
                state_table_ids: state_table_ids.clone(),
            };

            let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
                anyhow!("streaming job {job_id} not found in streaming_job_databases")
            })?;

            all_fragments
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default()
                .insert(fragment_id, fragment);
        }
    }

    Ok(all_fragments)
}
905
906#[cfg(debug_assertions)]
907fn debug_sanity_check(
908    ensembles: &[NoShuffleEnsemble],
909    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
910    jobs: &HashMap<JobId, streaming_job::Model>,
911) {
912    let fragment_lookup: HashMap<FragmentId, (&LoadedFragment, JobId)> = job_fragments
913        .iter()
914        .flat_map(|(job_id, fragments)| {
915            fragments
916                .iter()
917                .map(move |(fragment_id, fragment)| (*fragment_id, (fragment, *job_id)))
918        })
919        .collect();
920
921    // Debug-only assertions to catch inconsistent ensemble metadata early.
922    debug_assert!(
923        ensembles
924            .iter()
925            .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
926        "entries must be subset of components"
927    );
928
929    let mut missing_fragments = BTreeSet::new();
930    let mut missing_jobs = BTreeSet::new();
931
932    for fragment_id in ensembles
933        .iter()
934        .flat_map(|ensemble| ensemble.components.iter())
935    {
936        match fragment_lookup.get(fragment_id) {
937            Some((fragment, job_id)) => {
938                if !jobs.contains_key(&fragment.job_id) {
939                    missing_jobs.insert(*job_id);
940                }
941            }
942            None => {
943                missing_fragments.insert(*fragment_id);
944            }
945        }
946    }
947
948    debug_assert!(
949        missing_fragments.is_empty(),
950        "missing fragments in fragment_map: {:?}",
951        missing_fragments
952    );
953
954    debug_assert!(
955        missing_jobs.is_empty(),
956        "missing jobs for fragments' job_id: {:?}",
957        missing_jobs
958    );
959
960    for ensemble in ensembles {
961        let unique_vnode_counts: Vec<_> = ensemble
962            .components
963            .iter()
964            .flat_map(|fragment_id| {
965                fragment_lookup
966                    .get(fragment_id)
967                    .map(|(fragment, _)| fragment.vnode_count)
968            })
969            .unique()
970            .collect();
971
972        debug_assert!(
973            unique_vnode_counts.len() <= 1,
974            "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
975            ensemble.components,
976            unique_vnode_counts
977        );
978    }
979}
980
981async fn resolve_source_fragments<C>(
982    txn: &C,
983    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
984) -> MetaResult<(
985    HashMap<FragmentId, SourceId>,
986    HashMap<FragmentId, Vec<SplitImpl>>,
987)>
988where
989    C: ConnectionTrait,
990{
991    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
992    for (fragment_id, fragment) in job_fragments.values().flatten() {
993        let mask = fragment.fragment_type_mask;
994        if mask.contains(FragmentTypeFlag::Source)
995            && let Some(source_id) = fragment.nodes.find_stream_source()
996        {
997            source_fragment_ids
998                .entry(source_id)
999                .or_default()
1000                .insert(*fragment_id);
1001        }
1002
1003        if mask.contains(FragmentTypeFlag::SourceScan)
1004            && let Some((source_id, _)) = fragment.nodes.find_source_backfill()
1005        {
1006            source_fragment_ids
1007                .entry(source_id)
1008                .or_default()
1009                .insert(*fragment_id);
1010        }
1011    }
1012
1013    let fragment_source_ids: HashMap<_, _> = source_fragment_ids
1014        .iter()
1015        .flat_map(|(source_id, fragment_ids)| {
1016            fragment_ids
1017                .iter()
1018                .map(|fragment_id| (*fragment_id, *source_id))
1019        })
1020        .collect();
1021
1022    let fragment_ids = fragment_source_ids.keys().copied().collect_vec();
1023
1024    let fragment_splits: Vec<_> = FragmentSplits::find()
1025        .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
1026        .all(txn)
1027        .await?;
1028
1029    let fragment_splits: HashMap<_, _> = fragment_splits
1030        .into_iter()
1031        .flat_map(|model| {
1032            model.splits.map(|splits| {
1033                (
1034                    model.fragment_id,
1035                    splits
1036                        .to_protobuf()
1037                        .splits
1038                        .iter()
1039                        .flat_map(SplitImpl::try_from)
1040                        .collect_vec(),
1041                )
1042            })
1043        })
1044        .collect();
1045
1046    Ok((fragment_source_ids, fragment_splits))
1047}
1048
// Helper struct to make the function signature cleaner and to properly bundle the required data.
#[derive(Debug)]
pub struct ActorGraph<'a> {
    // Per-fragment data: the fragment itself plus the stream actors rendered for it.
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    // Worker placement for every actor referenced in `fragments`.
    pub locations: &'a HashMap<ActorId, WorkerId>,
}
1055
/// A connected component of fragments linked by `NoShuffle` dispatchers.
///
/// Built by [`find_no_shuffle_graphs`]; components sharing a `NoShuffle` edge must
/// be scheduled together (same distribution), so they are handled as one unit.
#[derive(Debug, Clone)]
pub struct NoShuffleEnsemble {
    // Root fragments: components with no upstream inside the ensemble.
    // Always a subset of `components`; empty only for pure cycles, which are
    // filtered out before an ensemble is constructed.
    entries: HashSet<FragmentId>,
    // Every fragment belonging to this connected component.
    components: HashSet<FragmentId>,
}
1061
1062impl NoShuffleEnsemble {
1063    #[cfg(test)]
1064    pub fn for_test(
1065        entries: impl IntoIterator<Item = FragmentId>,
1066        components: impl IntoIterator<Item = FragmentId>,
1067    ) -> Self {
1068        let entries = entries.into_iter().collect();
1069        let components = components.into_iter().collect();
1070        Self {
1071            entries,
1072            components,
1073        }
1074    }
1075
1076    pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1077        self.components.iter().cloned()
1078    }
1079
1080    pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1081        self.entries.iter().copied()
1082    }
1083
1084    pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1085        self.components.iter().copied()
1086    }
1087
1088    pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
1089        self.entries.contains(fragment_id)
1090    }
1091}
1092
/// Loads every `NoShuffle` relation from the metadata store, builds bidirectional
/// adjacency maps, and returns the no-shuffle connected components ("ensembles")
/// reachable from `initial_fragment_ids`.
pub async fn find_fragment_no_shuffle_dags_detailed(
    db: &impl ConnectionTrait,
    initial_fragment_ids: &[FragmentId],
) -> MetaResult<Vec<NoShuffleEnsemble>> {
    // NOTE(review): `.columns(...)` is used without `.select_only()`, so the entity's
    // default column list remains part of the SELECT and `into_tuple` decodes the
    // first two selected columns positionally — confirm those are
    // (source_fragment_id, target_fragment_id) as intended.
    let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
        .columns([
            fragment_relation::Column::SourceFragmentId,
            fragment_relation::Column::TargetFragmentId,
        ])
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_tuple()
        .all(db)
        .await?;

    // Adjacency in both directions: traversal walks the graph as undirected,
    // while root detection still needs the edge direction (see backward_edges).
    let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
    let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();

    for (src, dst) in all_no_shuffle_relations {
        forward_edges.entry(src).or_default().push(dst);
        backward_edges.entry(dst).or_default().push(src);
    }

    find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
}
1117
1118fn find_no_shuffle_graphs(
1119    initial_fragment_ids: &[impl Into<FragmentId> + Copy],
1120    forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1121    backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1122) -> MetaResult<Vec<NoShuffleEnsemble>> {
1123    let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
1124    let mut globally_visited: HashSet<FragmentId> = HashSet::new();
1125
1126    for &init_id in initial_fragment_ids {
1127        let init_id = init_id.into();
1128        if globally_visited.contains(&init_id) {
1129            continue;
1130        }
1131
1132        // Found a new component. Traverse it to find all its nodes.
1133        let mut components = HashSet::new();
1134        let mut queue: VecDeque<FragmentId> = VecDeque::new();
1135
1136        queue.push_back(init_id);
1137        globally_visited.insert(init_id);
1138
1139        while let Some(current_id) = queue.pop_front() {
1140            components.insert(current_id);
1141            let neighbors = forward_edges
1142                .get(&current_id)
1143                .into_iter()
1144                .flatten()
1145                .chain(backward_edges.get(&current_id).into_iter().flatten());
1146
1147            for &neighbor_id in neighbors {
1148                if globally_visited.insert(neighbor_id) {
1149                    queue.push_back(neighbor_id);
1150                }
1151            }
1152        }
1153
1154        // For the newly found component, identify its roots.
1155        let mut entries = HashSet::new();
1156        for &node_id in &components {
1157            let is_root = match backward_edges.get(&node_id) {
1158                Some(parents) => parents.iter().all(|p| !components.contains(p)),
1159                None => true,
1160            };
1161            if is_root {
1162                entries.insert(node_id);
1163            }
1164        }
1165
1166        // Store the detailed DAG structure (roots, all nodes in this DAG).
1167        if !entries.is_empty() {
1168            graphs.push(NoShuffleEnsemble {
1169                entries,
1170                components,
1171            });
1172        }
1173    }
1174
1175    Ok(graphs)
1176}
1177
1178#[cfg(test)]
1179mod tests {
1180    use std::collections::{BTreeSet, HashMap, HashSet};
1181    use std::sync::Arc;
1182
1183    use risingwave_connector::source::SplitImpl;
1184    use risingwave_connector::source::test_source::TestSourceSplit;
1185    use risingwave_meta_model::{CreateType, JobStatus};
1186    use risingwave_pb::common::WorkerType;
1187    use risingwave_pb::common::worker_node::Property as WorkerProperty;
1188    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1189
1190    use super::*;
1191
    // Helper type alias for cleaner test code, using the actual `FragmentId` type
    // from the module: a (forward_edges, backward_edges) pair in the exact shape
    // consumed by `find_no_shuffle_graphs`.
    type Edges = (
        HashMap<FragmentId, Vec<FragmentId>>,
        HashMap<FragmentId, Vec<FragmentId>>,
    );
1198
1199    /// A helper function to build forward and backward edge maps from a simple list of tuples.
1200    /// This reduces boilerplate in each test.
1201    fn build_edges(relations: &[(u32, u32)]) -> Edges {
1202        let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1203        let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1204        for &(src, dst) in relations {
1205            forward_edges
1206                .entry(src.into())
1207                .or_default()
1208                .push(dst.into());
1209            backward_edges
1210                .entry(dst.into())
1211                .or_default()
1212                .push(src.into());
1213        }
1214        (forward_edges, backward_edges)
1215    }
1216
1217    /// Helper function to create a `HashSet` from a slice easily.
1218    fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1219        ids.iter().map(|id| (*id).into()).collect()
1220    }
1221
    /// Constructs a `LoadedFragment` for tests, with an empty (default) stream node
    /// and no state tables.
    fn build_fragment(
        fragment_id: FragmentId,
        job_id: JobId,
        fragment_type_mask: i32,
        distribution_type: DistributionType,
        vnode_count: i32,
        parallelism: StreamingParallelism,
    ) -> LoadedFragment {
        LoadedFragment {
            fragment_id,
            job_id,
            fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
            distribution_type,
            // Model stores vnode_count as usize; tests only pass small positive values.
            vnode_count: vnode_count as usize,
            nodes: PbStreamNode::default(),
            state_table_ids: HashSet::new(),
            parallelism: Some(parallelism),
        }
    }
1241
1242    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1243
1244    fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1245        let base = fragment.actors.keys().copied().min().unwrap_or_default();
1246
1247        let mut entries: Vec<_> = fragment
1248            .actors
1249            .iter()
1250            .map(|(&actor_id, info)| {
1251                let idx = actor_id.as_raw_id() - base.as_raw_id();
1252                let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1253                    bitmap
1254                        .iter()
1255                        .enumerate()
1256                        .filter_map(|(pos, is_set)| is_set.then_some(pos))
1257                        .collect::<Vec<_>>()
1258                });
1259                let splits = info
1260                    .splits
1261                    .iter()
1262                    .map(|split| split.id().to_string())
1263                    .collect::<Vec<_>>();
1264                (idx.into(), info.worker_id, vnode_indices, splits)
1265            })
1266            .collect();
1267
1268        entries.sort_by_key(|(idx, _, _, _)| *idx);
1269        entries
1270    }
1271
1272    fn build_worker_node(
1273        id: impl Into<WorkerId>,
1274        parallelism: usize,
1275        resource_group: &str,
1276    ) -> WorkerNode {
1277        WorkerNode {
1278            id: id.into(),
1279            r#type: WorkerType::ComputeNode as i32,
1280            property: Some(WorkerProperty {
1281                is_streaming: true,
1282                parallelism: u32::try_from(parallelism).expect("parallelism fits into u32"),
1283                resource_group: Some(resource_group.to_owned()),
1284                ..Default::default()
1285            }),
1286            ..Default::default()
1287        }
1288    }
1289
1290    #[test]
1291    fn test_single_linear_chain() {
1292        // Scenario: A simple linear graph 1 -> 2 -> 3.
1293        // We start from the middle node (2).
1294        let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1295        let initial_ids = &[2];
1296
1297        // Act
1298        let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1299
1300        // Assert
1301        assert!(result.is_ok());
1302        let graphs = result.unwrap();
1303
1304        assert_eq!(graphs.len(), 1);
1305        let graph = &graphs[0];
1306        assert_eq!(graph.entries, to_hashset(&[1]));
1307        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1308    }
1309
1310    #[test]
1311    fn test_two_disconnected_graphs() {
1312        // Scenario: Two separate graphs: 1->2 and 10->11.
1313        // We start with one node from each graph.
1314        let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1315        let initial_ids = &[2, 10];
1316
1317        // Act
1318        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1319
1320        // Assert
1321        assert_eq!(graphs.len(), 2);
1322
1323        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1324        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1325
1326        // Graph 1
1327        assert_eq!(graphs[0].entries, to_hashset(&[1]));
1328        assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1329
1330        // Graph 2
1331        assert_eq!(graphs[1].entries, to_hashset(&[10]));
1332        assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1333    }
1334
1335    #[test]
1336    fn test_multiple_entries_in_one_graph() {
1337        // Scenario: A graph with two roots feeding into one node: 1->3, 2->3.
1338        let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1339        let initial_ids = &[3];
1340
1341        // Act
1342        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1343
1344        // Assert
1345        assert_eq!(graphs.len(), 1);
1346        let graph = &graphs[0];
1347        assert_eq!(graph.entries, to_hashset(&[1, 2]));
1348        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1349    }
1350
1351    #[test]
1352    fn test_diamond_shape_graph() {
1353        // Scenario: A diamond shape: 1->2, 1->3, 2->4, 3->4
1354        let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1355        let initial_ids = &[4];
1356
1357        // Act
1358        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1359
1360        // Assert
1361        assert_eq!(graphs.len(), 1);
1362        let graph = &graphs[0];
1363        assert_eq!(graph.entries, to_hashset(&[1]));
1364        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1365    }
1366
1367    #[test]
1368    fn test_starting_with_multiple_nodes_in_same_graph() {
1369        // Scenario: Start with two different nodes (2 and 4) from the same component.
1370        // Should only identify one graph, not two.
1371        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1372        let initial_ids = &[2, 4];
1373
1374        // Act
1375        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1376
1377        // Assert
1378        assert_eq!(graphs.len(), 1);
1379        let graph = &graphs[0];
1380        assert_eq!(graph.entries, to_hashset(&[1]));
1381        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1382    }
1383
1384    #[test]
1385    fn test_empty_initial_ids() {
1386        // Scenario: The initial ID list is empty.
1387        let (forward, backward) = build_edges(&[(1, 2)]);
1388        let initial_ids: &[u32] = &[];
1389
1390        // Act
1391        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1392
1393        // Assert
1394        assert!(graphs.is_empty());
1395    }
1396
1397    #[test]
1398    fn test_isolated_node_as_input() {
1399        // Scenario: Start with an ID that has no relations.
1400        let (forward, backward) = build_edges(&[(1, 2)]);
1401        let initial_ids = &[100];
1402
1403        // Act
1404        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1405
1406        // Assert
1407        assert_eq!(graphs.len(), 1);
1408        let graph = &graphs[0];
1409        assert_eq!(graph.entries, to_hashset(&[100]));
1410        assert_eq!(graph.components, to_hashset(&[100]));
1411    }
1412
1413    #[test]
1414    fn test_graph_with_a_cycle() {
1415        // Scenario: A graph with a cycle: 1 -> 2 -> 3 -> 1.
1416        // The algorithm should correctly identify all nodes in the component.
1417        // Crucially, NO node is a root because every node has a parent *within the component*.
1418        // Therefore, the `entries` set should be empty, and the graph should not be included in the results.
1419        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1420        let initial_ids = &[2];
1421
1422        // Act
1423        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1424
1425        // Assert
1426        assert!(
1427            graphs.is_empty(),
1428            "A graph with no entries should not be returned"
1429        );
1430    }
1431    #[test]
1432    fn test_custom_complex() {
1433        let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1434        let initial_ids = &[1, 2, 4, 6];
1435
1436        // Act
1437        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1438
1439        // Assert
1440        assert_eq!(graphs.len(), 2);
1441        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1442        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1443
1444        // Graph 1
1445        assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1446        assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1447
1448        // Graph 2
1449        assert_eq!(graphs[1].entries, to_hashset(&[6]));
1450        assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1451    }
1452
    /// A single SINGLE-distribution fragment rendered with fixed parallelism 1 must
    /// consume exactly one id from the shared actor counter (100 -> 101) and carry
    /// no vnode bitmap on its only actor.
    #[test]
    fn render_actors_increments_actor_counter() {
        // Counter starts at 100 so the increment is easy to observe.
        let actor_id_counter = AtomicU32::new(100);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 10.into();
        let database_id: DatabaseId = DatabaseId::new(3);

        // One SINGLE-distribution fragment with a single vnode and fixed parallelism 1.
        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Single,
            1,
            StreamingParallelism::Fixed(1),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(1),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 1,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        // Database resource group matches the single worker below.
        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "rg-a".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Trivial ensemble: the fragment is both the entry and the only component.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // One worker with parallelism 1 in the matching resource group.
        let worker_map: HashMap<WorkerId, WorkerNode> =
            HashMap::from([(1.into(), build_worker_node(1, 1, "rg-a"))]);

        // No sources and no splits involved in this scenario.
        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(state.len(), 1);
        // SINGLE distribution: the actor owns everything, so no bitmap is assigned.
        assert!(
            state[0].2.is_none(),
            "single distribution should not assign vnode bitmaps"
        );
        // Exactly one actor id was consumed from the counter.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
    }
1537
    /// Two HASH-distribution fragments in the same no-shuffle ensemble must render
    /// identical actor states (same workers and same vnode bitmaps per actor index),
    /// with the assigned bitmaps jointly covering all vnodes.
    #[test]
    fn render_actors_aligns_hash_vnode_bitmaps() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 1.into();
        let downstream_fragment_id: FragmentId = 2.into();
        let job_id: JobId = 20.into();
        let database_id: DatabaseId = DatabaseId::new(5);

        // Both fragments: HASH distribution, 4 vnodes, fixed parallelism 2 —
        // identical so their actor layouts can be aligned one-to-one.
        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db_hash".into(),
            resource_group: "rg-hash".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Both fragments belong to one ensemble rooted at the entry fragment,
        // so they must be rendered with matching distributions.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Two single-slot workers to host the two actors of each fragment.
        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-hash")),
            (2.into(), build_worker_node(2, 1, "rg-hash")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // Per-index states (worker, bitmap, splits) must be identical across the ensemble.
        assert_eq!(entry_state.len(), 2);
        assert_eq!(entry_state, downstream_state);

        // The two actors' bitmaps together must cover all 4 vnodes.
        let assigned_vnodes: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, vnodes, _)| {
                vnodes
                    .as_ref()
                    .expect("hash distribution should populate vnode bitmap")
                    .iter()
                    .copied()
            })
            .collect();
        assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
        // 2 fragments x 2 actors = 4 ids consumed from the counter.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
1649
    /// A Source fragment and its SourceScan (backfill) companion sharing one source
    /// must receive matching split assignments per actor index, and all configured
    /// splits must be distributed across the actors.
    #[test]
    fn render_actors_propagates_source_splits() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 11.into();
        let downstream_fragment_id: FragmentId = 12.into();
        let job_id: JobId = 30.into();
        let database_id: DatabaseId = DatabaseId::new(7);
        let source_id: SourceId = 99.into();

        // Type masks marking the entry as a Source fragment and the downstream
        // as a SourceScan (source backfill) fragment.
        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            source_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            source_scan_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "split_db".into(),
            resource_group: "rg-source".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Source and backfill fragments form one no-shuffle ensemble.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-source")),
            (2.into(), build_worker_node(2, 1, "rg-source")),
        ]);

        // Two test splits configured on the entry (source) fragment only.
        let split_a = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-a"),
            properties: HashMap::new(),
            offset: "0".into(),
        });
        let split_b = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-b"),
            properties: HashMap::new(),
            offset: "0".into(),
        });

        // Both fragments reference the same source id.
        let fragment_source_ids = HashMap::from([
            (entry_fragment_id, source_id),
            (downstream_fragment_id, source_id),
        ]);
        let fragment_splits =
            HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // Backfill actors must mirror the source actors' split assignment per index.
        assert_eq!(entry_state, downstream_state);

        // Both configured splits must be assigned somewhere across the actors.
        let split_ids: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, _, splits)| splits.iter().cloned())
            .collect();
        assert_eq!(
            split_ids,
            BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
        );
        // 2 fragments x 2 actors = 4 ids consumed from the counter.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
1776
1777    /// Test that job-level strategy overrides global strategy for Adaptive parallelism.
1778    #[test]
1779    fn render_actors_job_strategy_overrides_global() {
1780        let actor_id_counter = AtomicU32::new(0);
1781        let fragment_id: FragmentId = 1.into();
1782        let job_id: JobId = 100.into();
1783        let database_id: DatabaseId = DatabaseId::new(10);
1784
1785        // Fragment with Adaptive parallelism, vnode_count = 8
1786        let fragment_model = build_fragment(
1787            fragment_id,
1788            job_id,
1789            0,
1790            DistributionType::Hash,
1791            8,
1792            StreamingParallelism::Adaptive,
1793        );
1794
1795        // Job has custom strategy: BOUNDED(2)
1796        let job_model = streaming_job::Model {
1797            job_id,
1798            job_status: JobStatus::Created,
1799            create_type: CreateType::Foreground,
1800            timezone: None,
1801            config_override: None,
1802            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
1803            parallelism: StreamingParallelism::Adaptive,
1804            backfill_parallelism: None,
1805            backfill_orders: None,
1806            max_parallelism: 8,
1807            specific_resource_group: None,
1808            is_serverless_backfill: false,
1809        };
1810
1811        let database_model = database::Model {
1812            database_id,
1813            name: "test_db".into(),
1814            resource_group: "default".into(),
1815            barrier_interval_ms: None,
1816            checkpoint_frequency: None,
1817        };
1818
1819        let ensembles = vec![NoShuffleEnsemble {
1820            entries: HashSet::from([fragment_id]),
1821            components: HashSet::from([fragment_id]),
1822        }];
1823
1824        let fragment_map =
1825            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
1826        let job_map = HashMap::from([(job_id, job_model)]);
1827
1828        // 4 workers with 1 parallelism each = total 4 parallelism
1829        let worker_map = HashMap::from([
1830            (1.into(), build_worker_node(1, 1, "default")),
1831            (2.into(), build_worker_node(2, 1, "default")),
1832            (3.into(), build_worker_node(3, 1, "default")),
1833            (4.into(), build_worker_node(4, 1, "default")),
1834        ]);
1835
1836        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1837        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1838        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1839        let database_map = HashMap::from([(database_id, database_model)]);
1840        let backfill_jobs = HashSet::new();
1841
1842        let context = RenderActorsContext {
1843            fragment_source_ids: &fragment_source_ids,
1844            fragment_splits: &fragment_splits,
1845            streaming_job_databases: &streaming_job_databases,
1846            database_map: &database_map,
1847            backfill_jobs: &backfill_jobs,
1848        };
1849
1850        // Global strategy is FULL (would give 4 actors), but job strategy is BOUNDED(2)
1851        let result = render_actors(
1852            &actor_id_counter,
1853            &ensembles,
1854            &fragment_map,
1855            &job_map,
1856            &worker_map,
1857            AdaptiveParallelismStrategy::Full,
1858            context,
1859        )
1860        .expect("actor rendering succeeds");
1861
1862        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
1863        // Job strategy BOUNDED(2) should limit to 2 actors, not 4 (global FULL)
1864        assert_eq!(
1865            state.len(),
1866            2,
1867            "Job strategy BOUNDED(2) should override global FULL"
1868        );
1869    }
1870
1871    /// Test that global strategy is used when job has no custom strategy.
1872    #[test]
1873    fn render_actors_uses_global_strategy_when_job_has_none() {
1874        let actor_id_counter = AtomicU32::new(0);
1875        let fragment_id: FragmentId = 1.into();
1876        let job_id: JobId = 101.into();
1877        let database_id: DatabaseId = DatabaseId::new(11);
1878
1879        let fragment_model = build_fragment(
1880            fragment_id,
1881            job_id,
1882            0,
1883            DistributionType::Hash,
1884            8,
1885            StreamingParallelism::Adaptive,
1886        );
1887
1888        // Job has NO custom strategy (None)
1889        let job_model = streaming_job::Model {
1890            job_id,
1891            job_status: JobStatus::Created,
1892            create_type: CreateType::Foreground,
1893            timezone: None,
1894            config_override: None,
1895            adaptive_parallelism_strategy: None, // No custom strategy
1896            parallelism: StreamingParallelism::Adaptive,
1897            backfill_parallelism: None,
1898            backfill_orders: None,
1899            max_parallelism: 8,
1900            specific_resource_group: None,
1901            is_serverless_backfill: false,
1902        };
1903
1904        let database_model = database::Model {
1905            database_id,
1906            name: "test_db".into(),
1907            resource_group: "default".into(),
1908            barrier_interval_ms: None,
1909            checkpoint_frequency: None,
1910        };
1911
1912        let ensembles = vec![NoShuffleEnsemble {
1913            entries: HashSet::from([fragment_id]),
1914            components: HashSet::from([fragment_id]),
1915        }];
1916
1917        let fragment_map =
1918            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
1919        let job_map = HashMap::from([(job_id, job_model)]);
1920
1921        // 4 workers = total 4 parallelism
1922        let worker_map = HashMap::from([
1923            (1.into(), build_worker_node(1, 1, "default")),
1924            (2.into(), build_worker_node(2, 1, "default")),
1925            (3.into(), build_worker_node(3, 1, "default")),
1926            (4.into(), build_worker_node(4, 1, "default")),
1927        ]);
1928
1929        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1930        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1931        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1932        let database_map = HashMap::from([(database_id, database_model)]);
1933        let backfill_jobs = HashSet::new();
1934
1935        let context = RenderActorsContext {
1936            fragment_source_ids: &fragment_source_ids,
1937            fragment_splits: &fragment_splits,
1938            streaming_job_databases: &streaming_job_databases,
1939            database_map: &database_map,
1940            backfill_jobs: &backfill_jobs,
1941        };
1942
1943        // Global strategy is BOUNDED(3)
1944        let result = render_actors(
1945            &actor_id_counter,
1946            &ensembles,
1947            &fragment_map,
1948            &job_map,
1949            &worker_map,
1950            AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(3).unwrap()),
1951            context,
1952        )
1953        .expect("actor rendering succeeds");
1954
1955        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
1956        // Should use global strategy BOUNDED(3)
1957        assert_eq!(
1958            state.len(),
1959            3,
1960            "Should use global strategy BOUNDED(3) when job has no custom strategy"
1961        );
1962    }
1963
1964    /// Test that Fixed parallelism ignores strategy entirely.
1965    #[test]
1966    fn render_actors_fixed_parallelism_ignores_strategy() {
1967        let actor_id_counter = AtomicU32::new(0);
1968        let fragment_id: FragmentId = 1.into();
1969        let job_id: JobId = 102.into();
1970        let database_id: DatabaseId = DatabaseId::new(12);
1971
1972        // Fragment with FIXED parallelism
1973        let fragment_model = build_fragment(
1974            fragment_id,
1975            job_id,
1976            0,
1977            DistributionType::Hash,
1978            8,
1979            StreamingParallelism::Fixed(5),
1980        );
1981
1982        // Job has custom strategy, but it should be ignored for Fixed parallelism
1983        let job_model = streaming_job::Model {
1984            job_id,
1985            job_status: JobStatus::Created,
1986            create_type: CreateType::Foreground,
1987            timezone: None,
1988            config_override: None,
1989            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
1990            parallelism: StreamingParallelism::Fixed(5),
1991            backfill_parallelism: None,
1992            backfill_orders: None,
1993            max_parallelism: 8,
1994            specific_resource_group: None,
1995            is_serverless_backfill: false,
1996        };
1997
1998        let database_model = database::Model {
1999            database_id,
2000            name: "test_db".into(),
2001            resource_group: "default".into(),
2002            barrier_interval_ms: None,
2003            checkpoint_frequency: None,
2004        };
2005
2006        let ensembles = vec![NoShuffleEnsemble {
2007            entries: HashSet::from([fragment_id]),
2008            components: HashSet::from([fragment_id]),
2009        }];
2010
2011        let fragment_map =
2012            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2013        let job_map = HashMap::from([(job_id, job_model)]);
2014
2015        // 6 workers = total 6 parallelism
2016        let worker_map = HashMap::from([
2017            (1.into(), build_worker_node(1, 1, "default")),
2018            (2.into(), build_worker_node(2, 1, "default")),
2019            (3.into(), build_worker_node(3, 1, "default")),
2020            (4.into(), build_worker_node(4, 1, "default")),
2021            (5.into(), build_worker_node(5, 1, "default")),
2022            (6.into(), build_worker_node(6, 1, "default")),
2023        ]);
2024
2025        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2026        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2027        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2028        let database_map = HashMap::from([(database_id, database_model)]);
2029        let backfill_jobs = HashSet::new();
2030
2031        let context = RenderActorsContext {
2032            fragment_source_ids: &fragment_source_ids,
2033            fragment_splits: &fragment_splits,
2034            streaming_job_databases: &streaming_job_databases,
2035            database_map: &database_map,
2036            backfill_jobs: &backfill_jobs,
2037        };
2038
2039        let result = render_actors(
2040            &actor_id_counter,
2041            &ensembles,
2042            &fragment_map,
2043            &job_map,
2044            &worker_map,
2045            AdaptiveParallelismStrategy::Full,
2046            context,
2047        )
2048        .expect("actor rendering succeeds");
2049
2050        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2051        // Fixed(5) should be used, ignoring both job strategy BOUNDED(2) and global FULL
2052        assert_eq!(
2053            state.len(),
2054            5,
2055            "Fixed parallelism should ignore all strategies"
2056        );
2057    }
2058
2059    /// Test RATIO strategy calculation.
2060    #[test]
2061    fn render_actors_ratio_strategy() {
2062        let actor_id_counter = AtomicU32::new(0);
2063        let fragment_id: FragmentId = 1.into();
2064        let job_id: JobId = 103.into();
2065        let database_id: DatabaseId = DatabaseId::new(13);
2066
2067        let fragment_model = build_fragment(
2068            fragment_id,
2069            job_id,
2070            0,
2071            DistributionType::Hash,
2072            16,
2073            StreamingParallelism::Adaptive,
2074        );
2075
2076        // Job has RATIO(0.5) strategy
2077        let job_model = streaming_job::Model {
2078            job_id,
2079            job_status: JobStatus::Created,
2080            create_type: CreateType::Foreground,
2081            timezone: None,
2082            config_override: None,
2083            adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
2084            parallelism: StreamingParallelism::Adaptive,
2085            backfill_parallelism: None,
2086            backfill_orders: None,
2087            max_parallelism: 16,
2088            specific_resource_group: None,
2089            is_serverless_backfill: false,
2090        };
2091
2092        let database_model = database::Model {
2093            database_id,
2094            name: "test_db".into(),
2095            resource_group: "default".into(),
2096            barrier_interval_ms: None,
2097            checkpoint_frequency: None,
2098        };
2099
2100        let ensembles = vec![NoShuffleEnsemble {
2101            entries: HashSet::from([fragment_id]),
2102            components: HashSet::from([fragment_id]),
2103        }];
2104
2105        let fragment_map =
2106            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2107        let job_map = HashMap::from([(job_id, job_model)]);
2108
2109        // 8 workers = total 8 parallelism
2110        let worker_map = HashMap::from([
2111            (1.into(), build_worker_node(1, 1, "default")),
2112            (2.into(), build_worker_node(2, 1, "default")),
2113            (3.into(), build_worker_node(3, 1, "default")),
2114            (4.into(), build_worker_node(4, 1, "default")),
2115            (5.into(), build_worker_node(5, 1, "default")),
2116            (6.into(), build_worker_node(6, 1, "default")),
2117            (7.into(), build_worker_node(7, 1, "default")),
2118            (8.into(), build_worker_node(8, 1, "default")),
2119        ]);
2120
2121        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2122        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2123        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2124        let database_map = HashMap::from([(database_id, database_model)]);
2125        let backfill_jobs = HashSet::new();
2126
2127        let context = RenderActorsContext {
2128            fragment_source_ids: &fragment_source_ids,
2129            fragment_splits: &fragment_splits,
2130            streaming_job_databases: &streaming_job_databases,
2131            database_map: &database_map,
2132            backfill_jobs: &backfill_jobs,
2133        };
2134
2135        let result = render_actors(
2136            &actor_id_counter,
2137            &ensembles,
2138            &fragment_map,
2139            &job_map,
2140            &worker_map,
2141            AdaptiveParallelismStrategy::Full,
2142            context,
2143        )
2144        .expect("actor rendering succeeds");
2145
2146        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2147        // RATIO(0.5) of 8 = 4
2148        assert_eq!(
2149            state.len(),
2150            4,
2151            "RATIO(0.5) of 8 workers should give 4 actors"
2152        );
2153    }
2154}