risingwave_meta/controller/
scale.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::id::JobId;
24use risingwave_common::system_param::AdaptiveParallelismStrategy;
25use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
26use risingwave_connector::source::{SplitImpl, SplitMetaData};
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::prelude::{
29    Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
30};
31use risingwave_meta_model::{
32    CreateType, DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism,
33    WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink, source,
34    streaming_job, table,
35};
36use risingwave_meta_model_migration::Condition;
37use risingwave_pb::common::WorkerNode;
38use risingwave_pb::stream_plan::PbStreamNode;
39use sea_orm::{
40    ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
41    RelationTrait,
42};
43
44use crate::MetaResult;
45use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
46use crate::manager::ActiveStreamingWorkerNodes;
47use crate::model::{ActorId, StreamActor, StreamingJobModelContextExt};
48use crate::stream::{AssignerBuilder, SplitDiffOptions};
49
50pub(crate) async fn resolve_streaming_job_definition<C>(
51    txn: &C,
52    job_ids: &HashSet<JobId>,
53) -> MetaResult<HashMap<JobId, String>>
54where
55    C: ConnectionTrait,
56{
57    let job_ids = job_ids.iter().cloned().collect_vec();
58
59    // including table, materialized view, index
60    let common_job_definitions: Vec<(JobId, String)> = Table::find()
61        .select_only()
62        .columns([
63            table::Column::TableId,
64            #[cfg(not(debug_assertions))]
65            table::Column::Name,
66            #[cfg(debug_assertions)]
67            table::Column::Definition,
68        ])
69        .filter(table::Column::TableId.is_in(job_ids.clone()))
70        .into_tuple()
71        .all(txn)
72        .await?;
73
74    let sink_definitions: Vec<(JobId, String)> = Sink::find()
75        .select_only()
76        .columns([
77            sink::Column::SinkId,
78            #[cfg(not(debug_assertions))]
79            sink::Column::Name,
80            #[cfg(debug_assertions)]
81            sink::Column::Definition,
82        ])
83        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
84        .into_tuple()
85        .all(txn)
86        .await?;
87
88    let source_definitions: Vec<(JobId, String)> = Source::find()
89        .select_only()
90        .columns([
91            source::Column::SourceId,
92            #[cfg(not(debug_assertions))]
93            source::Column::Name,
94            #[cfg(debug_assertions)]
95            source::Column::Definition,
96        ])
97        .filter(source::Column::SourceId.is_in(job_ids.clone()))
98        .into_tuple()
99        .all(txn)
100        .await?;
101
102    let definitions: HashMap<JobId, String> = common_job_definitions
103        .into_iter()
104        .chain(sink_definitions.into_iter())
105        .chain(source_definitions.into_iter())
106        .collect();
107
108    Ok(definitions)
109}
110
111pub async fn load_fragment_info<C>(
112    txn: &C,
113    actor_id_counter: &AtomicU32,
114    database_id: Option<DatabaseId>,
115    worker_nodes: &ActiveStreamingWorkerNodes,
116    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
117) -> MetaResult<FragmentRenderMap>
118where
119    C: ConnectionTrait,
120{
121    let mut query = StreamingJob::find()
122        .select_only()
123        .column(streaming_job::Column::JobId);
124
125    if let Some(database_id) = database_id {
126        query = query
127            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
128            .filter(object::Column::DatabaseId.eq(database_id));
129    }
130
131    let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
132
133    if jobs.is_empty() {
134        return Ok(HashMap::new());
135    }
136
137    let jobs: HashSet<JobId> = jobs.into_iter().collect();
138
139    let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
140
141    if loaded.is_empty() {
142        return Ok(HashMap::new());
143    }
144
145    let RenderedGraph { fragments, .. } = render_actor_assignments(
146        actor_id_counter,
147        worker_nodes.current(),
148        adaptive_parallelism_strategy,
149        &loaded,
150    )?;
151
152    Ok(fragments)
153}
154
/// Target resource settings for a streaming job.
#[derive(Debug)]
pub struct TargetResourcePolicy {
    // NOTE(review): `None` presumably means "inherit the database's resource
    // group" (mirroring how `specific_resource_group` falls back during
    // rendering) — confirm at call sites.
    pub resource_group: Option<String>,
    // Desired parallelism setting for the job.
    pub parallelism: StreamingParallelism,
}
160
/// Snapshot of a worker's scheduling-relevant attributes.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    // Number of parallel units the worker offers (always at least one).
    pub parallelism: NonZeroUsize,
    // Worker's resource group; `None` presumably means the default group
    // (`DEFAULT_RESOURCE_GROUP`) — confirm against usage.
    pub resource_group: Option<String>,
}
166
/// Rendered fragment info keyed by database id, then job id, then fragment id.
pub type FragmentRenderMap =
    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;
169
/// Output of the render stage: per-fragment actor assignments plus the
/// no-shuffle ensembles they were derived from.
#[derive(Default)]
pub struct RenderedGraph {
    // Rendered fragments keyed by database / job / fragment.
    pub fragments: FragmentRenderMap,
    // The no-shuffle ensembles that were rendered.
    pub ensembles: Vec<NoShuffleEnsemble>,
}

impl RenderedGraph {
    /// Creates a graph with no fragments and no ensembles.
    pub fn empty() -> Self {
        Self::default()
    }
}
181
/// Per-fragment metadata loaded asynchronously from the database, containing
/// everything the renderer needs for this fragment. Loading is kept separate
/// so the rendering logic itself stays synchronous.
#[derive(Clone, Debug)]
pub struct LoadedFragment {
    pub fragment_id: FragmentId,
    // Owning streaming job.
    pub job_id: JobId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: DistributionType,
    // Number of virtual nodes; rendering caps actual parallelism at this value.
    pub vnode_count: usize,
    // Decoded stream-node plan for this fragment.
    pub nodes: PbStreamNode,
    pub state_table_ids: HashSet<TableId>,
    // Fragment-level parallelism override; `None` falls back to the job's
    // parallelism during rendering.
    pub parallelism: Option<StreamingParallelism>,
}
196
impl From<fragment::Model> for LoadedFragment {
    /// Converts a fragment row into its in-memory form, decoding the stored
    /// stream-node protobuf and flattening the state table id list into a set.
    fn from(model: fragment::Model) -> Self {
        Self {
            fragment_id: model.fragment_id,
            job_id: model.job_id,
            fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
            distribution_type: model.distribution_type,
            vnode_count: model.vnode_count as usize,
            nodes: model.stream_node.to_protobuf(),
            state_table_ids: model.state_table_ids.into_inner().into_iter().collect(),
            parallelism: model.parallelism,
        }
    }
}
211
/// Context loaded asynchronously from the database, containing all metadata
/// required to render actor assignments. This separates async I/O from
/// sync rendering logic.
#[derive(Default, Debug, Clone)]
pub struct LoadedFragmentContext {
    // All no-shuffle ensembles to render.
    pub ensembles: Vec<NoShuffleEnsemble>,
    // Loaded fragments grouped by owning job.
    pub job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    // Streaming job models keyed by job id.
    pub job_map: HashMap<JobId, streaming_job::Model>,
    // Database each streaming job belongs to.
    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
    // Database models for every referenced database.
    pub database_map: HashMap<DatabaseId, database::Model>,
    // Source id for each source fragment.
    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
    // Splits recorded for each source fragment.
    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
}
222
impl LoadedFragmentContext {
    /// Returns `true` when no ensembles were loaded.
    ///
    /// An empty context must also carry no job fragments; anything else
    /// indicates a bug in the load stage, hence the assertion.
    pub fn is_empty(&self) -> bool {
        if self.ensembles.is_empty() {
            assert!(
                self.job_fragments.is_empty(),
                "non-empty job fragments for empty ensembles: {:?}",
                self.job_fragments
            );
            true
        } else {
            false
        }
    }

    /// Projects this context down to a single database by cloning the
    /// relevant entries. Returns `None` when no loaded job belongs to
    /// `database_id`.
    ///
    /// NOTE: this clones fragment payloads; prefer
    /// [`Self::into_database_contexts`] when splitting the whole context.
    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
        // Jobs that live in the requested database.
        let job_ids: HashSet<JobId> = self
            .streaming_job_databases
            .iter()
            .filter_map(|(job_id, db_id)| (*db_id == database_id).then_some(*job_id))
            .collect();

        if job_ids.is_empty() {
            return None;
        }

        let job_fragments: HashMap<_, _> = job_ids
            .iter()
            .map(|job_id| (*job_id, self.job_fragments[job_id].clone()))
            .collect();

