Skip to main content

risingwave_meta/controller/
scale.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
23#[cfg(debug_assertions)]
24use risingwave_common::hash::VnodeBitmapExt;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_meta_model::prelude::{
32    Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
33};
34use risingwave_meta_model::{
35    DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism, WorkerId,
36    database, fragment, fragment_relation, fragment_splits, object, sink, source, streaming_job,
37    table,
38};
39use risingwave_meta_model_migration::Condition;
40use risingwave_pb::common::WorkerNode;
41use risingwave_pb::stream_plan::PbStreamNode;
42use sea_orm::{
43    ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
44    RelationTrait,
45};
46
47use crate::MetaResult;
48use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
49use crate::controller::utils::resolve_no_shuffle_actor_mapping;
50use crate::manager::ActiveStreamingWorkerNodes;
51use crate::model::{ActorId, StreamActor};
52use crate::stream::{AssignerBuilder, SplitDiffOptions};
53
54pub(crate) async fn resolve_streaming_job_definition<C>(
55    txn: &C,
56    job_ids: &HashSet<JobId>,
57) -> MetaResult<HashMap<JobId, String>>
58where
59    C: ConnectionTrait,
60{
61    let job_ids = job_ids.iter().cloned().collect_vec();
62
63    // including table, materialized view, index
64    let common_job_definitions: Vec<(JobId, String)> = Table::find()
65        .select_only()
66        .columns([
67            table::Column::TableId,
68            #[cfg(not(debug_assertions))]
69            table::Column::Name,
70            #[cfg(debug_assertions)]
71            table::Column::Definition,
72        ])
73        .filter(table::Column::TableId.is_in(job_ids.clone()))
74        .into_tuple()
75        .all(txn)
76        .await?;
77
78    let sink_definitions: Vec<(JobId, String)> = Sink::find()
79        .select_only()
80        .columns([
81            sink::Column::SinkId,
82            #[cfg(not(debug_assertions))]
83            sink::Column::Name,
84            #[cfg(debug_assertions)]
85            sink::Column::Definition,
86        ])
87        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
88        .into_tuple()
89        .all(txn)
90        .await?;
91
92    let source_definitions: Vec<(JobId, String)> = Source::find()
93        .select_only()
94        .columns([
95            source::Column::SourceId,
96            #[cfg(not(debug_assertions))]
97            source::Column::Name,
98            #[cfg(debug_assertions)]
99            source::Column::Definition,
100        ])
101        .filter(source::Column::SourceId.is_in(job_ids.clone()))
102        .into_tuple()
103        .all(txn)
104        .await?;
105
106    let definitions: HashMap<JobId, String> = common_job_definitions
107        .into_iter()
108        .chain(sink_definitions)
109        .chain(source_definitions)
110        .collect();
111
112    Ok(definitions)
113}
114
115pub async fn load_fragment_info<C>(
116    txn: &C,
117    actor_id_counter: &AtomicU32,
118    database_id: Option<DatabaseId>,
119    worker_nodes: &ActiveStreamingWorkerNodes,
120) -> MetaResult<FragmentRenderMap>
121where
122    C: ConnectionTrait,
123{
124    let mut query = StreamingJob::find()
125        .select_only()
126        .column(streaming_job::Column::JobId);
127
128    if let Some(database_id) = database_id {
129        query = query
130            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
131            .filter(object::Column::DatabaseId.eq(database_id));
132    }
133
134    let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
135
136    if jobs.is_empty() {
137        return Ok(HashMap::new());
138    }
139
140    let jobs: HashSet<JobId> = jobs.into_iter().collect();
141
142    let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
143
144    if loaded.is_empty() {
145        return Ok(HashMap::new());
146    }
147
148    let RenderedGraph { fragments, .. } =
149        render_actor_assignments(actor_id_counter, worker_nodes.current(), &loaded)?;
150
151    Ok(fragments)
152}
153
/// Desired placement for a streaming job: an optional resource group plus a parallelism
/// setting.
///
/// NOTE(review): how `resource_group == None` is interpreted (presumably "fall back to a
/// default group") is decided by callers outside this chunk — confirm.
#[derive(Debug)]
pub struct TargetResourcePolicy {
    /// Resource group to schedule into, if one is specified.
    pub resource_group: Option<String>,
    /// Target parallelism for the job.
    pub parallelism: StreamingParallelism,
}

/// Scheduling-relevant information about a single worker.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Parallelism of the worker (presumably its number of actor slots — confirm); the type
    /// rules out zero.
    pub parallelism: NonZeroUsize,
    /// Resource group the worker belongs to, if any.
    pub resource_group: Option<String>,
}

/// Rendered fragments, nested as database id -> job id -> fragment id -> in-flight fragment info.
pub type FragmentRenderMap =
    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;
168
169#[derive(Default)]
170pub struct RenderedGraph {
171    pub fragments: FragmentRenderMap,
172    pub ensembles: Vec<NoShuffleEnsemble>,
173}
174
175impl RenderedGraph {
176    pub fn empty() -> Self {
177        Self::default()
178    }
179}
180
/// Per-fragment metadata loaded from the catalog: the subset of a `fragment::Model` that the
/// synchronous render stage needs for a single fragment. `LoadedFragmentContext` groups these
/// by owning job.
#[derive(Clone, Debug)]
pub struct LoadedFragment {
    /// Id of this fragment.
    pub fragment_id: FragmentId,
    /// Streaming job that owns this fragment.
    pub job_id: JobId,
    /// Fragment type flags, converted from the model's raw mask.
    pub fragment_type_mask: FragmentTypeMask,
    /// Distribution type recorded in the catalog.
    pub distribution_type: DistributionType,
    /// Number of vnodes of the fragment.
    pub vnode_count: usize,
    /// Stream node plan of the fragment, decoded to protobuf.
    pub nodes: PbStreamNode,
    /// State table ids of the fragment, collected into a set.
    pub state_table_ids: HashSet<TableId>,
    /// Parallelism stored on the fragment row, if any.
    pub parallelism: Option<StreamingParallelism>,
}
195
196impl From<fragment::Model> for LoadedFragment {
197    fn from(model: fragment::Model) -> Self {
198        Self {
199            fragment_id: model.fragment_id,
200            job_id: model.job_id,
201            fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
202            distribution_type: model.distribution_type,
203            vnode_count: model.vnode_count as usize,
204            nodes: model.stream_node.to_protobuf(),
205            state_table_ids: model.state_table_ids.into_inner().into_iter().collect(),
206            parallelism: model.parallelism,
207        }
208    }
209}
210
/// Everything the synchronous render stage needs, loaded asynchronously from the catalog.
/// Keeping the database I/O in the load stage lets rendering itself stay synchronous and be
/// re-run against different worker sets.
#[derive(Default, Debug, Clone)]
pub struct LoadedFragmentContext {
    /// No-shuffle ensembles to render; a context without ensembles is considered empty.
    pub ensembles: Vec<NoShuffleEnsemble>,
    /// Loaded fragments grouped by owning job.
    pub job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    /// Streaming job catalog rows keyed by job id.
    pub job_map: HashMap<JobId, streaming_job::Model>,
    /// Database each streaming job belongs to.
    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
    /// Database catalog rows loaded for the databases referenced by `streaming_job_databases`.
    pub database_map: HashMap<DatabaseId, database::Model>,
    /// Source id of each fragment identified as a source fragment.
    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
    /// Splits loaded per fragment (presumably the persisted source splits — see
    /// `resolve_source_fragments`).
    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
}
221
impl LoadedFragmentContext {
    /// Returns `true` when the context holds no ensembles, i.e. there is nothing to render.
    ///
    /// # Panics
    /// Panics if there are no ensembles but `job_fragments` is non-empty. The loaders in this
    /// module only load fragments that belong to some ensemble, so that state means the context
    /// is inconsistent.
    pub fn is_empty(&self) -> bool {
        if self.ensembles.is_empty() {
            assert!(
                self.job_fragments.is_empty(),
                "non-empty job fragments for empty ensembles: {:?}",
                self.job_fragments
            );
            true
        } else {
            false
        }
    }

    /// Extracts the part of this context that belongs to `database_id`, cloning the selected
    /// data and leaving `self` untouched. Returns `None` when no streaming job maps to
    /// `database_id`.
    ///
    /// Job models missing from `job_map` are silently skipped; every other lookup is strict.
    ///
    /// # Panics
    /// Panics in any of these cases:
    /// - a job of the database has no entry in `job_fragments`;
    /// - the database ends up with no fragments or no ensembles;
    /// - an ensemble is only partially contained in the database;
    /// - `database_id` is missing from `database_map`.
    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
        // Jobs whose recorded database is `database_id`.
        let job_ids: HashSet<JobId> = self
            .streaming_job_databases
            .iter()
            .filter_map(|(job_id, db_id)| (*db_id == database_id).then_some(*job_id))
            .collect();

        if job_ids.is_empty() {
            return None;
        }

        // Indexing (rather than `get`) deliberately panics if a job has no loaded fragments.
        let job_fragments: HashMap<_, _> = job_ids
            .iter()
            .map(|job_id| (*job_id, self.job_fragments[job_id].clone()))
            .collect();

        let fragment_ids: HashSet<_> = job_fragments
            .values()
            .flat_map(|fragments| fragments.keys().copied())
            .collect();

        assert!(
            !fragment_ids.is_empty(),
            "empty fragments for non-empty database {database_id} with jobs {job_ids:?}"
        );

        // Keep the ensembles that touch this database. An ensemble is expected to lie entirely
        // inside a single database, so a partial overlap is treated as a bug.
        let ensembles: Vec<NoShuffleEnsemble> = self
            .ensembles
            .iter()
            .filter(|ensemble| {
                if ensemble
                    .components
                    .iter()
                    .any(|fragment_id| fragment_ids.contains(fragment_id))
                {
                    assert!(
                        ensemble
                            .components
                            .iter()
                            .all(|fragment_id| fragment_ids.contains(fragment_id)),
                        "ensemble {ensemble:?} partially exists in database {database_id} with fragments {job_fragments:?}"
                    );
                    true
                } else {
                    false
                }
            })
            .cloned()
            .collect();

        assert!(
            !ensembles.is_empty(),
            "empty ensembles for non-empty database {database_id} with jobs {job_fragments:?}"
        );

        let job_map = job_ids
            .iter()
            .filter_map(|job_id| self.job_map.get(job_id).map(|job| (*job_id, job.clone())))
            .collect();

        let streaming_job_databases = job_ids
            .iter()
            .filter_map(|job_id| {
                self.streaming_job_databases
                    .get(job_id)
                    .map(|db_id| (*job_id, *db_id))
            })
            .collect();

        // Panics if the database row was not loaded.
        let database_model = self.database_map[&database_id].clone();
        let database_map = HashMap::from([(database_id, database_model)]);