        let fragment_ids: HashSet<_> = job_fragments
            .values()
            .flat_map(|fragments| fragments.keys().copied())
            .collect();

        assert!(
            !fragment_ids.is_empty(),
            "empty fragments for non-empty database {database_id} with jobs {job_ids:?}"
        );

        // Keep only ensembles whose components all belong to this database;
        // an ensemble straddling databases would be a load-stage bug.
        let ensembles: Vec<NoShuffleEnsemble> = self
            .ensembles
            .iter()
            .filter(|ensemble| {
                if ensemble
                    .components
                    .iter()
                    .any(|fragment_id| fragment_ids.contains(fragment_id))
                {
                    assert!(
                        ensemble
                            .components
                            .iter()
                            .all(|fragment_id| fragment_ids.contains(fragment_id)),
                        "ensemble {ensemble:?} partially exists in database {database_id} with fragments {job_fragments:?}"
                    );
                    true
                } else {
                    false
                }
            })
            .cloned()
            .collect();

        assert!(
            !ensembles.is_empty(),
            "empty ensembles for non-empty database {database_id} with jobs {job_fragments:?}"
        );

        let job_map = job_ids
            .iter()
            .filter_map(|job_id| self.job_map.get(job_id).map(|job| (*job_id, job.clone())))
            .collect();

        let streaming_job_databases = job_ids
            .iter()
            .filter_map(|job_id| {
                self.streaming_job_databases
                    .get(job_id)
                    .map(|db_id| (*job_id, *db_id))
            })
            .collect();

        let database_model = self.database_map[&database_id].clone();
        let database_map = HashMap::from([(database_id, database_model)]);

        // Narrow the source-id and split maps to fragments of this database.
        let fragment_source_ids = self
            .fragment_source_ids
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, source_id)| (*fragment_id, *source_id))
            .collect();

        let fragment_splits = self
            .fragment_splits
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, splits)| (*fragment_id, splits.clone()))
            .collect();

        Some(Self {
            ensembles,
            job_fragments,
            job_map,
            streaming_job_databases,
            database_map,
            fragment_source_ids,
            fragment_splits,
        })
    }

    /// Split this loaded context by database in a single ownership pass to avoid cloning large
    /// fragment payloads (for example `stream_node` in `LoadedFragment`).
    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
        let Self {
            ensembles,
            mut job_fragments,
            mut job_map,
            streaming_job_databases,
            mut database_map,
            mut fragment_source_ids,
            mut fragment_splits,
        } = self;

        let mut contexts = HashMap::<DatabaseId, Self>::new();
        // Reverse index used below to route ensembles to their database.
        let mut fragment_databases = HashMap::<FragmentId, DatabaseId>::new();
        let mut unresolved_ensembles = 0usize;
        let mut unresolved_ensemble_sample: Option<Vec<FragmentId>> = None;

        // First pass: move every job (and its fragments, source ids, and
        // splits) into the context of its owning database.
        for (job_id, database_id) in streaming_job_databases {
            let context = contexts.entry(database_id).or_insert_with(|| {
                // The database model is moved out exactly once, when the
                // first job of that database is encountered.
                let database_model = database_map
                    .remove(&database_id)
                    .expect("database should exist for streaming job");
                Self {
                    ensembles: Vec::new(),
                    job_fragments: HashMap::new(),
                    job_map: HashMap::new(),
                    streaming_job_databases: HashMap::new(),
                    database_map: HashMap::from([(database_id, database_model)]),
                    fragment_source_ids: HashMap::new(),
                    fragment_splits: HashMap::new(),
                }
            });

            let fragments = job_fragments
                .remove(&job_id)
                .expect("job fragments should exist for streaming job");
            for fragment_id in fragments.keys().copied() {
                fragment_databases.insert(fragment_id, database_id);
                if let Some(source_id) = fragment_source_ids.remove(&fragment_id) {
                    context.fragment_source_ids.insert(fragment_id, source_id);
                }
                if let Some(splits) = fragment_splits.remove(&fragment_id) {
                    context.fragment_splits.insert(fragment_id, splits);
                }
            }

            // The asserts guard against a job appearing under two databases,
            // which would silently drop data if ignored.
            assert!(
                context
                    .job_map
                    .insert(
                        job_id,
                        job_map
                            .remove(&job_id)
                            .expect("streaming job should exist for loaded context"),
                    )
                    .is_none(),
                "duplicated streaming job"
            );
            assert!(
                context.job_fragments.insert(job_id, fragments).is_none(),
                "duplicated job fragments"
            );
            assert!(
                context
                    .streaming_job_databases
                    .insert(job_id, database_id)
                    .is_none(),
                "duplicated job database mapping"
            );
        }

        // Second pass: route each ensemble to the database of any of its
        // component fragments (they must all agree, checked below).
        for ensemble in ensembles {
            let Some(database_id) = ensemble
                .components
                .iter()
                .find_map(|fragment_id| fragment_databases.get(fragment_id).copied())
            else {
                // No component fragment is known: count it and keep one
                // sample for the warning log, then skip the ensemble.
                unresolved_ensembles += 1;
                if unresolved_ensemble_sample.is_none() {
                    unresolved_ensemble_sample =
                        Some(ensemble.components.iter().copied().collect());
                }
                continue;
            };

            debug_assert!(
                ensemble
                    .components
                    .iter()
                    .all(|fragment_id| fragment_databases.get(fragment_id) == Some(&database_id)),
                "ensemble {ensemble:?} should belong to a single database"
            );

            contexts
                .get_mut(&database_id)
                .expect("database context should exist for ensemble")
                .ensembles
                .push(ensemble);
        }

        // Warn in release builds, fail fast in debug builds: unresolved
        // ensembles indicate inconsistent load-stage inputs.
        if unresolved_ensembles > 0 {
            tracing::warn!(
                unresolved_ensembles,
                ?unresolved_ensemble_sample,
                known_fragments = fragment_databases.len(),
                "skip ensembles without resolved database while splitting loaded context"
            );
        }
        debug_assert_eq!(
            unresolved_ensembles, 0,
            "all ensembles should be mappable to a database"
        );

        contexts
    }
}
451
452/// Fragment-scoped rendering entry point used by operational tooling.
453/// It validates that the requested fragments are roots of their no-shuffle ensembles,
454/// resolves only the metadata required for those components, and then reuses the shared
455/// rendering pipeline to materialize actor assignments.
456pub async fn render_fragments<C>(
457    txn: &C,
458    actor_id_counter: &AtomicU32,
459    ensembles: Vec<NoShuffleEnsemble>,
460    workers: &HashMap<WorkerId, WorkerNode>,
461    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
462) -> MetaResult<RenderedGraph>
463where
464    C: ConnectionTrait,
465{
466    let loaded = load_fragment_context(txn, ensembles).await?;
467
468    if loaded.is_empty() {
469        return Ok(RenderedGraph::empty());
470    }
471
472    render_actor_assignments(
473        actor_id_counter,
474        workers,
475        adaptive_parallelism_strategy,
476        &loaded,
477    )
478}
479
480/// Async load stage for fragment-scoped rendering. It resolves all metadata required to later
481/// render actor assignments with arbitrary worker sets.
482pub async fn load_fragment_context<C>(
483    txn: &C,
484    ensembles: Vec<NoShuffleEnsemble>,
485) -> MetaResult<LoadedFragmentContext>
486where
487    C: ConnectionTrait,
488{
489    if ensembles.is_empty() {
490        return Ok(LoadedFragmentContext::default());
491    }
492
493    let required_fragment_ids: HashSet<_> = ensembles
494        .iter()
495        .flat_map(|ensemble| ensemble.components.iter().copied())
496        .collect();
497
498    let fragment_models = Fragment::find()
499        .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
500        .all(txn)
501        .await?;
502
503    let found_fragment_ids: HashSet<_> = fragment_models
504        .iter()
505        .map(|fragment| fragment.fragment_id)
506        .collect();
507
508    if found_fragment_ids.len() != required_fragment_ids.len() {
509        let missing = required_fragment_ids
510            .difference(&found_fragment_ids)
511            .copied()
512            .collect_vec();
513        return Err(anyhow!("fragments {:?} not found", missing).into());
514    }
515
516    let fragment_models: HashMap<_, _> = fragment_models
517        .into_iter()
518        .map(|fragment| (fragment.fragment_id, fragment))
519        .collect();
520
521    let job_ids: HashSet<_> = fragment_models
522        .values()
523        .map(|fragment| fragment.job_id)
524        .collect();
525
526    if job_ids.is_empty() {
527        return Ok(LoadedFragmentContext::default());
528    }
529
530    let jobs: HashMap<_, _> = StreamingJob::find()
531        .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
532        .all(txn)
533        .await?
534        .into_iter()
535        .map(|job| (job.job_id, job))
536        .collect();
537
538    let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
539    if found_job_ids.len() != job_ids.len() {
540        let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
541        return Err(anyhow!("streaming jobs {:?} not found", missing).into());
542    }
543
544    build_loaded_context(txn, ensembles, fragment_models, jobs).await
545}
546
547/// Job-scoped rendering entry point that walks every no-shuffle root belonging to the
548/// provided streaming jobs before delegating to the shared rendering backend.
549pub async fn render_jobs<C>(
550    txn: &C,
551    actor_id_counter: &AtomicU32,
552    job_ids: HashSet<JobId>,
553    workers: &HashMap<WorkerId, WorkerNode>,
554    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
555) -> MetaResult<RenderedGraph>
556where
557    C: ConnectionTrait,
558{
559    let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
560
561    if loaded.is_empty() {
562        return Ok(RenderedGraph::empty());
563    }
564
565    render_actor_assignments(
566        actor_id_counter,
567        workers,
568        adaptive_parallelism_strategy,
569        &loaded,
570    )
571}
572
/// Async load stage for job-scoped rendering. It collects all no-shuffle ensembles and the
/// metadata required to render actor assignments later with a provided worker set.
pub async fn load_fragment_context_for_jobs<C>(
    txn: &C,
    job_ids: HashSet<JobId>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Fragments that are the *target* of a no-shuffle dispatch are interior
    // members of an ensemble; excluding them leaves only the ensemble roots.
    let excluded_fragments_query = FragmentRelation::find()
        .select_only()
        .column(fragment_relation::Column::TargetFragmentId)
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_query();

    let condition = Condition::all()
        .add(fragment::Column::JobId.is_in(job_ids.clone()))
        .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));

    // Root fragments of the requested jobs.
    let fragments: Vec<FragmentId> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .filter(condition)
        .into_tuple()
        .all(txn)
        .await?;

    // Expand each root into its full no-shuffle ensemble.
    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;

    // Load the full fragment models for every component of every ensemble;
    // note this may include fragments of jobs outside the original `job_ids`.
    let fragments = Fragment::find()
        .filter(
            fragment::Column::FragmentId.is_in(
                ensembles
                    .iter()
                    .flat_map(|graph| graph.components.iter())
                    .cloned()
                    .collect_vec(),
            ),
        )
        .all(txn)
        .await?;

    let fragment_map: HashMap<_, _> = fragments
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    // Re-derive the owning jobs from the loaded fragments; the BTreeSet
    // dedups and yields a deterministic order for the query below.
    let job_ids = fragment_map
        .values()
        .map(|fragment| fragment.job_id)
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect_vec();

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    build_loaded_context(txn, ensembles, fragment_map, jobs).await
}
641
642/// Sync render stage: uses loaded fragment context and current worker info
643/// to produce actor-to-worker assignments and vnode bitmaps.
644pub(crate) fn render_actor_assignments(
645    actor_id_counter: &AtomicU32,
646    worker_map: &HashMap<WorkerId, WorkerNode>,
647    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
648    loaded: &LoadedFragmentContext,
649) -> MetaResult<RenderedGraph> {
650    if loaded.is_empty() {
651        return Ok(RenderedGraph::empty());
652    }
653
654    let backfill_jobs: HashSet<JobId> = loaded
655        .job_map
656        .iter()
657        .filter(|(_, job)| {
658            job.create_type == CreateType::Background && job.job_status == JobStatus::Creating
659        })
660        .map(|(id, _)| *id)
661        .collect();
662
663    let render_context = RenderActorsContext {
664        fragment_source_ids: &loaded.fragment_source_ids,
665        fragment_splits: &loaded.fragment_splits,
666        streaming_job_databases: &loaded.streaming_job_databases,
667        database_map: &loaded.database_map,
668        backfill_jobs: &backfill_jobs,
669    };
670
671    let fragments = render_actors(
672        actor_id_counter,
673        &loaded.ensembles,
674        &loaded.job_fragments,
675        &loaded.job_map,
676        worker_map,
677        adaptive_parallelism_strategy,
678        render_context,
679    )?;
680
681    Ok(RenderedGraph {
682        fragments,
683        ensembles: loaded.ensembles.clone(),
684    })
685}
686
/// Shared tail of the load stage: assembles a [`LoadedFragmentContext`] from
/// pre-fetched ensembles, fragment models, and streaming job models by
/// resolving the remaining metadata (source ids/splits and the job→database
/// mapping).
async fn build_loaded_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
    fragment_models: HashMap<FragmentId, fragment::Model>,
    job_map: HashMap<JobId, streaming_job::Model>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Group fragments by owning job; a fragment id may appear at most once.
    let mut job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>> = HashMap::new();
    for (fragment_id, model) in fragment_models {
        job_fragments
            .entry(model.job_id)
            .or_default()
            .try_insert(fragment_id, LoadedFragment::from(model))
            .expect("duplicate fragment id for job");
    }

    #[cfg(debug_assertions)]
    {
        debug_sanity_check(&ensembles, &job_fragments, &job_map);
    }

    // Source fragments additionally need their source ids and split info.
    let (fragment_source_ids, fragment_splits) =
        resolve_source_fragments(txn, &job_fragments).await?;

    let job_ids = job_map.keys().copied().collect_vec();

    // Map each job to its database via the object table (left join).
    let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .collect();

    // Fetch the database models for every referenced database.
    let database_map: HashMap<_, _> = Database::find()
        .filter(
            database::Column::DatabaseId
                .is_in(streaming_job_databases.values().copied().collect_vec()),
        )
        .all(txn)
        .await?
        .into_iter()
        .map(|db| (db.database_id, db))
        .collect();

    Ok(LoadedFragmentContext {
        ensembles,
        job_fragments,
        job_map,
        streaming_job_databases,
        database_map,
        fragment_source_ids,
        fragment_splits,
    })
}
752
// Only metadata resolved asynchronously lives here so the renderer stays synchronous
// and the call site keeps the runtime dependencies (maps, strategy, actor counter, etc.) explicit.
struct RenderActorsContext<'a> {
    // Source id per source fragment.
    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
    // Splits loaded per source fragment.
    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
    // Database each streaming job belongs to.
    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
    // Database models (used for resource-group fallback).
    database_map: &'a HashMap<DatabaseId, database::Model>,
    // Jobs currently backfilling in the background.
    backfill_jobs: &'a HashSet<JobId>,
}
762
763fn render_actors(
764    actor_id_counter: &AtomicU32,
765    ensembles: &[NoShuffleEnsemble],
766    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
767    job_map: &HashMap<JobId, streaming_job::Model>,
768    worker_map: &HashMap<WorkerId, WorkerNode>,
769    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
770    context: RenderActorsContext<'_>,
771) -> MetaResult<FragmentRenderMap> {
772    let RenderActorsContext {
773        fragment_source_ids,
774        fragment_splits: fragment_splits_map,
775        streaming_job_databases,
776        database_map,
777        backfill_jobs,
778    } = context;
779
780    let mut all_fragments: FragmentRenderMap = HashMap::new();
781    let fragment_lookup: HashMap<FragmentId, &LoadedFragment> = job_fragments
782        .values()
783        .flat_map(|fragments| fragments.iter())
784        .map(|(fragment_id, fragment)| (*fragment_id, fragment))
785        .collect();
786
787    for NoShuffleEnsemble {
788        entries,
789        components,
790    } in ensembles
791    {
792        tracing::debug!("rendering ensemble entries {:?}", entries);
793
794        let entry_fragments = entries
795            .iter()
796            .map(|fragment_id| fragment_lookup.get(fragment_id).unwrap())
797            .collect_vec();
798
799        let entry_fragment_parallelism = entry_fragments
800            .iter()
801            .map(|fragment| fragment.parallelism.clone())
802            .dedup()
803            .exactly_one()
804            .map_err(|_| {
805                anyhow!(
806                    "entry fragments {:?} have inconsistent parallelism settings",
807                    entries.iter().copied().collect_vec()
808                )
809            })?;
810
811        let (job_id, vnode_count) = entry_fragments
812            .iter()
813            .map(|f| (f.job_id, f.vnode_count))
814            .dedup()
815            .exactly_one()
816            .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;
817
818        let job = job_map
819            .get(&job_id)
820            .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;
821
822        let job_strategy = job
823            .stream_context()
824            .adaptive_parallelism_strategy
825            .unwrap_or(adaptive_parallelism_strategy);
826
827        let resource_group = match &job.specific_resource_group {
828            None => {
829                let database = streaming_job_databases
830                    .get(&job_id)
831                    .and_then(|database_id| database_map.get(database_id))
832                    .unwrap();
833                database.resource_group.clone()
834            }
835            Some(resource_group) => resource_group.clone(),
836        };
837
838        let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
839            .iter()
840            .filter_map(|(worker_id, worker)| {
841                if worker
842                    .resource_group()
843                    .as_deref()
844                    .unwrap_or(DEFAULT_RESOURCE_GROUP)
845                    == resource_group.as_str()
846                {
847                    Some((
848                        *worker_id,
849                        worker
850                            .parallelism()
851                            .expect("should have parallelism for compute node")
852                            .try_into()
853                            .expect("parallelism for compute node"),
854                    ))
855                } else {
856                    None
857                }
858            })
859            .collect();
860
861        let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();
862
863        let effective_job_parallelism = if backfill_jobs.contains(&job_id) {
864            job.backfill_parallelism
865                .as_ref()
866                .unwrap_or(&job.parallelism)
867        } else {
868            &job.parallelism
869        };
870
871        let actual_parallelism = match entry_fragment_parallelism
872            .as_ref()
873            .unwrap_or(effective_job_parallelism)
874        {
875            StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
876                job_strategy.compute_target_parallelism(total_parallelism)
877            }
878            StreamingParallelism::Fixed(n) => *n,
879        }
880        .min(vnode_count)
881        .min(job.max_parallelism as usize);
882
883        tracing::debug!(
884            "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
885            job_id,
886            actual_parallelism,
887            job.parallelism,
888            total_parallelism,
889            job.max_parallelism,
890            vnode_count,
891            entry_fragment_parallelism
892        );
893
894        let assigner = AssignerBuilder::new(job_id).build();
895
896        let actors = (0..(actual_parallelism as u32))
897            .map_into::<ActorId>()
898            .collect_vec();
899        let vnodes = (0..vnode_count).collect_vec();
900
901        let assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;
902
903        let source_entry_fragment = entry_fragments.iter().find(|f| {
904            let mask = f.fragment_type_mask;
905            if mask.contains(FragmentTypeFlag::Source) {
906                assert!(!mask.contains(FragmentTypeFlag::SourceScan))
907            }
908            mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
909        });
910
911        let (fragment_splits, shared_source_id) = match source_entry_fragment {
912            Some(entry_fragment) => {
913                let source_id = fragment_source_ids
914                    .get(&entry_fragment.fragment_id)
915                    .ok_or_else(|| {
916                        anyhow!(
917                            "missing source id in source fragment {}",
918                            entry_fragment.fragment_id
919                        )
920                    })?;
921
922                let entry_fragment_id = entry_fragment.fragment_id;
923
924                let empty_actor_splits: HashMap<_, _> =
925                    actors.iter().map(|actor_id| (*actor_id, vec![])).collect();
926
927                let splits = fragment_splits_map
928                    .get(&entry_fragment_id)
929                    .cloned()
930                    .unwrap_or_default();
931
932                let splits: BTreeMap<_, _> = splits.into_iter().map(|s| (s.id(), s)).collect();
933
934                let fragment_splits = crate::stream::source_manager::reassign_splits(
935                    entry_fragment_id,
936                    empty_actor_splits,
937                    &splits,
938                    SplitDiffOptions::default(),
939                )
940                .unwrap_or_default();
941                (fragment_splits, Some(*source_id))
942            }
943            None => (HashMap::new(), None),
944        };
945
946        for component_fragment_id in components {
947            let fragment = fragment_lookup.get(component_fragment_id).unwrap();
948            let fragment_id = fragment.fragment_id;
949            let job_id = fragment.job_id;
950            let fragment_type_mask = fragment.fragment_type_mask;
951            let distribution_type = fragment.distribution_type;
952            let stream_node = &fragment.nodes;
953            let state_table_ids = &fragment.state_table_ids;
954            let vnode_count = fragment.vnode_count;
955
956            let actor_count =
957                u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
958            let actor_id_base = actor_id_counter.fetch_add(actor_count, Ordering::Relaxed);
959
960            let actors: HashMap<ActorId, InflightActorInfo> = assignment
961                .iter()
962                .flat_map(|(worker_id, actors)| {
963                    actors
964                        .iter()
965                        .map(move |(actor_id, vnodes)| (worker_id, actor_id, vnodes))
966                })
967                .map(|(&worker_id, &actor_idx, vnodes)| {
968                    let vnode_bitmap = match distribution_type {
969                        DistributionType::Single => None,
970                        DistributionType::Hash => Some(Bitmap::from_indices(vnode_count, vnodes)),
971                    };
972
973                    let actor_id = actor_idx + actor_id_base;
974
975                    let splits = if let Some(source_id) = fragment_source_ids.get(&fragment_id) {
976                        assert_eq!(shared_source_id, Some(*source_id));
977
978                        fragment_splits
979                            .get(&(actor_idx))
980                            .cloned()
981                            .unwrap_or_default()
982                    } else {
983                        vec![]
984                    };
985
986                    (
987                        actor_id,
988                        InflightActorInfo {
989                            worker_id,
990                            vnode_bitmap,
991                            splits,
992                        },
993                    )
994                })
995                .collect();
996
997            let fragment = InflightFragmentInfo {
998                fragment_id,
999                distribution_type,
1000                fragment_type_mask,
1001                vnode_count,
1002                nodes: stream_node.clone(),
1003                actors,
1004                state_table_ids: state_table_ids.clone(),
1005            };
1006
1007            let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
1008                anyhow!("streaming job {job_id} not found in streaming_job_databases")
1009            })?;
1010
1011            all_fragments
1012                .entry(database_id)
1013                .or_default()
1014                .entry(job_id)
1015                .or_default()
1016                .insert(fragment_id, fragment);
1017        }
1018    }
1019
1020    Ok(all_fragments)
1021}
1022
1023#[cfg(debug_assertions)]
1024fn debug_sanity_check(
1025    ensembles: &[NoShuffleEnsemble],
1026    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1027    jobs: &HashMap<JobId, streaming_job::Model>,
1028) {
1029    let fragment_lookup: HashMap<FragmentId, (&LoadedFragment, JobId)> = job_fragments
1030        .iter()
1031        .flat_map(|(job_id, fragments)| {
1032            fragments
1033                .iter()
1034                .map(move |(fragment_id, fragment)| (*fragment_id, (fragment, *job_id)))
1035        })
1036        .collect();
1037
1038    // Debug-only assertions to catch inconsistent ensemble metadata early.
1039    debug_assert!(
1040        ensembles
1041            .iter()
1042            .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
1043        "entries must be subset of components"
1044    );
1045
1046    let mut missing_fragments = BTreeSet::new();
1047    let mut missing_jobs = BTreeSet::new();
1048
1049    for fragment_id in ensembles
1050        .iter()
1051        .flat_map(|ensemble| ensemble.components.iter())
1052    {
1053        match fragment_lookup.get(fragment_id) {
1054            Some((fragment, job_id)) => {
1055                if !jobs.contains_key(&fragment.job_id) {
1056                    missing_jobs.insert(*job_id);
1057                }
1058            }
1059            None => {
1060                missing_fragments.insert(*fragment_id);
1061            }
1062        }
1063    }
1064
1065    debug_assert!(
1066        missing_fragments.is_empty(),
1067        "missing fragments in fragment_map: {:?}",
1068        missing_fragments
1069    );
1070
1071    debug_assert!(
1072        missing_jobs.is_empty(),
1073        "missing jobs for fragments' job_id: {:?}",
1074        missing_jobs
1075    );
1076
1077    for ensemble in ensembles {
1078        let unique_vnode_counts: Vec<_> = ensemble
1079            .components
1080            .iter()
1081            .flat_map(|fragment_id| {
1082                fragment_lookup
1083                    .get(fragment_id)
1084                    .map(|(fragment, _)| fragment.vnode_count)
1085            })
1086            .unique()
1087            .collect();
1088
1089        debug_assert!(
1090            unique_vnode_counts.len() <= 1,
1091            "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
1092            ensemble.components,
1093            unique_vnode_counts
1094        );
1095    }
1096}
1097
/// Resolves, for the given loaded fragments, which source each source-related
/// fragment consumes and which splits are currently persisted for it.
///
/// Returns a pair of maps:
/// - fragment id -> `SourceId` (covers both `Source` and `SourceScan`
///   fragments), and
/// - fragment id -> splits loaded from the `fragment_splits` table; fragments
///   without a persisted splits row are simply absent from this map.
async fn resolve_source_fragments<C>(
    txn: &C,
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
) -> MetaResult<(
    HashMap<FragmentId, SourceId>,
    HashMap<FragmentId, Vec<SplitImpl>>,
)>
where
    C: ConnectionTrait,
{
    // Group fragments by the source they read: plain source fragments via
    // `find_stream_source`, backfill fragments via `find_source_backfill`.
    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
    for (fragment_id, fragment) in job_fragments.values().flatten() {
        let mask = fragment.fragment_type_mask;
        if mask.contains(FragmentTypeFlag::Source)
            && let Some(source_id) = fragment.nodes.find_stream_source()
        {
            source_fragment_ids
                .entry(source_id)
                .or_default()
                .insert(*fragment_id);
        }

        if mask.contains(FragmentTypeFlag::SourceScan)
            && let Some((source_id, _)) = fragment.nodes.find_source_backfill()
        {
            source_fragment_ids
                .entry(source_id)
                .or_default()
                .insert(*fragment_id);
        }
    }

    // Invert the grouping into fragment -> source for the caller.
    let fragment_source_ids: HashMap<_, _> = source_fragment_ids
        .iter()
        .flat_map(|(source_id, fragment_ids)| {
            fragment_ids
                .iter()
                .map(|fragment_id| (*fragment_id, *source_id))
        })
        .collect();

    let fragment_ids = fragment_source_ids.keys().copied().collect_vec();

    // Load persisted split assignments for all source-related fragments.
    let fragment_splits: Vec<_> = FragmentSplits::find()
        .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
        .all(txn)
        .await?;

    // Decode the protobuf splits; rows with NULL splits are skipped, and
    // individual splits that fail `SplitImpl::try_from` are silently dropped
    // by the inner `flat_map`.
    let fragment_splits: HashMap<_, _> = fragment_splits
        .into_iter()
        .flat_map(|model| {
            model.splits.map(|splits| {
                (
                    model.fragment_id,
                    splits
                        .to_protobuf()
                        .splits
                        .iter()
                        .flat_map(SplitImpl::try_from)
                        .collect_vec(),
                )
            })
        })
        .collect();

    Ok((fragment_source_ids, fragment_splits))
}
1165
// Helper struct to make the function signature cleaner and to properly bundle the required data.
#[derive(Debug)]
pub struct ActorGraph<'a> {
    /// Per-fragment data: each fragment paired with its rendered stream actors.
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    /// Worker placement for each actor referenced in `fragments`.
    pub locations: &'a HashMap<ActorId, WorkerId>,
}
1172
/// A connected component of fragments linked by NO_SHUFFLE dispatcher edges,
/// together with its entry (root) fragments.
#[derive(Debug, Clone)]
pub struct NoShuffleEnsemble {
    /// Root fragments: members with no NO_SHUFFLE parent inside the component.
    entries: HashSet<FragmentId>,
    /// Every fragment in the connected component; always a superset of `entries`.
    components: HashSet<FragmentId>,
}
1178
1179impl NoShuffleEnsemble {
1180    #[cfg(test)]
1181    pub fn for_test(
1182        entries: impl IntoIterator<Item = FragmentId>,
1183        components: impl IntoIterator<Item = FragmentId>,
1184    ) -> Self {
1185        let entries = entries.into_iter().collect();
1186        let components = components.into_iter().collect();
1187        Self {
1188            entries,
1189            components,
1190        }
1191    }
1192
1193    pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1194        self.components.iter().cloned()
1195    }
1196
1197    pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1198        self.entries.iter().copied()
1199    }
1200
1201    pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1202        self.components.iter().copied()
1203    }
1204
1205    pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
1206        self.entries.contains(fragment_id)
1207    }
1208}
1209
1210pub async fn find_fragment_no_shuffle_dags_detailed(
1211    db: &impl ConnectionTrait,
1212    initial_fragment_ids: &[FragmentId],
1213) -> MetaResult<Vec<NoShuffleEnsemble>> {
1214    let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
1215        .columns([
1216            fragment_relation::Column::SourceFragmentId,
1217            fragment_relation::Column::TargetFragmentId,
1218        ])
1219        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
1220        .into_tuple()
1221        .all(db)
1222        .await?;
1223
1224    let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1225    let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1226
1227    for (src, dst) in all_no_shuffle_relations {
1228        forward_edges.entry(src).or_default().push(dst);
1229        backward_edges.entry(dst).or_default().push(src);
1230    }
1231
1232    find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
1233}
1234
1235fn find_no_shuffle_graphs(
1236    initial_fragment_ids: &[impl Into<FragmentId> + Copy],
1237    forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1238    backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1239) -> MetaResult<Vec<NoShuffleEnsemble>> {
1240    let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
1241    let mut globally_visited: HashSet<FragmentId> = HashSet::new();
1242
1243    for &init_id in initial_fragment_ids {
1244        let init_id = init_id.into();
1245        if globally_visited.contains(&init_id) {
1246            continue;
1247        }
1248
1249        // Found a new component. Traverse it to find all its nodes.
1250        let mut components = HashSet::new();
1251        let mut queue: VecDeque<FragmentId> = VecDeque::new();
1252
1253        queue.push_back(init_id);
1254        globally_visited.insert(init_id);
1255
1256        while let Some(current_id) = queue.pop_front() {
1257            components.insert(current_id);
1258            let neighbors = forward_edges
1259                .get(&current_id)
1260                .into_iter()
1261                .flatten()
1262                .chain(backward_edges.get(&current_id).into_iter().flatten());
1263
1264            for &neighbor_id in neighbors {
1265                if globally_visited.insert(neighbor_id) {
1266                    queue.push_back(neighbor_id);
1267                }
1268            }
1269        }
1270
1271        // For the newly found component, identify its roots.
1272        let mut entries = HashSet::new();
1273        for &node_id in &components {
1274            let is_root = match backward_edges.get(&node_id) {
1275                Some(parents) => parents.iter().all(|p| !components.contains(p)),
1276                None => true,
1277            };
1278            if is_root {
1279                entries.insert(node_id);
1280            }
1281        }
1282
1283        // Store the detailed DAG structure (roots, all nodes in this DAG).
1284        if !entries.is_empty() {
1285            graphs.push(NoShuffleEnsemble {
1286                entries,
1287                components,
1288            });
1289        }
1290    }
1291
1292    Ok(graphs)
1293}
1294
1295#[cfg(test)]
1296mod tests {
1297    use std::collections::{BTreeSet, HashMap, HashSet};
1298    use std::sync::Arc;
1299
1300    use risingwave_connector::source::SplitImpl;
1301    use risingwave_connector::source::test_source::TestSourceSplit;
1302    use risingwave_meta_model::{CreateType, JobStatus};
1303    use risingwave_pb::common::WorkerType;
1304    use risingwave_pb::common::worker_node::Property as WorkerProperty;
1305    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1306
1307    use super::*;
1308
1309    // Helper type aliases for cleaner test code
1310    // Using the actual FragmentId type from the module
1311    type Edges = (
1312        HashMap<FragmentId, Vec<FragmentId>>,
1313        HashMap<FragmentId, Vec<FragmentId>>,
1314    );
1315
1316    /// A helper function to build forward and backward edge maps from a simple list of tuples.
1317    /// This reduces boilerplate in each test.
1318    fn build_edges(relations: &[(u32, u32)]) -> Edges {
1319        let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1320        let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1321        for &(src, dst) in relations {
1322            forward_edges
1323                .entry(src.into())
1324                .or_default()
1325                .push(dst.into());
1326            backward_edges
1327                .entry(dst.into())
1328                .or_default()
1329                .push(src.into());
1330        }
1331        (forward_edges, backward_edges)
1332    }
1333
1334    /// Helper function to create a `HashSet` from a slice easily.
1335    fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1336        ids.iter().map(|id| (*id).into()).collect()
1337    }
1338
1339    fn build_fragment(
1340        fragment_id: FragmentId,
1341        job_id: JobId,
1342        fragment_type_mask: i32,
1343        distribution_type: DistributionType,
1344        vnode_count: i32,
1345        parallelism: StreamingParallelism,
1346    ) -> LoadedFragment {
1347        LoadedFragment {
1348            fragment_id,
1349            job_id,
1350            fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
1351            distribution_type,
1352            vnode_count: vnode_count as usize,
1353            nodes: PbStreamNode::default(),
1354            state_table_ids: HashSet::new(),
1355            parallelism: Some(parallelism),
1356        }
1357    }
1358
1359    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1360
1361    fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1362        let base = fragment.actors.keys().copied().min().unwrap_or_default();
1363
1364        let mut entries: Vec<_> = fragment
1365            .actors
1366            .iter()
1367            .map(|(&actor_id, info)| {
1368                let idx = actor_id.as_raw_id() - base.as_raw_id();
1369                let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1370                    bitmap
1371                        .iter()
1372                        .enumerate()
1373                        .filter_map(|(pos, is_set)| is_set.then_some(pos))
1374                        .collect::<Vec<_>>()
1375                });
1376                let splits = info
1377                    .splits
1378                    .iter()
1379                    .map(|split| split.id().to_string())
1380                    .collect::<Vec<_>>();
1381                (idx.into(), info.worker_id, vnode_indices, splits)
1382            })
1383            .collect();
1384
1385        entries.sort_by_key(|(idx, _, _, _)| *idx);
1386        entries
1387    }
1388
1389    fn build_worker_node(
1390        id: impl Into<WorkerId>,
1391        parallelism: usize,
1392        resource_group: &str,
1393    ) -> WorkerNode {
1394        WorkerNode {
1395            id: id.into(),
1396            r#type: WorkerType::ComputeNode as i32,
1397            property: Some(WorkerProperty {
1398                is_streaming: true,
1399                parallelism: u32::try_from(parallelism).expect("parallelism fits into u32"),
1400                resource_group: Some(resource_group.to_owned()),
1401                ..Default::default()
1402            }),
1403            ..Default::default()
1404        }
1405    }
1406
1407    #[test]
1408    fn test_single_linear_chain() {
1409        // Scenario: A simple linear graph 1 -> 2 -> 3.
1410        // We start from the middle node (2).
1411        let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1412        let initial_ids = &[2];
1413
1414        // Act
1415        let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1416
1417        // Assert
1418        assert!(result.is_ok());
1419        let graphs = result.unwrap();
1420
1421        assert_eq!(graphs.len(), 1);
1422        let graph = &graphs[0];
1423        assert_eq!(graph.entries, to_hashset(&[1]));
1424        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1425    }
1426
1427    #[test]
1428    fn test_two_disconnected_graphs() {
1429        // Scenario: Two separate graphs: 1->2 and 10->11.
1430        // We start with one node from each graph.
1431        let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1432        let initial_ids = &[2, 10];
1433
1434        // Act
1435        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1436
1437        // Assert
1438        assert_eq!(graphs.len(), 2);
1439
1440        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1441        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1442
1443        // Graph 1
1444        assert_eq!(graphs[0].entries, to_hashset(&[1]));
1445        assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1446
1447        // Graph 2
1448        assert_eq!(graphs[1].entries, to_hashset(&[10]));
1449        assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1450    }
1451
1452    #[test]
1453    fn test_multiple_entries_in_one_graph() {
1454        // Scenario: A graph with two roots feeding into one node: 1->3, 2->3.
1455        let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1456        let initial_ids = &[3];
1457
1458        // Act
1459        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1460
1461        // Assert
1462        assert_eq!(graphs.len(), 1);
1463        let graph = &graphs[0];
1464        assert_eq!(graph.entries, to_hashset(&[1, 2]));
1465        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1466    }
1467
1468    #[test]
1469    fn test_diamond_shape_graph() {
1470        // Scenario: A diamond shape: 1->2, 1->3, 2->4, 3->4
1471        let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1472        let initial_ids = &[4];
1473
1474        // Act
1475        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1476
1477        // Assert
1478        assert_eq!(graphs.len(), 1);
1479        let graph = &graphs[0];
1480        assert_eq!(graph.entries, to_hashset(&[1]));
1481        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1482    }
1483
1484    #[test]
1485    fn test_starting_with_multiple_nodes_in_same_graph() {
1486        // Scenario: Start with two different nodes (2 and 4) from the same component.
1487        // Should only identify one graph, not two.
1488        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1489        let initial_ids = &[2, 4];
1490
1491        // Act
1492        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1493
1494        // Assert
1495        assert_eq!(graphs.len(), 1);
1496        let graph = &graphs[0];
1497        assert_eq!(graph.entries, to_hashset(&[1]));
1498        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1499    }
1500
1501    #[test]
1502    fn test_empty_initial_ids() {
1503        // Scenario: The initial ID list is empty.
1504        let (forward, backward) = build_edges(&[(1, 2)]);
1505        let initial_ids: &[u32] = &[];
1506
1507        // Act
1508        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1509
1510        // Assert
1511        assert!(graphs.is_empty());
1512    }
1513
1514    #[test]
1515    fn test_isolated_node_as_input() {
1516        // Scenario: Start with an ID that has no relations.
1517        let (forward, backward) = build_edges(&[(1, 2)]);
1518        let initial_ids = &[100];
1519
1520        // Act
1521        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1522
1523        // Assert
1524        assert_eq!(graphs.len(), 1);
1525        let graph = &graphs[0];
1526        assert_eq!(graph.entries, to_hashset(&[100]));
1527        assert_eq!(graph.components, to_hashset(&[100]));
1528    }
1529
1530    #[test]
1531    fn test_graph_with_a_cycle() {
1532        // Scenario: A graph with a cycle: 1 -> 2 -> 3 -> 1.
1533        // The algorithm should correctly identify all nodes in the component.
1534        // Crucially, NO node is a root because every node has a parent *within the component*.
1535        // Therefore, the `entries` set should be empty, and the graph should not be included in the results.
1536        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1537        let initial_ids = &[2];
1538
1539        // Act
1540        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1541
1542        // Assert
1543        assert!(
1544            graphs.is_empty(),
1545            "A graph with no entries should not be returned"
1546        );
1547    }
1548    #[test]
1549    fn test_custom_complex() {
1550        let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1551        let initial_ids = &[1, 2, 4, 6];
1552
1553        // Act
1554        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1555
1556        // Assert
1557        assert_eq!(graphs.len(), 2);
1558        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1559        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1560
1561        // Graph 1
1562        assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1563        assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1564
1565        // Graph 2
1566        assert_eq!(graphs[1].entries, to_hashset(&[6]));
1567        assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1568    }
1569
    #[test]
    fn render_actors_increments_actor_counter() {
        // One SINGLE-distribution fragment with Fixed(1) parallelism:
        // rendering must consume exactly one id from the shared actor counter
        // and leave the vnode bitmap unset.
        let actor_id_counter = AtomicU32::new(100);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 10.into();
        let database_id: DatabaseId = DatabaseId::new(3);

        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Single,
            1,
            StreamingParallelism::Fixed(1),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(1),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 1,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "rg-a".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Trivial ensemble: the fragment is both the entry and the only component.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Single worker with parallelism 1 in the database's resource group.
        let worker_map: HashMap<WorkerId, WorkerNode> =
            HashMap::from([(1.into(), build_worker_node(1, 1, "rg-a"))]);

        // No sources, no splits, no backfill jobs involved in this scenario.
        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(state.len(), 1);
        assert!(
            state[0].2.is_none(),
            "single distribution should not assign vnode bitmaps"
        );
        // Counter started at 100; rendering one actor advances it to 101.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
    }