        // Source metadata is restricted to the fragments selected above.
        let fragment_source_ids = self
            .fragment_source_ids
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, source_id)| (*fragment_id, *source_id))
            .collect();

        let fragment_splits = self
            .fragment_splits
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, splits)| (*fragment_id, splits.clone()))
            .collect();

        Some(Self {
            ensembles,
            job_fragments,
            job_map,
            streaming_job_databases,
            database_map,
            fragment_source_ids,
            fragment_splits,
        })
    }

    /// Split this loaded context by database in a single ownership pass to avoid cloning large
    /// fragment payloads (for example `stream_node` in `LoadedFragment`).
    ///
    /// Each job, together with its fragments and source metadata, moves into the context of
    /// its database. Each ensemble then follows the database of the first of its fragments
    /// whose database is known.
    ///
    /// # Panics
    /// Panics if a job's database row, fragments, or job model is missing. Ensembles that cannot
    /// be mapped to any database are skipped with a warning; in debug builds they also trip a
    /// `debug_assert`.
    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
        let Self {
            ensembles,
            mut job_fragments,
            mut job_map,
            streaming_job_databases,
            mut database_map,
            mut fragment_source_ids,
            mut fragment_splits,
        } = self;

        let mut contexts = HashMap::<DatabaseId, Self>::new();
        // Which database each moved fragment ended up in; used to route ensembles below.
        let mut fragment_databases = HashMap::<FragmentId, DatabaseId>::new();
        let mut unresolved_ensembles = 0usize;
        let mut unresolved_ensemble_sample: Option<Vec<FragmentId>> = None;

        for (job_id, database_id) in streaming_job_databases {
            // Lazily create the per-database context, moving its database row out of the map.
            let context = contexts.entry(database_id).or_insert_with(|| {
                let database_model = database_map
                    .remove(&database_id)
                    .expect("database should exist for streaming job");
                Self {
                    ensembles: Vec::new(),
                    job_fragments: HashMap::new(),
                    job_map: HashMap::new(),
                    streaming_job_databases: HashMap::new(),
                    database_map: HashMap::from([(database_id, database_model)]),
                    fragment_source_ids: HashMap::new(),
                    fragment_splits: HashMap::new(),
                }
            });

            let fragments = job_fragments
                .remove(&job_id)
                .expect("job fragments should exist for streaming job");
            // Move per-fragment source metadata along with the fragments.
            for fragment_id in fragments.keys().copied() {
                fragment_databases.insert(fragment_id, database_id);
                if let Some(source_id) = fragment_source_ids.remove(&fragment_id) {
                    context.fragment_source_ids.insert(fragment_id, source_id);
                }
                if let Some(splits) = fragment_splits.remove(&fragment_id) {
                    context.fragment_splits.insert(fragment_id, splits);
                }
            }

            assert!(
                context
                    .job_map
                    .insert(
                        job_id,
                        job_map
                            .remove(&job_id)
                            .expect("streaming job should exist for loaded context"),
                    )
                    .is_none(),
                "duplicated streaming job"
            );
            assert!(
                context.job_fragments.insert(job_id, fragments).is_none(),
                "duplicated job fragments"
            );
            assert!(
                context
                    .streaming_job_databases
                    .insert(job_id, database_id)
                    .is_none(),
                "duplicated job database mapping"
            );
        }

        // Route each ensemble by the first of its fragments with a known database.
        for ensemble in ensembles {
            let Some(database_id) = ensemble
                .components
                .iter()
                .find_map(|fragment_id| fragment_databases.get(fragment_id).copied())
            else {
                // Keep a count plus one sample for the warning emitted after the loop.
                unresolved_ensembles += 1;
                if unresolved_ensemble_sample.is_none() {
                    unresolved_ensemble_sample =
                        Some(ensemble.components.iter().copied().collect());
                }
                continue;
            };

            debug_assert!(
                ensemble
                    .components
                    .iter()
                    .all(|fragment_id| fragment_databases.get(fragment_id) == Some(&database_id)),
                "ensemble {ensemble:?} should belong to a single database"
            );

            contexts
                .get_mut(&database_id)
                .expect("database context should exist for ensemble")
                .ensembles
                .push(ensemble);
        }

        if unresolved_ensembles > 0 {
            tracing::warn!(
                unresolved_ensembles,
                ?unresolved_ensemble_sample,
                known_fragments = fragment_databases.len(),
                "skip ensembles without resolved database while splitting loaded context"
            );
        }
        debug_assert_eq!(
            unresolved_ensembles, 0,
            "all ensembles should be mappable to a database"
        );

        contexts
    }
}
450
451/// Async load stage for fragment-scoped rendering. It resolves all metadata required to later
452/// render actor assignments with arbitrary worker sets.
453pub async fn load_fragment_context<C>(
454    txn: &C,
455    ensembles: Vec<NoShuffleEnsemble>,
456) -> MetaResult<LoadedFragmentContext>
457where
458    C: ConnectionTrait,
459{
460    if ensembles.is_empty() {
461        return Ok(LoadedFragmentContext::default());
462    }
463
464    let required_fragment_ids: HashSet<_> = ensembles
465        .iter()
466        .flat_map(|ensemble| ensemble.components.iter().copied())
467        .collect();
468
469    let fragment_models = Fragment::find()
470        .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
471        .all(txn)
472        .await?;
473
474    let found_fragment_ids: HashSet<_> = fragment_models
475        .iter()
476        .map(|fragment| fragment.fragment_id)
477        .collect();
478
479    if found_fragment_ids.len() != required_fragment_ids.len() {
480        let missing = required_fragment_ids
481            .difference(&found_fragment_ids)
482            .copied()
483            .collect_vec();
484        return Err(anyhow!("fragments {:?} not found", missing).into());
485    }
486
487    let fragment_models: HashMap<_, _> = fragment_models
488        .into_iter()
489        .map(|fragment| (fragment.fragment_id, fragment))
490        .collect();
491
492    let job_ids: HashSet<_> = fragment_models
493        .values()
494        .map(|fragment| fragment.job_id)
495        .collect();
496
497    if job_ids.is_empty() {
498        return Ok(LoadedFragmentContext::default());
499    }
500
501    let jobs: HashMap<_, _> = StreamingJob::find()
502        .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
503        .all(txn)
504        .await?
505        .into_iter()
506        .map(|job| (job.job_id, job))
507        .collect();
508
509    let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
510    if found_job_ids.len() != job_ids.len() {
511        let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
512        return Err(anyhow!("streaming jobs {:?} not found", missing).into());
513    }
514
515    build_loaded_context(txn, ensembles, fragment_models, jobs).await
516}
517
518/// Async load stage for job-scoped rendering. It collects all no-shuffle ensembles and the
519/// metadata required to render actor assignments later with a provided worker set.
520pub async fn load_fragment_context_for_jobs<C>(
521    txn: &C,
522    job_ids: HashSet<JobId>,
523) -> MetaResult<LoadedFragmentContext>
524where
525    C: ConnectionTrait,
526{
527    if job_ids.is_empty() {
528        return Ok(LoadedFragmentContext::default());
529    }
530
531    let excluded_fragments_query = FragmentRelation::find()
532        .select_only()
533        .column(fragment_relation::Column::TargetFragmentId)
534        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
535        .into_query();
536
537    let condition = Condition::all()
538        .add(fragment::Column::JobId.is_in(job_ids.clone()))
539        .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));
540
541    let fragments: Vec<FragmentId> = Fragment::find()
542        .select_only()
543        .column(fragment::Column::FragmentId)
544        .filter(condition)
545        .into_tuple()
546        .all(txn)
547        .await?;
548
549    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;
550
551    let fragments = Fragment::find()
552        .filter(
553            fragment::Column::FragmentId.is_in(
554                ensembles
555                    .iter()
556                    .flat_map(|graph| graph.components.iter())
557                    .cloned()
558                    .collect_vec(),
559            ),
560        )
561        .all(txn)
562        .await?;
563
564    let fragment_map: HashMap<_, _> = fragments
565        .into_iter()
566        .map(|fragment| (fragment.fragment_id, fragment))
567        .collect();
568
569    let job_ids = fragment_map
570        .values()
571        .map(|fragment| fragment.job_id)
572        .collect::<BTreeSet<_>>()
573        .into_iter()
574        .collect_vec();
575
576    let jobs: HashMap<_, _> = StreamingJob::find()
577        .filter(streaming_job::Column::JobId.is_in(job_ids))
578        .all(txn)
579        .await?
580        .into_iter()
581        .map(|job| (job.job_id, job))
582        .collect();
583
584    build_loaded_context(txn, ensembles, fragment_map, jobs).await
585}
586
587/// Sync render stage: uses loaded fragment context and current worker info
588/// to produce actor-to-worker assignments and vnode bitmaps.
589pub(crate) fn render_actor_assignments(
590    actor_id_counter: &AtomicU32,
591    worker_map: &HashMap<WorkerId, WorkerNode>,
592    loaded: &LoadedFragmentContext,
593) -> MetaResult<RenderedGraph> {
594    let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
595    render_actor_assignments_with_allocator(&mut actor_id_allocator, worker_map, loaded)
596}
597
598/// Render a graph with preview-only actor ids so callers can compare layouts
599/// without consuming the global actor id generator.
600pub(crate) fn preview_actor_assignments(
601    worker_map: &HashMap<WorkerId, WorkerNode>,
602    loaded: &LoadedFragmentContext,
603) -> MetaResult<RenderedGraph> {
604    let mut actor_id_allocator = RenderActorIdAllocator::Preview { next_actor_id: 0 };
605    render_actor_assignments_with_allocator(&mut actor_id_allocator, worker_map, loaded)
606}
607
608/// Replace preview actor ids with real actor ids after the caller has decided
609/// the rendered layout is not a no-op.
610pub(crate) fn materialize_actor_assignments(
611    actor_id_counter: &AtomicU32,
612    rendered: RenderedGraph,
613) -> RenderedGraph {
614    let RenderedGraph {
615        fragments,
616        ensembles,
617    } = rendered;
618    #[cfg(debug_assertions)]
619    let preview_slots = collect_relative_actor_slots_by_fragment(&fragments);
620
621    let mut ordered_fragments = fragments
622        .into_iter()
623        .flat_map(|(database_id, jobs)| {
624            jobs.into_iter().flat_map(move |(job_id, fragments)| {
625                fragments.into_iter().map(move |(fragment_id, fragment)| {
626                    (database_id, job_id, fragment_id, fragment)
627                })
628            })
629        })
630        .collect_vec();
631    // Preserve preview order when remapping ids so fragments from the same
632    // no-shuffle ensemble keep their relative slot alignment.
633    //
634    // This works because the preview allocator assigns actor ids with a single
635    // monotonically-increasing counter, so `min(actor_id)` per fragment faithfully
636    // reproduces the original allocation order. If the preview allocator ever
637    // changes to a non-monotonic scheme this sort key must be revisited.
638    //
639    // Empty fragments should not appear, but fragment_id keeps the ordering
640    // deterministic if they do.
641    ordered_fragments.sort_by_key(|(_, _, fragment_id, fragment)| {
642        (
643            fragment
644                .actors
645                .keys()
646                .copied()
647                .min()
648                .map(|actor_id| actor_id.as_raw_id())
649                .unwrap_or(u32::MAX),
650            *fragment_id,
651        )
652    });
653
654    let mut materialized = FragmentRenderMap::new();
655    for (database_id, job_id, fragment_id, fragment) in ordered_fragments {
656        materialized
657            .entry(database_id)
658            .or_default()
659            .entry(job_id)
660            .or_default()
661            .insert(
662                fragment_id,
663                materialize_fragment(fragment, actor_id_counter),
664            );
665    }
666
667    #[cfg(debug_assertions)]
668    assert_materialization_preserves_preview_slots(&preview_slots, &materialized);
669
670    RenderedGraph {
671        fragments: materialized,
672        ensembles,
673    }
674}
675
676#[cfg(debug_assertions)]
677type RelativeActorSlot = (u32, WorkerId, Option<Vec<usize>>, Vec<SplitId>);
678
679#[cfg(debug_assertions)]
680fn collect_relative_actor_slots(fragment: &InflightFragmentInfo) -> Vec<RelativeActorSlot> {
681    let base = fragment
682        .actors
683        .keys()
684        .copied()
685        .min()
686        .map(|actor_id| actor_id.as_raw_id())
687        .unwrap_or_default();
688
689    let mut entries = fragment
690        .actors
691        .iter()
692        .map(|(&actor_id, info)| {
693            let idx = actor_id.as_raw_id() - base;
694            let vnode_bitmap = info.vnode_bitmap.as_ref().map(|bitmap| {
695                bitmap
696                    .iter_vnodes()
697                    .map(|vnode| vnode.to_index())
698                    .collect_vec()
699            });
700            let mut splits = info.splits.iter().map(SplitMetaData::id).collect_vec();
701            splits.sort_unstable();
702            (idx, info.worker_id, vnode_bitmap, splits)
703        })
704        .collect_vec();
705    entries.sort_unstable_by_key(|(idx, ..)| *idx);
706    entries
707}
708
709#[cfg(debug_assertions)]
710fn collect_relative_actor_slots_by_fragment(
711    fragments: &FragmentRenderMap,
712) -> HashMap<FragmentId, Vec<RelativeActorSlot>> {
713    fragments
714        .values()
715        .flat_map(|jobs| jobs.values())
716        .flatten()
717        .map(|(fragment_id, fragment)| (*fragment_id, collect_relative_actor_slots(fragment)))
718        .collect()
719}
720
721#[cfg(debug_assertions)]
722fn assert_materialization_preserves_preview_slots(
723    preview_slots: &HashMap<FragmentId, Vec<RelativeActorSlot>>,
724    materialized_fragments: &FragmentRenderMap,
725) {
726    for (database_id, jobs) in materialized_fragments {
727        for (job_id, fragments) in jobs {
728            for (fragment_id, fragment) in fragments {
729                let expected_slots = preview_slots.get(fragment_id).unwrap_or_else(|| {
730                    panic!(
731                        "preview fragment {fragment_id} for database {database_id}, job {job_id} not found"
732                    )
733                });
734
735                assert_eq!(
736                    collect_relative_actor_slots(fragment),
737                    *expected_slots,
738                    "materialization changed preview slot ordering for database {database_id}, job {job_id}, fragment {fragment_id}"
739                );
740            }
741        }
742    }
743}
744
745fn render_actor_assignments_with_allocator(
746    actor_id_allocator: &mut RenderActorIdAllocator<'_>,
747    worker_map: &HashMap<WorkerId, WorkerNode>,
748    loaded: &LoadedFragmentContext,
749) -> MetaResult<RenderedGraph> {
750    if loaded.is_empty() {
751        return Ok(RenderedGraph::empty());
752    }
753
754    let render_context = RenderActorsContext {
755        fragment_source_ids: &loaded.fragment_source_ids,
756        fragment_splits: &loaded.fragment_splits,
757        streaming_job_databases: &loaded.streaming_job_databases,
758        database_map: &loaded.database_map,
759    };
760
761    let fragments = render_actors_with_allocator(
762        actor_id_allocator,
763        &loaded.ensembles,
764        &loaded.job_fragments,
765        &loaded.job_map,
766        worker_map,
767        render_context,
768    )?;
769
770    Ok(RenderedGraph {
771        fragments,
772        ensembles: loaded.ensembles.clone(),
773    })
774}
775
776fn materialize_fragment(
777    mut fragment: InflightFragmentInfo,
778    actor_id_counter: &AtomicU32,
779) -> InflightFragmentInfo {
780    let actor_count = u32::try_from(fragment.actors.len()).expect("actor count exceeds u32::MAX");
781    let actor_id_base: ActorId = actor_id_counter
782        .fetch_add(actor_count, Ordering::Relaxed)
783        .into();
784
785    let mut actors = fragment.actors.into_iter().collect_vec();
786    actors.sort_by_key(|(actor_id, _)| *actor_id);
787
788    fragment.actors = actors
789        .into_iter()
790        .enumerate()
791        .map(|(idx, (_, info))| {
792            let actor_offset = u32::try_from(idx).expect("actor index exceeds u32::MAX");
793            (actor_id_base + actor_offset, info)
794        })
795        .collect();
796
797    fragment
798}
799
800async fn build_loaded_context<C>(
801    txn: &C,
802    ensembles: Vec<NoShuffleEnsemble>,
803    fragment_models: HashMap<FragmentId, fragment::Model>,
804    job_map: HashMap<JobId, streaming_job::Model>,
805) -> MetaResult<LoadedFragmentContext>
806where
807    C: ConnectionTrait,
808{
809    if ensembles.is_empty() {
810        return Ok(LoadedFragmentContext::default());
811    }
812
813    let mut job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>> = HashMap::new();
814    for (fragment_id, model) in fragment_models {
815        job_fragments
816            .entry(model.job_id)
817            .or_default()
818            .try_insert(fragment_id, LoadedFragment::from(model))
819            .expect("duplicate fragment id for job");
820    }
821
822    #[cfg(debug_assertions)]
823    {
824        debug_sanity_check(&ensembles, &job_fragments, &job_map);
825    }
826
827    let (fragment_source_ids, fragment_splits) =
828        resolve_source_fragments(txn, &job_fragments).await?;
829
830    let job_ids = job_map.keys().copied().collect_vec();
831
832    let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
833        .select_only()
834        .column(streaming_job::Column::JobId)
835        .column(object::Column::DatabaseId)
836        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
837        .filter(streaming_job::Column::JobId.is_in(job_ids))
838        .into_tuple()
839        .all(txn)
840        .await?
841        .into_iter()
842        .collect();
843
844    let database_map: HashMap<_, _> = Database::find()
845        .filter(
846            database::Column::DatabaseId
847                .is_in(streaming_job_databases.values().copied().collect_vec()),
848        )
849        .all(txn)
850        .await?
851        .into_iter()
852        .map(|db| (db.database_id, db))
853        .collect();
854
855    Ok(LoadedFragmentContext {
856        ensembles,
857        job_fragments,
858        job_map,
859        streaming_job_databases,
860        database_map,
861        fragment_source_ids,
862        fragment_splits,
863    })
864}
865
/// Borrowed metadata, resolved asynchronously by the load stage, that the synchronous renderer
/// needs in addition to the ensembles, fragments and job models it receives directly.
///
/// Only metadata resolved asynchronously lives here, so the renderer stays synchronous and the
/// call site keeps the runtime dependencies (maps, actor counter, etc.) explicit.
struct RenderActorsContext<'a> {
    /// Source id of each source fragment.
    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
    /// Splits loaded per fragment.
    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
    /// Database of each streaming job.
    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
    /// Database catalog rows keyed by id.
    database_map: &'a HashMap<DatabaseId, database::Model>,
}
874
875enum RenderActorIdAllocator<'a> {
876    Persistent(&'a AtomicU32),
877    Preview { next_actor_id: u32 },
878}
879
880impl RenderActorIdAllocator<'_> {
881    fn allocate_block(&mut self, actor_count: u32) -> ActorId {
882        match self {
883            Self::Persistent(actor_id_counter) => {
884                let actor_id_base: ActorId = actor_id_counter
885                    .fetch_add(actor_count, Ordering::Relaxed)
886                    .into();
887                actor_id_base
888            }
889            Self::Preview { next_actor_id } => {
890                let actor_id_base = *next_actor_id;
891                *next_actor_id = next_actor_id
892                    .checked_add(actor_count)
893                    .expect("preview actor id overflow");
894                let actor_id_base: ActorId = actor_id_base.into();
895                actor_id_base
896            }
897        }
898    }
899}
900
901#[cfg(test)]
902fn render_actors(
903    actor_id_counter: &AtomicU32,
904    ensembles: &[NoShuffleEnsemble],
905    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
906    job_map: &HashMap<JobId, streaming_job::Model>,
907    worker_map: &HashMap<WorkerId, WorkerNode>,
908    context: RenderActorsContext<'_>,
909) -> MetaResult<FragmentRenderMap> {
910    let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
911    render_actors_with_allocator(
912        &mut actor_id_allocator,
913        ensembles,
914        job_fragments,
915        job_map,
916        worker_map,
917        context,
918    )
919}
920
921fn render_actors_with_allocator(
922    actor_id_allocator: &mut RenderActorIdAllocator<'_>,
923    ensembles: &[NoShuffleEnsemble],
924    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
925    job_map: &HashMap<JobId, streaming_job::Model>,
926    worker_map: &HashMap<WorkerId, WorkerNode>,
927    context: RenderActorsContext<'_>,
928) -> MetaResult<FragmentRenderMap> {
929    let RenderActorsContext {
930        fragment_source_ids,
931        fragment_splits: fragment_splits_map,
932        streaming_job_databases,
933        database_map,
934    } = context;
935
936    let mut all_fragments: FragmentRenderMap = HashMap::new();
937    let fragment_lookup: HashMap<FragmentId, &LoadedFragment> = job_fragments
938        .values()
939        .flat_map(|fragments| fragments.iter())
940        .map(|(fragment_id, fragment)| (*fragment_id, fragment))
941        .collect();
942
943    for NoShuffleEnsemble {
944        entries,
945        components,
946    } in ensembles
947    {
948        tracing::debug!("rendering ensemble entries {:?}", entries);
949
950        let entry_fragments = entries
951            .iter()
952            .map(|fragment_id| fragment_lookup.get(fragment_id).unwrap())
953            .collect_vec();
954
955        let entry_fragment_parallelism = Itertools::exactly_one(
956            entry_fragments
957                .iter()
958                .map(|fragment| fragment.parallelism.clone())
959                .dedup(),
960        )
961        .map_err(|_| {
962            anyhow!(
963                "entry fragments {:?} have inconsistent parallelism settings",
964                entries.iter().copied().collect_vec()
965            )
966        })?;
967
968        let (job_id, distribution_type, vnode_count) = Itertools::exactly_one(
969            entry_fragments
970                .iter()
971                .map(|f| (f.job_id, f.distribution_type, f.vnode_count))
972                .dedup(),
973        )
974        .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;
975
976        let job = job_map
977            .get(&job_id)
978            .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;
979
980        let database_resource_group = streaming_job_databases
981            .get(&job_id)
982            .and_then(|database_id| database_map.get(database_id))
983            .unwrap()
984            .resource_group
985            .clone();
986
987        let source_entry_fragment = entry_fragments.iter().find(|f| {
988            let mask = f.fragment_type_mask;
989            if mask.contains(FragmentTypeFlag::Source) {
990                assert!(!mask.contains(FragmentTypeFlag::SourceScan))
991            }
992            mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
993        });
994
995        let actor_template = EnsembleActorTemplate::render_new(
996            job,
997            worker_map,
998            entry_fragment_parallelism,
999            database_resource_group,
1000            distribution_type,
1001            vnode_count,
1002        )?;
1003
1004        let source_splits = match source_entry_fragment {
1005            Some(entry_fragment) => {
1006                let source_id = fragment_source_ids
1007                    .get(&entry_fragment.fragment_id)
1008                    .ok_or_else(|| {
1009                        anyhow!(
1010                            "missing source id in source fragment {}",
1011                            entry_fragment.fragment_id
1012                        )
1013                    })?;
1014
1015                let entry_fragment_id = entry_fragment.fragment_id;
1016
1017                let splits = fragment_splits_map
1018                    .get(&entry_fragment_id)
1019                    .cloned()
1020                    .unwrap_or_default();
1021
1022                let splits: std::collections::BTreeMap<_, _> =
1023                    splits.into_iter().map(|s| (s.id(), s)).collect();
1024                let splits = actor_template.assign_splits(entry_fragment_id, splits);
1025                Some((splits, *source_id))
1026            }
1027            None => None,
1028        };
1029
1030        for component_fragment_id in components {
1031            let fragment = fragment_lookup.get(component_fragment_id).unwrap();
1032            let fragment_id = fragment.fragment_id;
1033            let job_id = fragment.job_id;
1034            let fragment_type_mask = fragment.fragment_type_mask;
1035            let distribution_type = fragment.distribution_type;
1036            let stream_node = &fragment.nodes;
1037            let state_table_ids = &fragment.state_table_ids;
1038            let vnode_count = fragment.vnode_count;
1039            let source_id = fragment_source_ids.get(&fragment_id).cloned();
1040
1041            let aligner = ComponentFragmentAligner::new(&actor_template, actor_id_allocator);
1042            let actors = aligner.align_component_actor(distribution_type);
1043            let mut splits = source_id
1044                .map(|source_id| {
1045                    let (fragment_splits, shared_source_id) = source_splits.as_ref().unwrap();
1046                    assert_eq!(*shared_source_id, source_id);
1047                    aligner.align_component_splits(fragment_splits)
1048                })
1049                .unwrap_or_default();
1050
1051            let actors: HashMap<ActorId, InflightActorInfo> = actors
1052                .into_iter()
1053                .map(|(actor_id, (worker_id, vnode_bitmap))| {
1054                    (
1055                        actor_id,
1056                        InflightActorInfo {
1057                            worker_id,
1058                            vnode_bitmap,
1059                            splits: splits.remove(&actor_id).unwrap_or_default(),
1060                        },
1061                    )
1062                })
1063                .collect();
1064
1065            let fragment = InflightFragmentInfo {
1066                fragment_id,
1067                distribution_type,
1068                fragment_type_mask,
1069                vnode_count,
1070                nodes: stream_node.clone(),
1071                actors,
1072                state_table_ids: state_table_ids.clone(),
1073            };
1074
1075            let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
1076                anyhow!("streaming job {job_id} not found in streaming_job_databases")
1077            })?;
1078
1079            all_fragments
1080                .entry(database_id)
1081                .or_default()
1082                .entry(job_id)
1083                .or_default()
1084                .insert(fragment_id, fragment);
1085        }
1086    }
1087
1088    Ok(all_fragments)
1089}
1090
1091pub(crate) struct EnsembleActorTemplate {
1092    assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>>,
1093    distribution_type: DistributionType,
1094    actor_count: u32,
1095}
1096
1097impl EnsembleActorTemplate {
1098    pub(crate) fn render_new(
1099        job: &streaming_job::Model,
1100        worker_map: &HashMap<WorkerId, WorkerNode>,
1101        entry_fragment_parallelism: Option<StreamingParallelism>,
1102        database_resource_group: String,
1103        distribution_type: DistributionType,
1104        vnode_count: usize,
1105    ) -> MetaResult<Self> {
1106        let job_id = job.job_id;
1107        let job_strategy = job
1108            .adaptive_parallelism_strategy
1109            .as_deref()
1110            .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
1111        let backfill_job_strategy = job
1112            .backfill_adaptive_parallelism_strategy
1113            .as_deref()
1114            .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
1115
1116        let resource_group = match &job.specific_resource_group {
1117            None => database_resource_group,
1118            Some(resource_group) => resource_group.clone(),
1119        };
1120
1121        let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
1122            .iter()
1123            .filter_map(|(worker_id, worker)| {
1124                if worker
1125                    .resource_group()
1126                    .as_deref()
1127                    .unwrap_or(DEFAULT_RESOURCE_GROUP)
1128                    == resource_group.as_str()
1129                {
1130                    Some((
1131                        *worker_id,
1132                        worker
1133                            .parallelism()
1134                            .expect("should have parallelism for compute node")
1135                            .try_into()
1136                            .expect("parallelism for compute node"),
1137                    ))
1138                } else {
1139                    None
1140                }
1141            })
1142            .collect();
1143
1144        let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();
1145
1146        let effective_job_parallelism = if job.job_status != JobStatus::Created {
1147            job.backfill_parallelism
1148                .as_ref()
1149                .unwrap_or(&job.parallelism)
1150        } else {
1151            &job.parallelism
1152        };
1153        let effective_job_strategy = if job.job_status != JobStatus::Created {
1154            backfill_job_strategy.or(job_strategy)
1155        } else {
1156            job_strategy
1157        };
1158
1159        let target_parallelism = match entry_fragment_parallelism
1160            .as_ref()
1161            .unwrap_or(effective_job_parallelism)
1162        {
1163            StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
1164                let effective_job_strategy = effective_job_strategy.unwrap_or_else(|| {
1165                    tracing::warn!(
1166                        job_id = %job_id,
1167                        ?effective_job_parallelism,
1168                        "adaptive/custom job is missing adaptive strategy in StreamContext; falling back to default"
1169                    );
1170                    AdaptiveParallelismStrategy::default()
1171                });
1172                effective_job_strategy.compute_target_parallelism(total_parallelism)
1173            }
1174            StreamingParallelism::Fixed(n) => *n,
1175        };
1176        let actual_parallelism = target_parallelism
1177            .min(vnode_count)
1178            .min(job.max_parallelism as usize);
1179        if actual_parallelism != target_parallelism {
1180            tracing::warn!(
1181                job_id = %job_id,
1182                target_parallelism,
1183                actual_parallelism,
1184                vnode_count,
1185                job_max_parallelism = job.max_parallelism,
1186                ?effective_job_parallelism,
1187                ?entry_fragment_parallelism,
1188                "streaming job parallelism was capped by vnode count or max parallelism"
1189            );
1190        }
1191
1192        tracing::debug!(
1193            "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
1194            job_id,
1195            actual_parallelism,
1196            job.parallelism,
1197            total_parallelism,
1198            job.max_parallelism,
1199            vnode_count,
1200            entry_fragment_parallelism
1201        );
1202
1203        let assigner = AssignerBuilder::new(job_id).build();
1204
1205        let actors = (0..(actual_parallelism as u32)).collect_vec();
1206        let vnodes = (0..vnode_count).collect_vec();
1207
1208        let raw_assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;
1209
1210        let assignment = raw_assignment
1211            .into_iter()
1212            .map(|(worker_id, actors)| {
1213                let actors = actors
1214                    .into_iter()
1215                    .map(|(actor_idx, vnodes)| {
1216                        let bitmap = match distribution_type {
1217                            DistributionType::Single => None,
1218                            DistributionType::Hash => {
1219                                Some(Bitmap::from_indices(vnode_count, &vnodes))
1220                            }
1221                        };
1222                        (actor_idx, bitmap)
1223                    })
1224                    .collect();
1225                (worker_id, actors)
1226            })
1227            .collect();
1228
1229        let actor_count = u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
1230
1231        Ok(Self {
1232            assignment,
1233            distribution_type,
1234            actor_count,
1235        })
1236    }
1237
1238    pub(crate) fn from_existing_inflight_fragment(fragment: &InflightFragmentInfo) -> Self {
1239        if fragment.actors.is_empty() {
1240            return Self {
1241                assignment: BTreeMap::new(),
1242                distribution_type: fragment.distribution_type,
1243                actor_count: 0,
1244            };
1245        }
1246
1247        let actor_count = fragment.actors.len() as u32;
1248
1249        let mut assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>> = BTreeMap::new();
1250        // Enumerate actors starting from 0 index, instead of deriving from actor_id.
1251        for (actor_idx, (&_actor_id, actor_info)) in fragment.actors.iter().enumerate() {
1252            let actor_idx = actor_idx as u32;
1253            assignment
1254                .entry(actor_info.worker_id)
1255                .or_default()
1256                .insert(actor_idx, actor_info.vnode_bitmap.clone());
1257        }
1258
1259        Self {
1260            assignment,
1261            distribution_type: fragment.distribution_type,
1262            actor_count,
1263        }
1264    }
1265
1266    /// Assert that two `EnsembleActorTemplate` are aligned: same distribution type,
1267    /// same actor count, and same vnode bitmap / worker placement. Used to verify that
1268    /// multiple existing fragments within the same no-shuffle ensemble are consistent.
1269    ///
1270    /// Internally calls [`resolve_no_shuffle_actor_mapping`] which asserts distribution
1271    /// type, count, and bitmap equality. Then additionally verifies worker placement.
1272    pub(crate) fn assert_aligned_with(
1273        &self,
1274        other: &Self,
1275        self_fragment_id: FragmentId,
1276        other_fragment_id: FragmentId,
1277    ) {
1278        let mapping = resolve_no_shuffle_actor_mapping(
1279            self.distribution_type,
1280            self.assignment
1281                .iter()
1282                .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
1283            other.distribution_type,
1284            other
1285                .assignment
1286                .iter()
1287                .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
1288        );
1289
1290        for ((self_worker, _self_idx), (other_worker, _other_idx)) in &mapping {
1291            assert_eq!(
1292                self_worker, other_worker,
1293                "fragments {} and {} disagree on worker placement: {:?}",
1294                self_fragment_id, other_fragment_id, mapping,
1295            );
1296        }
1297    }
1298
1299    fn assign_splits(
1300        &self,
1301        entry_fragment_id: FragmentId,
1302        splits: BTreeMap<SplitId, SplitImpl>,
1303    ) -> HashMap<u32, Vec<SplitImpl>> {
1304        {
1305            {
1306                let empty_actor_splits: HashMap<_, _> = self
1307                    .assignment
1308                    .values()
1309                    .flat_map(|actors| actors.keys())
1310                    .map(|actor_id| (*actor_id, vec![]))
1311                    .collect();
1312
1313                crate::stream::source_manager::reassign_splits(
1314                    entry_fragment_id,
1315                    empty_actor_splits,
1316                    &splits,
1317                    SplitDiffOptions::default(),
1318                )
1319                .unwrap_or_default()
1320            }
1321        }
1322    }
1323}
1324
1325pub(crate) struct ComponentFragmentAligner<'a> {
1326    actor_template: &'a EnsembleActorTemplate,
1327    actor_id_base: ActorId,
1328}
1329
1330impl<'a> ComponentFragmentAligner<'a> {
1331    fn new(
1332        actor_template: &'a EnsembleActorTemplate,
1333        actor_id_allocator: &mut RenderActorIdAllocator<'_>,
1334    ) -> Self {
1335        let actor_id_base = actor_id_allocator.allocate_block(actor_template.actor_count);
1336        Self {
1337            actor_template,
1338            actor_id_base,
1339        }
1340    }
1341
1342    pub(crate) fn new_persistent(
1343        actor_template: &'a EnsembleActorTemplate,
1344        actor_id_counter: &AtomicU32,
1345    ) -> Self {
1346        let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
1347        Self::new(actor_template, &mut actor_id_allocator)
1348    }
1349
1350    pub(crate) fn align_component_actor(
1351        &self,
1352        distribution_type: DistributionType,
1353    ) -> HashMap<ActorId, (WorkerId, Option<Bitmap>)> {
1354        let EnsembleActorTemplate {
1355            assignment,
1356            actor_count,
1357            distribution_type: _,
1358        } = &self.actor_template;
1359        let actor_id_base = self.actor_id_base;
1360        {
1361            assignment
1362                .iter()
1363                .flat_map(|(worker_id, actors)| {
1364                    actors
1365                        .iter()
1366                        .map(move |(actor_idx, bitmap)| (worker_id, actor_idx, bitmap))
1367                })
1368                .map(|(&worker_id, &actor_idx, bitmap)| {
1369                    if distribution_type == DistributionType::Single {
1370                        assert_eq!(*actor_count, 1);
1371                    }
1372
1373                    let actor_id = actor_id_base + actor_idx;
1374
1375                    (actor_id, (worker_id, bitmap.clone()))
1376                })
1377                .collect()
1378        }
1379    }
1380
1381    pub(crate) fn align_component_splits(
1382        &self,
1383        split_assignment: &HashMap<u32, Vec<SplitImpl>>,
1384    ) -> HashMap<ActorId, Vec<SplitImpl>> {
1385        (0..self.actor_template.actor_count)
1386            .filter_map(|actor_idx| {
1387                split_assignment
1388                    .get(&actor_idx)
1389                    .map(|splits| ((self.actor_id_base + actor_idx), splits.clone()))
1390            })
1391            .collect()
1392    }
1393}
1394
1395#[cfg(debug_assertions)]
1396fn debug_sanity_check(
1397    ensembles: &[NoShuffleEnsemble],
1398    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1399    jobs: &HashMap<JobId, streaming_job::Model>,
1400) {
1401    let fragment_lookup: HashMap<FragmentId, (&LoadedFragment, JobId)> = job_fragments
1402        .iter()
1403        .flat_map(|(job_id, fragments)| {
1404            fragments
1405                .iter()
1406                .map(move |(fragment_id, fragment)| (*fragment_id, (fragment, *job_id)))
1407        })
1408        .collect();
1409
1410    // Debug-only assertions to catch inconsistent ensemble metadata early.
1411    debug_assert!(
1412        ensembles
1413            .iter()
1414            .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
1415        "entries must be subset of components"
1416    );
1417
1418    let mut missing_fragments = BTreeSet::new();
1419    let mut missing_jobs = BTreeSet::new();
1420
1421    for fragment_id in ensembles
1422        .iter()
1423        .flat_map(|ensemble| ensemble.components.iter())
1424    {
1425        match fragment_lookup.get(fragment_id) {
1426            Some((fragment, job_id)) => {
1427                if !jobs.contains_key(&fragment.job_id) {
1428                    missing_jobs.insert(*job_id);
1429                }
1430            }
1431            None => {
1432                missing_fragments.insert(*fragment_id);
1433            }
1434        }
1435    }
1436
1437    debug_assert!(
1438        missing_fragments.is_empty(),
1439        "missing fragments in fragment_map: {:?}",
1440        missing_fragments
1441    );
1442
1443    debug_assert!(
1444        missing_jobs.is_empty(),
1445        "missing jobs for fragments' job_id: {:?}",
1446        missing_jobs
1447    );
1448
1449    for ensemble in ensembles {
1450        let unique_vnode_counts: Vec<_> = ensemble
1451            .components
1452            .iter()
1453            .flat_map(|fragment_id| {
1454                fragment_lookup
1455                    .get(fragment_id)
1456                    .map(|(fragment, _)| fragment.vnode_count)
1457            })
1458            .unique()
1459            .collect();
1460
1461        debug_assert!(
1462            unique_vnode_counts.len() <= 1,
1463            "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
1464            ensemble.components,
1465            unique_vnode_counts
1466        );
1467    }
1468}
1469
1470async fn resolve_source_fragments<C>(
1471    txn: &C,
1472    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1473) -> MetaResult<(
1474    HashMap<FragmentId, SourceId>,
1475    HashMap<FragmentId, Vec<SplitImpl>>,
1476)>
1477where
1478    C: ConnectionTrait,
1479{
1480    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1481    for (fragment_id, fragment) in job_fragments.values().flatten() {
1482        let mask = fragment.fragment_type_mask;
1483        if mask.contains(FragmentTypeFlag::Source)
1484            && let Some(source_id) = fragment.nodes.find_stream_source()
1485        {
1486            source_fragment_ids
1487                .entry(source_id)
1488                .or_default()
1489                .insert(*fragment_id);
1490        }
1491
1492        if mask.contains(FragmentTypeFlag::SourceScan)
1493            && let Some((source_id, _)) = fragment.nodes.find_source_backfill()
1494        {
1495            source_fragment_ids
1496                .entry(source_id)
1497                .or_default()
1498                .insert(*fragment_id);
1499        }
1500    }
1501
1502    let fragment_source_ids: HashMap<_, _> = source_fragment_ids
1503        .iter()
1504        .flat_map(|(source_id, fragment_ids)| {
1505            fragment_ids
1506                .iter()
1507                .map(|fragment_id| (*fragment_id, *source_id))
1508        })
1509        .collect();
1510
1511    let fragment_ids = fragment_source_ids.keys().copied().collect_vec();
1512
1513    let fragment_splits: Vec<_> = FragmentSplits::find()
1514        .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
1515        .all(txn)
1516        .await?;
1517
1518    let fragment_splits: HashMap<_, _> = fragment_splits
1519        .into_iter()
1520        .flat_map(|model| {
1521            model.splits.map(|splits| {
1522                (
1523                    model.fragment_id,
1524                    splits
1525                        .to_protobuf()
1526                        .splits
1527                        .iter()
1528                        .flat_map(SplitImpl::try_from)
1529                        .collect_vec(),
1530                )
1531            })
1532        })
1533        .collect();
1534
1535    Ok((fragment_source_ids, fragment_splits))
1536}
1537
1538// Helper struct to make the function signature cleaner and to properly bundle the required data.
1539#[derive(Debug)]
1540pub struct ActorGraph<'a> {
1541    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
1542    pub locations: &'a HashMap<ActorId, WorkerId>,
1543}
1544
1545#[derive(Debug, Clone)]
1546pub struct NoShuffleEnsemble {
1547    entries: HashSet<FragmentId>,
1548    components: HashSet<FragmentId>,
1549}
1550
1551impl NoShuffleEnsemble {
1552    /// Create a single-fragment ensemble (for standalone fragments with no `NoShuffle` edges).
1553    pub(crate) fn singleton(fragment_id: FragmentId) -> Self {
1554        Self {
1555            entries: HashSet::from_iter([fragment_id]),
1556            components: HashSet::from_iter([fragment_id]),
1557        }
1558    }
1559
1560    #[cfg(test)]
1561    pub fn for_test(
1562        entries: impl IntoIterator<Item = FragmentId>,
1563        components: impl IntoIterator<Item = FragmentId>,
1564    ) -> Self {
1565        let entries = entries.into_iter().collect();
1566        let components = components.into_iter().collect();
1567        Self {
1568            entries,
1569            components,
1570        }
1571    }
1572
1573    pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1574        self.components.iter().cloned()
1575    }
1576
1577    pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1578        self.entries.iter().copied()
1579    }
1580
1581    pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1582        self.components.iter().copied()
1583    }
1584
1585    pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
1586        self.entries.contains(fragment_id)
1587    }
1588}
1589
1590pub async fn find_fragment_no_shuffle_dags_detailed(
1591    db: &impl ConnectionTrait,
1592    initial_fragment_ids: &[FragmentId],
1593) -> MetaResult<Vec<NoShuffleEnsemble>> {
1594    let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
1595        .columns([
1596            fragment_relation::Column::SourceFragmentId,
1597            fragment_relation::Column::TargetFragmentId,
1598        ])
1599        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
1600        .into_tuple()
1601        .all(db)
1602        .await?;
1603
1604    let (forward_edges, backward_edges) =
1605        build_no_shuffle_fragment_graph_edges(all_no_shuffle_relations);
1606
1607    find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
1608}
1609
1610pub(crate) fn build_no_shuffle_fragment_graph_edges(
1611    relations: impl IntoIterator<Item = (FragmentId, FragmentId)>,
1612) -> (
1613    HashMap<FragmentId, Vec<FragmentId>>,
1614    HashMap<FragmentId, Vec<FragmentId>>,
1615) {
1616    let mut forward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1617    let mut backward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1618
1619    for (src, dst) in relations {
1620        forward_edges.entry(src).or_default().insert(dst);
1621        backward_edges.entry(dst).or_default().insert(src);
1622    }
1623
1624    let forward_edges = forward_edges
1625        .into_iter()
1626        .map(|(src, dst_set)| (src, dst_set.into_iter().collect()))
1627        .collect();
1628    let backward_edges = backward_edges
1629        .into_iter()
1630        .map(|(dst, src_set)| (dst, src_set.into_iter().collect()))
1631        .collect();
1632
1633    (forward_edges, backward_edges)
1634}
1635
1636pub(crate) fn find_no_shuffle_graphs(
1637    initial_fragment_ids: &[impl Into<FragmentId> + Copy],
1638    forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1639    backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1640) -> MetaResult<Vec<NoShuffleEnsemble>> {
1641    let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
1642    let mut globally_visited: HashSet<FragmentId> = HashSet::new();
1643
1644    for &init_id in initial_fragment_ids {
1645        let init_id = init_id.into();
1646        if globally_visited.contains(&init_id) {
1647            continue;
1648        }
1649
1650        // Found a new component. Traverse it to find all its nodes.
1651        let mut components = HashSet::new();
1652        let mut queue: VecDeque<FragmentId> = VecDeque::new();
1653
1654        queue.push_back(init_id);
1655        globally_visited.insert(init_id);
1656
1657        while let Some(current_id) = queue.pop_front() {
1658            components.insert(current_id);
1659            let neighbors = forward_edges
1660                .get(&current_id)
1661                .into_iter()
1662                .flatten()
1663                .chain(backward_edges.get(&current_id).into_iter().flatten());
1664
1665            for &neighbor_id in neighbors {
1666                if globally_visited.insert(neighbor_id) {
1667                    queue.push_back(neighbor_id);
1668                }
1669            }
1670        }
1671
1672        // For the newly found component, identify its roots.
1673        let mut entries = HashSet::new();
1674        for &node_id in &components {
1675            let is_root = match backward_edges.get(&node_id) {
1676                Some(parents) => parents.iter().all(|p| !components.contains(p)),
1677                None => true,
1678            };
1679            if is_root {
1680                entries.insert(node_id);
1681            }
1682        }
1683
1684        // Store the detailed DAG structure (roots, all nodes in this DAG).
1685        if !entries.is_empty() {
1686            graphs.push(NoShuffleEnsemble {
1687                entries,
1688                components,
1689            });
1690        }
1691    }
1692
1693    Ok(graphs)
1694}
1695
1696#[cfg(test)]
1697mod tests {
1698    use std::collections::{BTreeSet, HashMap, HashSet};
1699    use std::sync::Arc;
1700
1701    use risingwave_connector::source::SplitImpl;
1702    use risingwave_connector::source::test_source::TestSourceSplit;
1703    use risingwave_meta_model::{CreateType, JobStatus};
1704    use risingwave_pb::common::WorkerType;
1705    use risingwave_pb::common::worker_node::Property as WorkerProperty;
1706    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1707
1708    use super::*;
1709
1710    // Helper type aliases for cleaner test code
1711    // Using the actual FragmentId type from the module
1712    type Edges = (
1713        HashMap<FragmentId, Vec<FragmentId>>,
1714        HashMap<FragmentId, Vec<FragmentId>>,
1715    );
1716
1717    /// A helper function to build forward and backward edge maps from a simple list of tuples.
1718    /// This reduces boilerplate in each test.
1719    fn build_edges(relations: &[(u32, u32)]) -> Edges {
1720        let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1721        let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1722        for &(src, dst) in relations {
1723            forward_edges
1724                .entry(src.into())
1725                .or_default()
1726                .push(dst.into());
1727            backward_edges
1728                .entry(dst.into())
1729                .or_default()
1730                .push(src.into());
1731        }
1732        (forward_edges, backward_edges)
1733    }
1734
1735    /// Helper function to create a `HashSet` from a slice easily.
1736    fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1737        ids.iter().map(|id| (*id).into()).collect()
1738    }
1739
1740    fn build_fragment(
1741        fragment_id: FragmentId,
1742        job_id: JobId,
1743        fragment_type_mask: i32,
1744        distribution_type: DistributionType,
1745        vnode_count: i32,
1746        parallelism: StreamingParallelism,
1747    ) -> LoadedFragment {
1748        LoadedFragment {
1749            fragment_id,
1750            job_id,
1751            fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
1752            distribution_type,
1753            vnode_count: vnode_count as usize,
1754            nodes: PbStreamNode::default(),
1755            state_table_ids: HashSet::new(),
1756            parallelism: Some(parallelism),
1757        }
1758    }
1759
1760    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1761
1762    fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1763        let base = fragment.actors.keys().copied().min().unwrap_or_default();
1764
1765        let mut entries: Vec<_> = fragment
1766            .actors
1767            .iter()
1768            .map(|(&actor_id, info)| {
1769                let idx = actor_id.as_raw_id() - base.as_raw_id();
1770                let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1771                    bitmap
1772                        .iter()
1773                        .enumerate()
1774                        .filter_map(|(pos, is_set)| is_set.then_some(pos))
1775                        .collect::<Vec<_>>()
1776                });
1777                let splits = info
1778                    .splits
1779                    .iter()
1780                    .map(|split| split.id().to_string())
1781                    .collect::<Vec<_>>();
1782                (idx.into(), info.worker_id, vnode_indices, splits)
1783            })
1784            .collect();
1785
1786        entries.sort_by_key(|(idx, _, _, _)| *idx);
1787        entries
1788    }
1789
1790    fn build_worker_node(
1791        id: impl Into<WorkerId>,
1792        parallelism: usize,
1793        resource_group: &str,
1794    ) -> WorkerNode {
1795        WorkerNode {
1796            id: id.into(),
1797            r#type: WorkerType::ComputeNode as i32,
1798            property: Some(WorkerProperty {
1799                is_streaming: true,
1800                parallelism: u32::try_from(parallelism).expect("parallelism fits into u32"),
1801                resource_group: Some(resource_group.to_owned()),
1802                ..Default::default()
1803            }),
1804            ..Default::default()
1805        }
1806    }
1807
1808    #[test]
1809    fn test_single_linear_chain() {
1810        // Scenario: A simple linear graph 1 -> 2 -> 3.
1811        // We start from the middle node (2).
1812        let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1813        let initial_ids = &[2];
1814
1815        // Act
1816        let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1817
1818        // Assert
1819        assert!(result.is_ok());
1820        let graphs = result.unwrap();
1821
1822        assert_eq!(graphs.len(), 1);
1823        let graph = &graphs[0];
1824        assert_eq!(graph.entries, to_hashset(&[1]));
1825        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1826    }
1827
1828    #[test]
1829    fn test_two_disconnected_graphs() {
1830        // Scenario: Two separate graphs: 1->2 and 10->11.
1831        // We start with one node from each graph.
1832        let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1833        let initial_ids = &[2, 10];
1834
1835        // Act
1836        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1837
1838        // Assert
1839        assert_eq!(graphs.len(), 2);
1840
1841        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1842        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1843
1844        // Graph 1
1845        assert_eq!(graphs[0].entries, to_hashset(&[1]));
1846        assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1847
1848        // Graph 2
1849        assert_eq!(graphs[1].entries, to_hashset(&[10]));
1850        assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1851    }
1852
1853    #[test]
1854    fn test_multiple_entries_in_one_graph() {
1855        // Scenario: A graph with two roots feeding into one node: 1->3, 2->3.
1856        let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1857        let initial_ids = &[3];
1858
1859        // Act
1860        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1861
1862        // Assert
1863        assert_eq!(graphs.len(), 1);
1864        let graph = &graphs[0];
1865        assert_eq!(graph.entries, to_hashset(&[1, 2]));
1866        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1867    }
1868
1869    #[test]
1870    fn test_diamond_shape_graph() {
1871        // Scenario: A diamond shape: 1->2, 1->3, 2->4, 3->4
1872        let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1873        let initial_ids = &[4];
1874
1875        // Act
1876        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1877
1878        // Assert
1879        assert_eq!(graphs.len(), 1);
1880        let graph = &graphs[0];
1881        assert_eq!(graph.entries, to_hashset(&[1]));
1882        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1883    }
1884
1885    #[test]
1886    fn test_starting_with_multiple_nodes_in_same_graph() {
1887        // Scenario: Start with two different nodes (2 and 4) from the same component.
1888        // Should only identify one graph, not two.
1889        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1890        let initial_ids = &[2, 4];
1891
1892        // Act
1893        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1894
1895        // Assert
1896        assert_eq!(graphs.len(), 1);
1897        let graph = &graphs[0];
1898        assert_eq!(graph.entries, to_hashset(&[1]));
1899        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1900    }
1901
1902    #[test]
1903    fn test_empty_initial_ids() {
1904        // Scenario: The initial ID list is empty.
1905        let (forward, backward) = build_edges(&[(1, 2)]);
1906        let initial_ids: &[u32] = &[];
1907
1908        // Act
1909        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1910
1911        // Assert
1912        assert!(graphs.is_empty());
1913    }
1914
1915    #[test]
1916    fn test_isolated_node_as_input() {
1917        // Scenario: Start with an ID that has no relations.
1918        let (forward, backward) = build_edges(&[(1, 2)]);
1919        let initial_ids = &[100];
1920
1921        // Act
1922        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1923
1924        // Assert
1925        assert_eq!(graphs.len(), 1);
1926        let graph = &graphs[0];
1927        assert_eq!(graph.entries, to_hashset(&[100]));
1928        assert_eq!(graph.components, to_hashset(&[100]));
1929    }
1930
1931    #[test]
1932    fn test_graph_with_a_cycle() {
1933        // Scenario: A graph with a cycle: 1 -> 2 -> 3 -> 1.
1934        // The algorithm should correctly identify all nodes in the component.
1935        // Crucially, NO node is a root because every node has a parent *within the component*.
1936        // Therefore, the `entries` set should be empty, and the graph should not be included in the results.
1937        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1938        let initial_ids = &[2];
1939
1940        // Act
1941        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1942
1943        // Assert
1944        assert!(
1945            graphs.is_empty(),
1946            "A graph with no entries should not be returned"
1947        );
1948    }
1949    #[test]
1950    fn test_custom_complex() {
1951        let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1952        let initial_ids = &[1, 2, 4, 6];
1953
1954        // Act
1955        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1956
1957        // Assert
1958        assert_eq!(graphs.len(), 2);
1959        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1960        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1961
1962        // Graph 1
1963        assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1964        assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1965
1966        // Graph 2
1967        assert_eq!(graphs[1].entries, to_hashset(&[6]));
1968        assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1969    }
1970
1971    #[test]
1972    fn render_actors_increments_actor_counter() {
1973        let actor_id_counter = AtomicU32::new(100);
1974        let fragment_id: FragmentId = 1.into();
1975        let job_id: JobId = 10.into();
1976        let database_id: DatabaseId = DatabaseId::new(3);
1977
1978        let fragment_model = build_fragment(
1979            fragment_id,
1980            job_id,
1981            0,
1982            DistributionType::Single,
1983            1,
1984            StreamingParallelism::Fixed(1),
1985        );
1986
1987        let job_model = streaming_job::Model {
1988            job_id,
1989            job_status: JobStatus::Created,
1990            create_type: CreateType::Foreground,
1991            timezone: None,
1992            config_override: None,
1993            adaptive_parallelism_strategy: None,
1994            parallelism: StreamingParallelism::Fixed(1),
1995            backfill_parallelism: None,
1996            backfill_adaptive_parallelism_strategy: None,
1997            backfill_orders: None,
1998            max_parallelism: 1,
1999            specific_resource_group: None,
2000            is_serverless_backfill: false,
2001            refresh_interval_sec: None,
2002        };
2003
2004        let database_model = database::Model {
2005            database_id,
2006            name: "test_db".into(),
2007            resource_group: "rg-a".into(),
2008            barrier_interval_ms: None,
2009            checkpoint_frequency: None,
2010        };
2011
2012        let ensembles = vec![NoShuffleEnsemble {
2013            entries: HashSet::from([fragment_id]),
2014            components: HashSet::from([fragment_id]),
2015        }];
2016
2017        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
2018        let job_fragments = HashMap::from([(job_id, fragment_map)]);
2019        let job_map = HashMap::from([(job_id, job_model)]);
2020
2021        let worker_map: HashMap<WorkerId, WorkerNode> =
2022            HashMap::from([(1.into(), build_worker_node(1, 1, "rg-a"))]);
2023
2024        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2025        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2026        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2027        let database_map = HashMap::from([(database_id, database_model)]);
2028
2029        let context = RenderActorsContext {
2030            fragment_source_ids: &fragment_source_ids,
2031            fragment_splits: &fragment_splits,
2032            streaming_job_databases: &streaming_job_databases,
2033            database_map: &database_map,
2034        };
2035
2036        let result = render_actors(
2037            &actor_id_counter,
2038            &ensembles,
2039            &job_fragments,
2040            &job_map,
2041            &worker_map,
2042            context,
2043        )
2044        .expect("actor rendering succeeds");
2045
2046        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2047        assert_eq!(state.len(), 1);
2048        assert!(
2049            state[0].2.is_none(),
2050            "single distribution should not assign vnode bitmaps"
2051        );
2052        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
2053    }
2054
2055    #[test]
2056    fn render_actors_aligns_hash_vnode_bitmaps() {
2057        let actor_id_counter = AtomicU32::new(0);
2058        let entry_fragment_id: FragmentId = 1.into();
2059        let downstream_fragment_id: FragmentId = 2.into();
2060        let job_id: JobId = 20.into();
2061        let database_id: DatabaseId = DatabaseId::new(5);
2062
2063        let entry_fragment = build_fragment(
2064            entry_fragment_id,
2065            job_id,
2066            0,
2067            DistributionType::Hash,
2068            4,
2069            StreamingParallelism::Fixed(2),
2070        );
2071
2072        let downstream_fragment = build_fragment(
2073            downstream_fragment_id,
2074            job_id,
2075            0,
2076            DistributionType::Hash,
2077            4,
2078            StreamingParallelism::Fixed(2),
2079        );
2080
2081        let job_model = streaming_job::Model {
2082            job_id,
2083            job_status: JobStatus::Created,
2084            create_type: CreateType::Background,
2085            timezone: None,
2086            config_override: None,
2087            adaptive_parallelism_strategy: None,
2088            parallelism: StreamingParallelism::Fixed(2),
2089            backfill_parallelism: None,
2090            backfill_adaptive_parallelism_strategy: None,
2091            backfill_orders: None,
2092            max_parallelism: 2,
2093            specific_resource_group: None,
2094            is_serverless_backfill: false,
2095            refresh_interval_sec: None,
2096        };
2097
2098        let database_model = database::Model {
2099            database_id,
2100            name: "test_db_hash".into(),
2101            resource_group: "rg-hash".into(),
2102            barrier_interval_ms: None,
2103            checkpoint_frequency: None,
2104        };
2105
2106        let ensembles = vec![NoShuffleEnsemble {
2107            entries: HashSet::from([entry_fragment_id]),
2108            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2109        }];
2110
2111        let fragment_map = HashMap::from([
2112            (entry_fragment_id, entry_fragment),
2113            (downstream_fragment_id, downstream_fragment),
2114        ]);
2115        let job_fragments = HashMap::from([(job_id, fragment_map)]);
2116        let job_map = HashMap::from([(job_id, job_model)]);
2117
2118        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2119            (1.into(), build_worker_node(1, 1, "rg-hash")),
2120            (2.into(), build_worker_node(2, 1, "rg-hash")),
2121        ]);
2122
2123        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2124        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2125        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2126        let database_map = HashMap::from([(database_id, database_model)]);
2127
2128        let context = RenderActorsContext {
2129            fragment_source_ids: &fragment_source_ids,
2130            fragment_splits: &fragment_splits,
2131            streaming_job_databases: &streaming_job_databases,
2132            database_map: &database_map,
2133        };
2134
2135        let result = render_actors(
2136            &actor_id_counter,
2137            &ensembles,
2138            &job_fragments,
2139            &job_map,
2140            &worker_map,
2141            context,
2142        )
2143        .expect("actor rendering succeeds");
2144
2145        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
2146        let downstream_state =
2147            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
2148
2149        assert_eq!(entry_state.len(), 2);
2150        assert_eq!(entry_state, downstream_state);
2151
2152        let assigned_vnodes: BTreeSet<_> = entry_state
2153            .iter()
2154            .flat_map(|(_, _, vnodes, _)| {
2155                vnodes
2156                    .as_ref()
2157                    .expect("hash distribution should populate vnode bitmap")
2158                    .iter()
2159                    .copied()
2160            })
2161            .collect();
2162        assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
2163        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
2164    }
2165
2166    #[test]
2167    fn render_actors_propagates_source_splits() {
2168        let actor_id_counter = AtomicU32::new(0);
2169        let entry_fragment_id: FragmentId = 11.into();
2170        let downstream_fragment_id: FragmentId = 12.into();
2171        let job_id: JobId = 30.into();
2172        let database_id: DatabaseId = DatabaseId::new(7);
2173        let source_id: SourceId = 99.into();
2174
2175        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
2176        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;
2177
2178        let entry_fragment = build_fragment(
2179            entry_fragment_id,
2180            job_id,
2181            source_mask,
2182            DistributionType::Hash,
2183            4,
2184            StreamingParallelism::Fixed(2),
2185        );
2186
2187        let downstream_fragment = build_fragment(
2188            downstream_fragment_id,
2189            job_id,
2190            source_scan_mask,
2191            DistributionType::Hash,
2192            4,
2193            StreamingParallelism::Fixed(2),
2194        );
2195
2196        let job_model = streaming_job::Model {
2197            job_id,
2198            job_status: JobStatus::Created,
2199            create_type: CreateType::Background,
2200            timezone: None,
2201            config_override: None,
2202            adaptive_parallelism_strategy: None,
2203            parallelism: StreamingParallelism::Fixed(2),
2204            backfill_parallelism: None,
2205            backfill_adaptive_parallelism_strategy: None,
2206            backfill_orders: None,
2207            max_parallelism: 2,
2208            specific_resource_group: None,
2209            is_serverless_backfill: false,
2210            refresh_interval_sec: None,
2211        };
2212
2213        let database_model = database::Model {
2214            database_id,
2215            name: "split_db".into(),
2216            resource_group: "rg-source".into(),
2217            barrier_interval_ms: None,
2218            checkpoint_frequency: None,
2219        };
2220
2221        let ensembles = vec![NoShuffleEnsemble {
2222            entries: HashSet::from([entry_fragment_id]),
2223            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2224        }];
2225
2226        let fragment_map = HashMap::from([
2227            (entry_fragment_id, entry_fragment),
2228            (downstream_fragment_id, downstream_fragment),
2229        ]);
2230        let job_fragments = HashMap::from([(job_id, fragment_map)]);
2231        let job_map = HashMap::from([(job_id, job_model)]);
2232
2233        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2234            (1.into(), build_worker_node(1, 1, "rg-source")),
2235            (2.into(), build_worker_node(2, 1, "rg-source")),
2236        ]);
2237
2238        let split_a = SplitImpl::Test(TestSourceSplit {
2239            id: Arc::<str>::from("split-a"),
2240            properties: HashMap::new(),
2241            offset: "0".into(),
2242        });
2243        let split_b = SplitImpl::Test(TestSourceSplit {
2244            id: Arc::<str>::from("split-b"),
2245            properties: HashMap::new(),
2246            offset: "0".into(),
2247        });
2248
2249        let fragment_source_ids = HashMap::from([
2250            (entry_fragment_id, source_id),
2251            (downstream_fragment_id, source_id),
2252        ]);
2253        let fragment_splits =
2254            HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
2255        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2256        let database_map = HashMap::from([(database_id, database_model)]);
2257
2258        let context = RenderActorsContext {
2259            fragment_source_ids: &fragment_source_ids,
2260            fragment_splits: &fragment_splits,
2261            streaming_job_databases: &streaming_job_databases,
2262            database_map: &database_map,
2263        };
2264
2265        let result = render_actors(
2266            &actor_id_counter,
2267            &ensembles,
2268            &job_fragments,
2269            &job_map,
2270            &worker_map,
2271            context,
2272        )
2273        .expect("actor rendering succeeds");
2274
2275        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
2276        let downstream_state =
2277            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
2278
2279        assert_eq!(entry_state, downstream_state);
2280
2281        let split_ids: BTreeSet<_> = entry_state
2282            .iter()
2283            .flat_map(|(_, _, _, splits)| splits.iter().cloned())
2284            .collect();
2285        assert_eq!(
2286            split_ids,
2287            BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
2288        );
2289        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
2290    }
2291
2292    #[test]
2293    fn preview_actor_assignments_defers_actor_id_allocation() {
2294        let actor_id_counter = AtomicU32::new(100);
2295        let entry_fragment_id: FragmentId = 11.into();
2296        let downstream_fragment_id: FragmentId = 12.into();
2297        let job_id: JobId = 30.into();
2298        let database_id: DatabaseId = DatabaseId::new(7);
2299        let source_id: SourceId = 99.into();
2300
2301        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
2302        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;
2303
2304        let entry_fragment = build_fragment(
2305            entry_fragment_id,
2306            job_id,
2307            source_mask,
2308            DistributionType::Hash,
2309            4,
2310            StreamingParallelism::Fixed(2),
2311        );
2312
2313        let downstream_fragment = build_fragment(
2314            downstream_fragment_id,
2315            job_id,
2316            source_scan_mask,
2317            DistributionType::Hash,
2318            4,
2319            StreamingParallelism::Fixed(2),
2320        );
2321
2322        let job_model = streaming_job::Model {
2323            job_id,
2324            job_status: JobStatus::Created,
2325            create_type: CreateType::Background,
2326            timezone: None,
2327            config_override: None,
2328            adaptive_parallelism_strategy: None,
2329            parallelism: StreamingParallelism::Fixed(2),
2330            backfill_parallelism: None,
2331            backfill_adaptive_parallelism_strategy: None,
2332            backfill_orders: None,
2333            max_parallelism: 2,
2334            specific_resource_group: None,
2335            is_serverless_backfill: false,
2336            refresh_interval_sec: None,
2337        };
2338
2339        let database_model = database::Model {
2340            database_id,
2341            name: "preview_db".into(),
2342            resource_group: "rg-source".into(),
2343            barrier_interval_ms: None,
2344            checkpoint_frequency: None,
2345        };
2346
2347        let ensembles = vec![NoShuffleEnsemble {
2348            entries: HashSet::from([entry_fragment_id]),
2349            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2350        }];
2351
2352        let fragment_map = HashMap::from([
2353            (entry_fragment_id, entry_fragment),
2354            (downstream_fragment_id, downstream_fragment),
2355        ]);
2356        let job_fragments = HashMap::from([(job_id, fragment_map)]);
2357        let job_map = HashMap::from([(job_id, job_model)]);
2358
2359        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2360            (1.into(), build_worker_node(1, 1, "rg-source")),
2361            (2.into(), build_worker_node(2, 1, "rg-source")),
2362        ]);
2363
2364        let split_a = SplitImpl::Test(TestSourceSplit {
2365            id: Arc::<str>::from("split-a"),
2366            properties: HashMap::new(),
2367            offset: "0".into(),
2368        });
2369        let split_b = SplitImpl::Test(TestSourceSplit {
2370            id: Arc::<str>::from("split-b"),
2371            properties: HashMap::new(),
2372            offset: "0".into(),
2373        });
2374
2375        let loaded = LoadedFragmentContext {
2376            ensembles,
2377            job_fragments,
2378            job_map,
2379            streaming_job_databases: HashMap::from([(job_id, database_id)]),
2380            database_map: HashMap::from([(database_id, database_model)]),
2381            fragment_source_ids: HashMap::from([
2382                (entry_fragment_id, source_id),
2383                (downstream_fragment_id, source_id),
2384            ]),
2385            fragment_splits: HashMap::from([(entry_fragment_id, vec![split_a, split_b])]),
2386        };
2387
2388        let preview =
2389            preview_actor_assignments(&worker_map, &loaded).expect("preview rendering succeeds");
2390        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 100);
2391
2392        let preview_entry_state =
2393            collect_actor_state(&preview.fragments[&database_id][&job_id][&entry_fragment_id]);
2394        let preview_downstream_state =
2395            collect_actor_state(&preview.fragments[&database_id][&job_id][&downstream_fragment_id]);
2396        assert_eq!(preview_entry_state, preview_downstream_state);
2397
2398        let materialized = materialize_actor_assignments(&actor_id_counter, preview);
2399
2400        let materialized_entry_state =
2401            collect_actor_state(&materialized.fragments[&database_id][&job_id][&entry_fragment_id]);
2402        let materialized_downstream_state = collect_actor_state(
2403            &materialized.fragments[&database_id][&job_id][&downstream_fragment_id],
2404        );
2405
2406        assert_eq!(materialized_entry_state, materialized_downstream_state);
2407        assert_eq!(materialized_entry_state, preview_entry_state);
2408        assert_eq!(materialized_downstream_state, preview_downstream_state);
2409        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 104);
2410    }
2411
2412    /// Test that job-level strategy overrides global strategy for Adaptive parallelism.
2413    #[test]
2414    fn render_actors_job_strategy_overrides_global() {
2415        let actor_id_counter = AtomicU32::new(0);
2416        let fragment_id: FragmentId = 1.into();
2417        let job_id: JobId = 100.into();
2418        let database_id: DatabaseId = DatabaseId::new(10);
2419
2420        // Fragment with Adaptive parallelism, vnode_count = 8
2421        let fragment_model = build_fragment(
2422            fragment_id,
2423            job_id,
2424            0,
2425            DistributionType::Hash,
2426            8,
2427            StreamingParallelism::Adaptive,
2428        );
2429
2430        // Job has custom strategy: BOUNDED(2)
2431        let job_model = streaming_job::Model {
2432            job_id,
2433            job_status: JobStatus::Created,
2434            create_type: CreateType::Foreground,
2435            timezone: None,
2436            config_override: None,
2437            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2438            parallelism: StreamingParallelism::Adaptive,
2439            backfill_parallelism: None,
2440            backfill_adaptive_parallelism_strategy: None,
2441            backfill_orders: None,
2442            max_parallelism: 8,
2443            specific_resource_group: None,
2444            is_serverless_backfill: false,
2445            refresh_interval_sec: None,
2446        };
2447
2448        let database_model = database::Model {
2449            database_id,
2450            name: "test_db".into(),
2451            resource_group: "default".into(),
2452            barrier_interval_ms: None,
2453            checkpoint_frequency: None,
2454        };
2455
2456        let ensembles = vec![NoShuffleEnsemble {
2457            entries: HashSet::from([fragment_id]),
2458            components: HashSet::from([fragment_id]),
2459        }];
2460
2461        let fragment_map =
2462            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2463        let job_map = HashMap::from([(job_id, job_model)]);
2464
2465        // 4 workers with 1 parallelism each = total 4 parallelism
2466        let worker_map = HashMap::from([
2467            (1.into(), build_worker_node(1, 1, "default")),
2468            (2.into(), build_worker_node(2, 1, "default")),
2469            (3.into(), build_worker_node(3, 1, "default")),
2470            (4.into(), build_worker_node(4, 1, "default")),
2471        ]);
2472
2473        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2474        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2475        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2476        let database_map = HashMap::from([(database_id, database_model)]);
2477
2478        let context = RenderActorsContext {
2479            fragment_source_ids: &fragment_source_ids,
2480            fragment_splits: &fragment_splits,
2481            streaming_job_databases: &streaming_job_databases,
2482            database_map: &database_map,
2483        };
2484
2485        // Global strategy is FULL (would give 4 actors), but job strategy is BOUNDED(2)
2486        let result = render_actors(
2487            &actor_id_counter,
2488            &ensembles,
2489            &fragment_map,
2490            &job_map,
2491            &worker_map,
2492            context,
2493        )
2494        .expect("actor rendering succeeds");
2495
2496        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2497        // Job strategy BOUNDED(2) should limit to 2 actors, not 4 (global FULL)
2498        assert_eq!(
2499            state.len(),
2500            2,
2501            "Job strategy BOUNDED(2) should override global FULL"
2502        );
2503    }
2504
2505    /// Test that adaptive jobs without a persisted strategy fall back to the default adaptive
2506    /// strategy instead of failing.
2507    #[test]
2508    fn render_actors_falls_back_to_default_when_adaptive_job_has_no_strategy() {
2509        let actor_id_counter = AtomicU32::new(0);
2510        let fragment_id: FragmentId = 1.into();
2511        let job_id: JobId = 101.into();
2512        let database_id: DatabaseId = DatabaseId::new(11);
2513
2514        let fragment_model = build_fragment(
2515            fragment_id,
2516            job_id,
2517            0,
2518            DistributionType::Hash,
2519            8,
2520            StreamingParallelism::Adaptive,
2521        );
2522
2523        // Job has NO custom strategy (None)
2524        let job_model = streaming_job::Model {
2525            job_id,
2526            job_status: JobStatus::Created,
2527            create_type: CreateType::Foreground,
2528            timezone: None,
2529            config_override: None,
2530            adaptive_parallelism_strategy: None, // No custom strategy
2531            parallelism: StreamingParallelism::Adaptive,
2532            backfill_parallelism: None,
2533            backfill_adaptive_parallelism_strategy: None,
2534            backfill_orders: None,
2535            max_parallelism: 8,
2536            specific_resource_group: None,
2537            is_serverless_backfill: false,
2538            refresh_interval_sec: None,
2539        };
2540
2541        let database_model = database::Model {
2542            database_id,
2543            name: "test_db".into(),
2544            resource_group: "default".into(),
2545            barrier_interval_ms: None,
2546            checkpoint_frequency: None,
2547        };
2548
2549        let ensembles = vec![NoShuffleEnsemble {
2550            entries: HashSet::from([fragment_id]),
2551            components: HashSet::from([fragment_id]),
2552        }];
2553
2554        let fragment_map =
2555            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2556        let job_map = HashMap::from([(job_id, job_model)]);
2557
2558        // 4 workers = total 4 parallelism
2559        let worker_map = HashMap::from([
2560            (1.into(), build_worker_node(1, 1, "default")),
2561            (2.into(), build_worker_node(2, 1, "default")),
2562            (3.into(), build_worker_node(3, 1, "default")),
2563            (4.into(), build_worker_node(4, 1, "default")),
2564        ]);
2565
2566        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2567        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2568        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2569        let database_map = HashMap::from([(database_id, database_model)]);
2570
2571        let context = RenderActorsContext {
2572            fragment_source_ids: &fragment_source_ids,
2573            fragment_splits: &fragment_splits,
2574            streaming_job_databases: &streaming_job_databases,
2575            database_map: &database_map,
2576        };
2577
2578        let result = render_actors(
2579            &actor_id_counter,
2580            &ensembles,
2581            &fragment_map,
2582            &job_map,
2583            &worker_map,
2584            context,
2585        )
2586        .expect("actor rendering succeeds");
2587
2588        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2589        assert_eq!(
2590            state.len(),
2591            4,
2592            "default adaptive strategy should use the full available parallelism"
2593        );
2594    }
2595
2596    /// Test that Fixed parallelism ignores strategy entirely.
2597    #[test]
2598    fn render_actors_fixed_parallelism_ignores_strategy() {
2599        let actor_id_counter = AtomicU32::new(0);
2600        let fragment_id: FragmentId = 1.into();
2601        let job_id: JobId = 102.into();
2602        let database_id: DatabaseId = DatabaseId::new(12);
2603
2604        // Fragment with FIXED parallelism
2605        let fragment_model = build_fragment(
2606            fragment_id,
2607            job_id,
2608            0,
2609            DistributionType::Hash,
2610            8,
2611            StreamingParallelism::Fixed(5),
2612        );
2613
2614        // Job has custom strategy, but it should be ignored for Fixed parallelism
2615        let job_model = streaming_job::Model {
2616            job_id,
2617            job_status: JobStatus::Created,
2618            create_type: CreateType::Foreground,
2619            timezone: None,
2620            config_override: None,
2621            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2622            parallelism: StreamingParallelism::Fixed(5),
2623            backfill_parallelism: None,
2624            backfill_adaptive_parallelism_strategy: None,
2625            backfill_orders: None,
2626            max_parallelism: 8,
2627            specific_resource_group: None,
2628            is_serverless_backfill: false,
2629            refresh_interval_sec: None,
2630        };
2631
2632        let database_model = database::Model {
2633            database_id,
2634            name: "test_db".into(),
2635            resource_group: "default".into(),
2636            barrier_interval_ms: None,
2637            checkpoint_frequency: None,
2638        };
2639
2640        let ensembles = vec![NoShuffleEnsemble {
2641            entries: HashSet::from([fragment_id]),
2642            components: HashSet::from([fragment_id]),
2643        }];
2644
2645        let fragment_map =
2646            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2647        let job_map = HashMap::from([(job_id, job_model)]);
2648
2649        // 6 workers = total 6 parallelism
2650        let worker_map = HashMap::from([
2651            (1.into(), build_worker_node(1, 1, "default")),
2652            (2.into(), build_worker_node(2, 1, "default")),
2653            (3.into(), build_worker_node(3, 1, "default")),
2654            (4.into(), build_worker_node(4, 1, "default")),
2655            (5.into(), build_worker_node(5, 1, "default")),
2656            (6.into(), build_worker_node(6, 1, "default")),
2657        ]);
2658
2659        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2660        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2661        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2662        let database_map = HashMap::from([(database_id, database_model)]);
2663
2664        let context = RenderActorsContext {
2665            fragment_source_ids: &fragment_source_ids,
2666            fragment_splits: &fragment_splits,
2667            streaming_job_databases: &streaming_job_databases,
2668            database_map: &database_map,
2669        };
2670
2671        let result = render_actors(
2672            &actor_id_counter,
2673            &ensembles,
2674            &fragment_map,
2675            &job_map,
2676            &worker_map,
2677            context,
2678        )
2679        .expect("actor rendering succeeds");
2680
2681        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2682        // Fixed(5) should be used, ignoring both job strategy BOUNDED(2) and global FULL
2683        assert_eq!(
2684            state.len(),
2685            5,
2686            "Fixed parallelism should ignore all strategies"
2687        );
2688    }
2689
2690    /// Test RATIO strategy calculation.
2691    #[test]
2692    fn render_actors_ratio_strategy() {
2693        let actor_id_counter = AtomicU32::new(0);
2694        let fragment_id: FragmentId = 1.into();
2695        let job_id: JobId = 103.into();
2696        let database_id: DatabaseId = DatabaseId::new(13);
2697
2698        let fragment_model = build_fragment(
2699            fragment_id,
2700            job_id,
2701            0,
2702            DistributionType::Hash,
2703            16,
2704            StreamingParallelism::Adaptive,
2705        );
2706
2707        // Job has RATIO(0.5) strategy
2708        let job_model = streaming_job::Model {
2709            job_id,
2710            job_status: JobStatus::Created,
2711            create_type: CreateType::Foreground,
2712            timezone: None,
2713            config_override: None,
2714            adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
2715            parallelism: StreamingParallelism::Adaptive,
2716            backfill_parallelism: None,
2717            backfill_adaptive_parallelism_strategy: None,
2718            backfill_orders: None,
2719            max_parallelism: 16,
2720            specific_resource_group: None,
2721            is_serverless_backfill: false,
2722            refresh_interval_sec: None,
2723        };
2724
2725        let database_model = database::Model {
2726            database_id,
2727            name: "test_db".into(),
2728            resource_group: "default".into(),
2729            barrier_interval_ms: None,
2730            checkpoint_frequency: None,
2731        };
2732
2733        let ensembles = vec![NoShuffleEnsemble {
2734            entries: HashSet::from([fragment_id]),
2735            components: HashSet::from([fragment_id]),
2736        }];
2737
2738        let fragment_map =
2739            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2740        let job_map = HashMap::from([(job_id, job_model)]);
2741
2742        // 8 workers = total 8 parallelism
2743        let worker_map = HashMap::from([
2744            (1.into(), build_worker_node(1, 1, "default")),
2745            (2.into(), build_worker_node(2, 1, "default")),
2746            (3.into(), build_worker_node(3, 1, "default")),
2747            (4.into(), build_worker_node(4, 1, "default")),
2748            (5.into(), build_worker_node(5, 1, "default")),
2749            (6.into(), build_worker_node(6, 1, "default")),
2750            (7.into(), build_worker_node(7, 1, "default")),
2751            (8.into(), build_worker_node(8, 1, "default")),
2752        ]);
2753
2754        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2755        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2756        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2757        let database_map = HashMap::from([(database_id, database_model)]);
2758
2759        let context = RenderActorsContext {
2760            fragment_source_ids: &fragment_source_ids,
2761            fragment_splits: &fragment_splits,
2762            streaming_job_databases: &streaming_job_databases,
2763            database_map: &database_map,
2764        };
2765
2766        let result = render_actors(
2767            &actor_id_counter,
2768            &ensembles,
2769            &fragment_map,
2770            &job_map,
2771            &worker_map,
2772            context,
2773        )
2774        .expect("actor rendering succeeds");
2775
2776        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2777        // RATIO(0.5) of 8 = 4
2778        assert_eq!(
2779            state.len(),
2780            4,
2781            "RATIO(0.5) of 8 workers should give 4 actors"
2782        );
2783    }
2784
2785    #[test]
2786    fn render_actors_backfill_strategy_overrides_job_strategy() {
2787        let actor_id_counter = AtomicU32::new(0);
2788        let fragment_id: FragmentId = 1.into();
2789        let job_id: JobId = 104.into();
2790        let database_id: DatabaseId = DatabaseId::new(14);
2791
2792        let fragment_model = build_fragment(
2793            fragment_id,
2794            job_id,
2795            0,
2796            DistributionType::Hash,
2797            16,
2798            StreamingParallelism::Adaptive,
2799        );
2800
2801        let job_model = streaming_job::Model {
2802            job_id,
2803            job_status: JobStatus::Creating,
2804            create_type: CreateType::Background,
2805            timezone: None,
2806            config_override: None,
2807            adaptive_parallelism_strategy: Some("BOUNDED(4)".to_owned()),
2808            parallelism: StreamingParallelism::Adaptive,
2809            backfill_parallelism: Some(StreamingParallelism::Adaptive),
2810            backfill_adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2811            backfill_orders: None,
2812            max_parallelism: 16,
2813            specific_resource_group: None,
2814            is_serverless_backfill: false,
2815            refresh_interval_sec: None,
2816        };
2817
2818        let database_model = database::Model {
2819            database_id,
2820            name: "test_db".into(),
2821            resource_group: "default".into(),
2822            barrier_interval_ms: None,
2823            checkpoint_frequency: None,
2824        };
2825
2826        let ensembles = vec![NoShuffleEnsemble {
2827            entries: HashSet::from([fragment_id]),
2828            components: HashSet::from([fragment_id]),
2829        }];
2830
2831        let fragment_map =
2832            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2833        let job_map = HashMap::from([(job_id, job_model)]);
2834
2835        let worker_map = HashMap::from([
2836            (1.into(), build_worker_node(1, 1, "default")),
2837            (2.into(), build_worker_node(2, 1, "default")),
2838            (3.into(), build_worker_node(3, 1, "default")),
2839            (4.into(), build_worker_node(4, 1, "default")),
2840        ]);
2841
2842        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2843        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2844        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2845        let database_map = HashMap::from([(database_id, database_model)]);
2846        let context = RenderActorsContext {
2847            fragment_source_ids: &fragment_source_ids,
2848            fragment_splits: &fragment_splits,
2849            streaming_job_databases: &streaming_job_databases,
2850            database_map: &database_map,
2851        };
2852
2853        let result = render_actors(
2854            &actor_id_counter,
2855            &ensembles,
2856            &fragment_map,
2857            &job_map,
2858            &worker_map,
2859            context,
2860        )
2861        .expect("actor rendering succeeds");
2862
2863        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2864        assert_eq!(
2865            state.len(),
2866            2,
2867            "Backfill strategy BOUNDED(2) should override the job strategy during backfill"
2868        );
2869    }
2870
2871    #[test]
2872    fn render_actors_created_job_ignores_backfill_strategy() {
2873        let actor_id_counter = AtomicU32::new(0);
2874        let fragment_id: FragmentId = 1.into();
2875        let job_id: JobId = 105.into();
2876        let database_id: DatabaseId = DatabaseId::new(15);
2877
2878        let fragment_model = build_fragment(
2879            fragment_id,
2880            job_id,
2881            0,
2882            DistributionType::Hash,
2883            16,
2884            StreamingParallelism::Adaptive,
2885        );
2886
2887        let job_model = streaming_job::Model {
2888            job_id,
2889            job_status: JobStatus::Created,
2890            create_type: CreateType::Foreground,
2891            timezone: None,
2892            config_override: None,
2893            adaptive_parallelism_strategy: Some("BOUNDED(4)".to_owned()),
2894            parallelism: StreamingParallelism::Adaptive,
2895            backfill_parallelism: Some(StreamingParallelism::Adaptive),
2896            backfill_adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2897            backfill_orders: None,
2898            max_parallelism: 16,
2899            specific_resource_group: None,
2900            is_serverless_backfill: false,
2901            refresh_interval_sec: None,
2902        };
2903
2904        let database_model = database::Model {
2905            database_id,
2906            name: "test_db".into(),
2907            resource_group: "default".into(),
2908            barrier_interval_ms: None,
2909            checkpoint_frequency: None,
2910        };
2911
2912        let ensembles = vec![NoShuffleEnsemble {
2913            entries: HashSet::from([fragment_id]),
2914            components: HashSet::from([fragment_id]),
2915        }];
2916
2917        let fragment_map =
2918            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2919        let job_map = HashMap::from([(job_id, job_model)]);
2920
2921        let worker_map = HashMap::from([
2922            (1.into(), build_worker_node(1, 1, "default")),
2923            (2.into(), build_worker_node(2, 1, "default")),
2924            (3.into(), build_worker_node(3, 1, "default")),
2925            (4.into(), build_worker_node(4, 1, "default")),
2926        ]);
2927
2928        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2929        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2930        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2931        let database_map = HashMap::from([(database_id, database_model)]);
2932        let context = RenderActorsContext {
2933            fragment_source_ids: &fragment_source_ids,
2934            fragment_splits: &fragment_splits,
2935            streaming_job_databases: &streaming_job_databases,
2936            database_map: &database_map,
2937        };
2938
2939        let result = render_actors(
2940            &actor_id_counter,
2941            &ensembles,
2942            &fragment_map,
2943            &job_map,
2944            &worker_map,
2945            context,
2946        )
2947        .expect("actor rendering succeeds");
2948
2949        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2950        assert_eq!(
2951            state.len(),
2952            4,
2953            "created jobs should use the main job strategy before any backfill override applies"
2954        );
2955    }
2956}