1654
    #[test]
    fn render_actors_aligns_hash_vnode_bitmaps() {
        // Two HASH fragments in the same NO_SHUFFLE ensemble must receive
        // identical actor layouts (worker + vnode bitmap per actor index).
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 1.into();
        let downstream_fragment_id: FragmentId = 2.into();
        let job_id: JobId = 20.into();
        let database_id: DatabaseId = DatabaseId::new(5);

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db_hash".into(),
            resource_group: "rg-hash".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Both fragments form one ensemble whose entry is the upstream fragment.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Two single-slot workers in the matching resource group.
        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-hash")),
            (2.into(), build_worker_node(2, 1, "rg-hash")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // NO_SHUFFLE alignment: the downstream layout mirrors the entry's.
        assert_eq!(entry_state.len(), 2);
        assert_eq!(entry_state, downstream_state);

        // The two actors together must cover all 4 vnodes.
        let assigned_vnodes: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, vnodes, _)| {
                vnodes
                    .as_ref()
                    .expect("hash distribution should populate vnode bitmap")
                    .iter()
                    .copied()
            })
            .collect();
        assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
        // 2 actors rendered for each of the 2 fragments: counter ends at 4.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
1766
1767    #[test]
1768    fn render_actors_propagates_source_splits() {
1769        let actor_id_counter = AtomicU32::new(0);
1770        let entry_fragment_id: FragmentId = 11.into();
1771        let downstream_fragment_id: FragmentId = 12.into();
1772        let job_id: JobId = 30.into();
1773        let database_id: DatabaseId = DatabaseId::new(7);
1774        let source_id: SourceId = 99.into();
1775
1776        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
1777        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;
1778
1779        let entry_fragment = build_fragment(
1780            entry_fragment_id,
1781            job_id,
1782            source_mask,
1783            DistributionType::Hash,
1784            4,
1785            StreamingParallelism::Fixed(2),
1786        );
1787
1788        let downstream_fragment = build_fragment(
1789            downstream_fragment_id,
1790            job_id,
1791            source_scan_mask,
1792            DistributionType::Hash,
1793            4,
1794            StreamingParallelism::Fixed(2),
1795        );
1796
1797        let job_model = streaming_job::Model {
1798            job_id,
1799            job_status: JobStatus::Created,
1800            create_type: CreateType::Background,
1801            timezone: None,
1802            config_override: None,
1803            adaptive_parallelism_strategy: None,
1804            parallelism: StreamingParallelism::Fixed(2),
1805            backfill_parallelism: None,
1806            backfill_orders: None,
1807            max_parallelism: 2,
1808            specific_resource_group: None,
1809            is_serverless_backfill: false,
1810        };
1811
1812        let database_model = database::Model {
1813            database_id,
1814            name: "split_db".into(),
1815            resource_group: "rg-source".into(),
1816            barrier_interval_ms: None,
1817            checkpoint_frequency: None,
1818        };
1819
1820        let ensembles = vec![NoShuffleEnsemble {
1821            entries: HashSet::from([entry_fragment_id]),
1822            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
1823        }];
1824
1825        let fragment_map = HashMap::from([
1826            (entry_fragment_id, entry_fragment),
1827            (downstream_fragment_id, downstream_fragment),
1828        ]);
1829        let job_fragments = HashMap::from([(job_id, fragment_map)]);
1830        let job_map = HashMap::from([(job_id, job_model)]);
1831
1832        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
1833            (1.into(), build_worker_node(1, 1, "rg-source")),
1834            (2.into(), build_worker_node(2, 1, "rg-source")),
1835        ]);
1836
1837        let split_a = SplitImpl::Test(TestSourceSplit {
1838            id: Arc::<str>::from("split-a"),
1839            properties: HashMap::new(),
1840            offset: "0".into(),
1841        });
1842        let split_b = SplitImpl::Test(TestSourceSplit {
1843            id: Arc::<str>::from("split-b"),
1844            properties: HashMap::new(),
1845            offset: "0".into(),
1846        });
1847
1848        let fragment_source_ids = HashMap::from([
1849            (entry_fragment_id, source_id),
1850            (downstream_fragment_id, source_id),
1851        ]);
1852        let fragment_splits =
1853            HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
1854        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1855        let database_map = HashMap::from([(database_id, database_model)]);
1856        let backfill_jobs = HashSet::new();
1857
1858        let context = RenderActorsContext {
1859            fragment_source_ids: &fragment_source_ids,
1860            fragment_splits: &fragment_splits,
1861            streaming_job_databases: &streaming_job_databases,
1862            database_map: &database_map,
1863            backfill_jobs: &backfill_jobs,
1864        };
1865
1866        let result = render_actors(
1867            &actor_id_counter,
1868            &ensembles,
1869            &job_fragments,
1870            &job_map,
1871            &worker_map,
1872            AdaptiveParallelismStrategy::Auto,
1873            context,
1874        )
1875        .expect("actor rendering succeeds");
1876
1877        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
1878        let downstream_state =
1879            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
1880
1881        assert_eq!(entry_state, downstream_state);
1882
1883        let split_ids: BTreeSet<_> = entry_state
1884            .iter()
1885            .flat_map(|(_, _, _, splits)| splits.iter().cloned())
1886            .collect();
1887        assert_eq!(
1888            split_ids,
1889            BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
1890        );
1891        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
1892    }
1893
1894    /// Test that job-level strategy overrides global strategy for Adaptive parallelism.
1895    #[test]
1896    fn render_actors_job_strategy_overrides_global() {
1897        let actor_id_counter = AtomicU32::new(0);
1898        let fragment_id: FragmentId = 1.into();
1899        let job_id: JobId = 100.into();
1900        let database_id: DatabaseId = DatabaseId::new(10);
1901
1902        // Fragment with Adaptive parallelism, vnode_count = 8
1903        let fragment_model = build_fragment(
1904            fragment_id,
1905            job_id,
1906            0,
1907            DistributionType::Hash,
1908            8,
1909            StreamingParallelism::Adaptive,
1910        );
1911
1912        // Job has custom strategy: BOUNDED(2)
1913        let job_model = streaming_job::Model {
1914            job_id,
1915            job_status: JobStatus::Created,
1916            create_type: CreateType::Foreground,
1917            timezone: None,
1918            config_override: None,
1919            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
1920            parallelism: StreamingParallelism::Adaptive,
1921            backfill_parallelism: None,
1922            backfill_orders: None,
1923            max_parallelism: 8,
1924            specific_resource_group: None,
1925            is_serverless_backfill: false,
1926        };
1927
1928        let database_model = database::Model {
1929            database_id,
1930            name: "test_db".into(),
1931            resource_group: "default".into(),
1932            barrier_interval_ms: None,
1933            checkpoint_frequency: None,
1934        };
1935
1936        let ensembles = vec![NoShuffleEnsemble {
1937            entries: HashSet::from([fragment_id]),
1938            components: HashSet::from([fragment_id]),
1939        }];
1940
1941        let fragment_map =
1942            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
1943        let job_map = HashMap::from([(job_id, job_model)]);
1944
1945        // 4 workers with 1 parallelism each = total 4 parallelism
1946        let worker_map = HashMap::from([
1947            (1.into(), build_worker_node(1, 1, "default")),
1948            (2.into(), build_worker_node(2, 1, "default")),
1949            (3.into(), build_worker_node(3, 1, "default")),
1950            (4.into(), build_worker_node(4, 1, "default")),
1951        ]);
1952
1953        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1954        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1955        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1956        let database_map = HashMap::from([(database_id, database_model)]);
1957        let backfill_jobs = HashSet::new();
1958
1959        let context = RenderActorsContext {
1960            fragment_source_ids: &fragment_source_ids,
1961            fragment_splits: &fragment_splits,
1962            streaming_job_databases: &streaming_job_databases,
1963            database_map: &database_map,
1964            backfill_jobs: &backfill_jobs,
1965        };
1966
1967        // Global strategy is FULL (would give 4 actors), but job strategy is BOUNDED(2)
1968        let result = render_actors(
1969            &actor_id_counter,
1970            &ensembles,
1971            &fragment_map,
1972            &job_map,
1973            &worker_map,
1974            AdaptiveParallelismStrategy::Full,
1975            context,
1976        )
1977        .expect("actor rendering succeeds");
1978
1979        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
1980        // Job strategy BOUNDED(2) should limit to 2 actors, not 4 (global FULL)
1981        assert_eq!(
1982            state.len(),
1983            2,
1984            "Job strategy BOUNDED(2) should override global FULL"
1985        );
1986    }
1987
1988    /// Test that global strategy is used when job has no custom strategy.
1989    #[test]
1990    fn render_actors_uses_global_strategy_when_job_has_none() {
1991        let actor_id_counter = AtomicU32::new(0);
1992        let fragment_id: FragmentId = 1.into();
1993        let job_id: JobId = 101.into();
1994        let database_id: DatabaseId = DatabaseId::new(11);
1995
1996        let fragment_model = build_fragment(
1997            fragment_id,
1998            job_id,
1999            0,
2000            DistributionType::Hash,
2001            8,
2002            StreamingParallelism::Adaptive,
2003        );
2004
2005        // Job has NO custom strategy (None)
2006        let job_model = streaming_job::Model {
2007            job_id,
2008            job_status: JobStatus::Created,
2009            create_type: CreateType::Foreground,
2010            timezone: None,
2011            config_override: None,
2012            adaptive_parallelism_strategy: None, // No custom strategy
2013            parallelism: StreamingParallelism::Adaptive,
2014            backfill_parallelism: None,
2015            backfill_orders: None,
2016            max_parallelism: 8,
2017            specific_resource_group: None,
2018            is_serverless_backfill: false,
2019        };
2020
2021        let database_model = database::Model {
2022            database_id,
2023            name: "test_db".into(),
2024            resource_group: "default".into(),
2025            barrier_interval_ms: None,
2026            checkpoint_frequency: None,
2027        };
2028
2029        let ensembles = vec![NoShuffleEnsemble {
2030            entries: HashSet::from([fragment_id]),
2031            components: HashSet::from([fragment_id]),
2032        }];
2033
2034        let fragment_map =
2035            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2036        let job_map = HashMap::from([(job_id, job_model)]);
2037
2038        // 4 workers = total 4 parallelism
2039        let worker_map = HashMap::from([
2040            (1.into(), build_worker_node(1, 1, "default")),
2041            (2.into(), build_worker_node(2, 1, "default")),
2042            (3.into(), build_worker_node(3, 1, "default")),
2043            (4.into(), build_worker_node(4, 1, "default")),
2044        ]);
2045
2046        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2047        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2048        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2049        let database_map = HashMap::from([(database_id, database_model)]);
2050        let backfill_jobs = HashSet::new();
2051
2052        let context = RenderActorsContext {
2053            fragment_source_ids: &fragment_source_ids,
2054            fragment_splits: &fragment_splits,
2055            streaming_job_databases: &streaming_job_databases,
2056            database_map: &database_map,
2057            backfill_jobs: &backfill_jobs,
2058        };
2059
2060        // Global strategy is BOUNDED(3)
2061        let result = render_actors(
2062            &actor_id_counter,
2063            &ensembles,
2064            &fragment_map,
2065            &job_map,
2066            &worker_map,
2067            AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(3).unwrap()),
2068            context,
2069        )
2070        .expect("actor rendering succeeds");
2071
2072        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2073        // Should use global strategy BOUNDED(3)
2074        assert_eq!(
2075            state.len(),
2076            3,
2077            "Should use global strategy BOUNDED(3) when job has no custom strategy"
2078        );
2079    }
2080
2081    /// Test that Fixed parallelism ignores strategy entirely.
2082    #[test]
2083    fn render_actors_fixed_parallelism_ignores_strategy() {
2084        let actor_id_counter = AtomicU32::new(0);
2085        let fragment_id: FragmentId = 1.into();
2086        let job_id: JobId = 102.into();
2087        let database_id: DatabaseId = DatabaseId::new(12);
2088
2089        // Fragment with FIXED parallelism
2090        let fragment_model = build_fragment(
2091            fragment_id,
2092            job_id,
2093            0,
2094            DistributionType::Hash,
2095            8,
2096            StreamingParallelism::Fixed(5),
2097        );
2098
2099        // Job has custom strategy, but it should be ignored for Fixed parallelism
2100        let job_model = streaming_job::Model {
2101            job_id,
2102            job_status: JobStatus::Created,
2103            create_type: CreateType::Foreground,
2104            timezone: None,
2105            config_override: None,
2106            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2107            parallelism: StreamingParallelism::Fixed(5),
2108            backfill_parallelism: None,
2109            backfill_orders: None,
2110            max_parallelism: 8,
2111            specific_resource_group: None,
2112            is_serverless_backfill: false,
2113        };
2114
2115        let database_model = database::Model {
2116            database_id,
2117            name: "test_db".into(),
2118            resource_group: "default".into(),
2119            barrier_interval_ms: None,
2120            checkpoint_frequency: None,
2121        };
2122
2123        let ensembles = vec![NoShuffleEnsemble {
2124            entries: HashSet::from([fragment_id]),
2125            components: HashSet::from([fragment_id]),
2126        }];
2127
2128        let fragment_map =
2129            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2130        let job_map = HashMap::from([(job_id, job_model)]);
2131
2132        // 6 workers = total 6 parallelism
2133        let worker_map = HashMap::from([
2134            (1.into(), build_worker_node(1, 1, "default")),
2135            (2.into(), build_worker_node(2, 1, "default")),
2136            (3.into(), build_worker_node(3, 1, "default")),
2137            (4.into(), build_worker_node(4, 1, "default")),
2138            (5.into(), build_worker_node(5, 1, "default")),
2139            (6.into(), build_worker_node(6, 1, "default")),
2140        ]);
2141
2142        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2143        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2144        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2145        let database_map = HashMap::from([(database_id, database_model)]);
2146        let backfill_jobs = HashSet::new();
2147
2148        let context = RenderActorsContext {
2149            fragment_source_ids: &fragment_source_ids,
2150            fragment_splits: &fragment_splits,
2151            streaming_job_databases: &streaming_job_databases,
2152            database_map: &database_map,
2153            backfill_jobs: &backfill_jobs,
2154        };
2155
2156        let result = render_actors(
2157            &actor_id_counter,
2158            &ensembles,
2159            &fragment_map,
2160            &job_map,
2161            &worker_map,
2162            AdaptiveParallelismStrategy::Full,
2163            context,
2164        )
2165        .expect("actor rendering succeeds");
2166
2167        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2168        // Fixed(5) should be used, ignoring both job strategy BOUNDED(2) and global FULL
2169        assert_eq!(
2170            state.len(),
2171            5,
2172            "Fixed parallelism should ignore all strategies"
2173        );
2174    }
2175
2176    /// Test RATIO strategy calculation.
2177    #[test]
2178    fn render_actors_ratio_strategy() {
2179        let actor_id_counter = AtomicU32::new(0);
2180        let fragment_id: FragmentId = 1.into();
2181        let job_id: JobId = 103.into();
2182        let database_id: DatabaseId = DatabaseId::new(13);
2183
2184        let fragment_model = build_fragment(
2185            fragment_id,
2186            job_id,
2187            0,
2188            DistributionType::Hash,
2189            16,
2190            StreamingParallelism::Adaptive,
2191        );
2192
2193        // Job has RATIO(0.5) strategy
2194        let job_model = streaming_job::Model {
2195            job_id,
2196            job_status: JobStatus::Created,
2197            create_type: CreateType::Foreground,
2198            timezone: None,
2199            config_override: None,
2200            adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
2201            parallelism: StreamingParallelism::Adaptive,
2202            backfill_parallelism: None,
2203            backfill_orders: None,
2204            max_parallelism: 16,
2205            specific_resource_group: None,
2206            is_serverless_backfill: false,
2207        };
2208
2209        let database_model = database::Model {
2210            database_id,
2211            name: "test_db".into(),
2212            resource_group: "default".into(),
2213            barrier_interval_ms: None,
2214            checkpoint_frequency: None,
2215        };
2216
2217        let ensembles = vec![NoShuffleEnsemble {
2218            entries: HashSet::from([fragment_id]),
2219            components: HashSet::from([fragment_id]),
2220        }];
2221
2222        let fragment_map =
2223            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2224        let job_map = HashMap::from([(job_id, job_model)]);
2225
2226        // 8 workers = total 8 parallelism
2227        let worker_map = HashMap::from([
2228            (1.into(), build_worker_node(1, 1, "default")),
2229            (2.into(), build_worker_node(2, 1, "default")),
2230            (3.into(), build_worker_node(3, 1, "default")),
2231            (4.into(), build_worker_node(4, 1, "default")),
2232            (5.into(), build_worker_node(5, 1, "default")),
2233            (6.into(), build_worker_node(6, 1, "default")),
2234            (7.into(), build_worker_node(7, 1, "default")),
2235            (8.into(), build_worker_node(8, 1, "default")),
2236        ]);
2237
2238        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2239        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2240        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2241        let database_map = HashMap::from([(database_id, database_model)]);
2242        let backfill_jobs = HashSet::new();
2243
2244        let context = RenderActorsContext {
2245            fragment_source_ids: &fragment_source_ids,
2246            fragment_splits: &fragment_splits,
2247            streaming_job_databases: &streaming_job_databases,
2248            database_map: &database_map,
2249            backfill_jobs: &backfill_jobs,
2250        };
2251
2252        let result = render_actors(
2253            &actor_id_counter,
2254            &ensembles,
2255            &fragment_map,
2256            &job_map,
2257            &worker_map,
2258            AdaptiveParallelismStrategy::Full,
2259            context,
2260        )
2261        .expect("actor rendering succeeds");
2262
2263        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2264        // RATIO(0.5) of 8 = 4
2265        assert_eq!(
2266            state.len(),
2267            4,
2268            "RATIO(0.5) of 8 workers should give 4 actors"
2269        );
2270    }
2271}