risingwave_meta/controller/
scale.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
23#[cfg(debug_assertions)]
24use risingwave_common::hash::VnodeBitmapExt;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_meta_model::prelude::{
32    Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
33};
34use risingwave_meta_model::{
35    DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism, WorkerId,
36    database, fragment, fragment_relation, fragment_splits, object, sink, source, streaming_job,
37    table,
38};
39use risingwave_meta_model_migration::Condition;
40use risingwave_pb::common::WorkerNode;
41use risingwave_pb::stream_plan::PbStreamNode;
42use sea_orm::{
43    ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
44    RelationTrait,
45};
46
47use crate::MetaResult;
48use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
49use crate::controller::utils::resolve_no_shuffle_actor_mapping;
50use crate::manager::ActiveStreamingWorkerNodes;
51use crate::model::{ActorId, StreamActor};
52use crate::stream::{AssignerBuilder, SplitDiffOptions};
53
54pub(crate) async fn resolve_streaming_job_definition<C>(
55    txn: &C,
56    job_ids: &HashSet<JobId>,
57) -> MetaResult<HashMap<JobId, String>>
58where
59    C: ConnectionTrait,
60{
61    let job_ids = job_ids.iter().cloned().collect_vec();
62
63    // including table, materialized view, index
64    let common_job_definitions: Vec<(JobId, String)> = Table::find()
65        .select_only()
66        .columns([
67            table::Column::TableId,
68            #[cfg(not(debug_assertions))]
69            table::Column::Name,
70            #[cfg(debug_assertions)]
71            table::Column::Definition,
72        ])
73        .filter(table::Column::TableId.is_in(job_ids.clone()))
74        .into_tuple()
75        .all(txn)
76        .await?;
77
78    let sink_definitions: Vec<(JobId, String)> = Sink::find()
79        .select_only()
80        .columns([
81            sink::Column::SinkId,
82            #[cfg(not(debug_assertions))]
83            sink::Column::Name,
84            #[cfg(debug_assertions)]
85            sink::Column::Definition,
86        ])
87        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
88        .into_tuple()
89        .all(txn)
90        .await?;
91
92    let source_definitions: Vec<(JobId, String)> = Source::find()
93        .select_only()
94        .columns([
95            source::Column::SourceId,
96            #[cfg(not(debug_assertions))]
97            source::Column::Name,
98            #[cfg(debug_assertions)]
99            source::Column::Definition,
100        ])
101        .filter(source::Column::SourceId.is_in(job_ids.clone()))
102        .into_tuple()
103        .all(txn)
104        .await?;
105
106    let definitions: HashMap<JobId, String> = common_job_definitions
107        .into_iter()
108        .chain(sink_definitions.into_iter())
109        .chain(source_definitions.into_iter())
110        .collect();
111
112    Ok(definitions)
113}
114
115pub async fn load_fragment_info<C>(
116    txn: &C,
117    actor_id_counter: &AtomicU32,
118    database_id: Option<DatabaseId>,
119    worker_nodes: &ActiveStreamingWorkerNodes,
120) -> MetaResult<FragmentRenderMap>
121where
122    C: ConnectionTrait,
123{
124    let mut query = StreamingJob::find()
125        .select_only()
126        .column(streaming_job::Column::JobId);
127
128    if let Some(database_id) = database_id {
129        query = query
130            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
131            .filter(object::Column::DatabaseId.eq(database_id));
132    }
133
134    let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
135
136    if jobs.is_empty() {
137        return Ok(HashMap::new());
138    }
139
140    let jobs: HashSet<JobId> = jobs.into_iter().collect();
141
142    let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
143
144    if loaded.is_empty() {
145        return Ok(HashMap::new());
146    }
147
148    let RenderedGraph { fragments, .. } =
149        render_actor_assignments(actor_id_counter, worker_nodes.current(), &loaded)?;
150
151    Ok(fragments)
152}
153
154#[derive(Debug)]
155pub struct TargetResourcePolicy {
156    pub resource_group: Option<String>,
157    pub parallelism: StreamingParallelism,
158}
159
/// Summary of a worker: its (non-zero) parallelism and optional resource group.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Parallelism of the worker; the type guarantees it is never zero.
    pub parallelism: NonZeroUsize,
    /// Resource group name, if one is specified.
    pub resource_group: Option<String>,
}
165
166pub type FragmentRenderMap =
167    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;
168
169#[derive(Default)]
170pub struct RenderedGraph {
171    pub fragments: FragmentRenderMap,
172    pub ensembles: Vec<NoShuffleEnsemble>,
173}
174
175impl RenderedGraph {
176    pub fn empty() -> Self {
177        Self::default()
178    }
179}
180
181/// Context loaded asynchronously from database, containing all metadata
182/// required to render actor assignments. This separates async I/O from
183/// sync rendering logic.
184#[derive(Clone, Debug)]
185pub struct LoadedFragment {
186    pub fragment_id: FragmentId,
187    pub job_id: JobId,
188    pub fragment_type_mask: FragmentTypeMask,
189    pub distribution_type: DistributionType,
190    pub vnode_count: usize,
191    pub nodes: PbStreamNode,
192    pub state_table_ids: HashSet<TableId>,
193    pub parallelism: Option<StreamingParallelism>,
194}
195
196impl From<fragment::Model> for LoadedFragment {
197    fn from(model: fragment::Model) -> Self {
198        Self {
199            fragment_id: model.fragment_id,
200            job_id: model.job_id,
201            fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
202            distribution_type: model.distribution_type,
203            vnode_count: model.vnode_count as usize,
204            nodes: model.stream_node.to_protobuf(),
205            state_table_ids: model.state_table_ids.into_inner().into_iter().collect(),
206            parallelism: model.parallelism,
207        }
208    }
209}
210
211#[derive(Default, Debug, Clone)]
212pub struct LoadedFragmentContext {
213    pub ensembles: Vec<NoShuffleEnsemble>,
214    pub job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
215    pub job_map: HashMap<JobId, streaming_job::Model>,
216    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
217    pub database_map: HashMap<DatabaseId, database::Model>,
218    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
219    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
220}
221
222impl LoadedFragmentContext {
223    pub fn is_empty(&self) -> bool {
224        if self.ensembles.is_empty() {
225            assert!(
226                self.job_fragments.is_empty(),
227                "non-empty job fragments for empty ensembles: {:?}",
228                self.job_fragments
229            );
230            true
231        } else {
232            false
233        }
234    }
235
236    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
237        let job_ids: HashSet<JobId> = self
238            .streaming_job_databases
239            .iter()
240            .filter_map(|(job_id, db_id)| (*db_id == database_id).then_some(*job_id))
241            .collect();
242
243        if job_ids.is_empty() {
244            return None;
245        }
246
247        let job_fragments: HashMap<_, _> = job_ids
248            .iter()
249            .map(|job_id| (*job_id, self.job_fragments[job_id].clone()))
250            .collect();
251
252        let fragment_ids: HashSet<_> = job_fragments
253            .values()
254            .flat_map(|fragments| fragments.keys().copied())
255            .collect();
256
257        assert!(
258            !fragment_ids.is_empty(),
259            "empty fragments for non-empty database {database_id} with jobs {job_ids:?}"
260        );
261
262        let ensembles: Vec<NoShuffleEnsemble> = self
263            .ensembles
264            .iter()
265            .filter(|ensemble| {
266                if ensemble
267                    .components
268                    .iter()
269                    .any(|fragment_id| fragment_ids.contains(fragment_id))
270                {
271                    assert!(
272                        ensemble
273                            .components
274                            .iter()
275                            .all(|fragment_id| fragment_ids.contains(fragment_id)),
276                        "ensemble {ensemble:?} partially exists in database {database_id} with fragments {job_fragments:?}"
277                    );
278                    true
279                } else {
280                    false
281                }
282            })
283            .cloned()
284            .collect();
285
286        assert!(
287            !ensembles.is_empty(),
288            "empty ensembles for non-empty database {database_id} with jobs {job_fragments:?}"
289        );
290
291        let job_map = job_ids
292            .iter()
293            .filter_map(|job_id| self.job_map.get(job_id).map(|job| (*job_id, job.clone())))
294            .collect();
295
296        let streaming_job_databases = job_ids
297            .iter()
298            .filter_map(|job_id| {
299                self.streaming_job_databases
300                    .get(job_id)
301                    .map(|db_id| (*job_id, *db_id))
302            })
303            .collect();
304
305        let database_model = self.database_map[&database_id].clone();
306        let database_map = HashMap::from([(database_id, database_model)]);
307
308        let fragment_source_ids = self
309            .fragment_source_ids
310            .iter()
311            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
312            .map(|(fragment_id, source_id)| (*fragment_id, *source_id))
313            .collect();
314
315        let fragment_splits = self
316            .fragment_splits
317            .iter()
318            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
319            .map(|(fragment_id, splits)| (*fragment_id, splits.clone()))
320            .collect();
321
322        Some(Self {
323            ensembles,
324            job_fragments,
325            job_map,
326            streaming_job_databases,
327            database_map,
328            fragment_source_ids,
329            fragment_splits,
330        })
331    }
332
333    /// Split this loaded context by database in a single ownership pass to avoid cloning large
334    /// fragment payloads (for example `stream_node` in `LoadedFragment`).
335    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
336        let Self {
337            ensembles,
338            mut job_fragments,
339            mut job_map,
340            streaming_job_databases,
341            mut database_map,
342            mut fragment_source_ids,
343            mut fragment_splits,
344        } = self;
345
346        let mut contexts = HashMap::<DatabaseId, Self>::new();
347        let mut fragment_databases = HashMap::<FragmentId, DatabaseId>::new();
348        let mut unresolved_ensembles = 0usize;
349        let mut unresolved_ensemble_sample: Option<Vec<FragmentId>> = None;
350
351        for (job_id, database_id) in streaming_job_databases {
352            let context = contexts.entry(database_id).or_insert_with(|| {
353                let database_model = database_map
354                    .remove(&database_id)
355                    .expect("database should exist for streaming job");
356                Self {
357                    ensembles: Vec::new(),
358                    job_fragments: HashMap::new(),
359                    job_map: HashMap::new(),
360                    streaming_job_databases: HashMap::new(),
361                    database_map: HashMap::from([(database_id, database_model)]),
362                    fragment_source_ids: HashMap::new(),
363                    fragment_splits: HashMap::new(),
364                }
365            });
366
367            let fragments = job_fragments
368                .remove(&job_id)
369                .expect("job fragments should exist for streaming job");
370            for fragment_id in fragments.keys().copied() {
371                fragment_databases.insert(fragment_id, database_id);
372                if let Some(source_id) = fragment_source_ids.remove(&fragment_id) {
373                    context.fragment_source_ids.insert(fragment_id, source_id);
374                }
375                if let Some(splits) = fragment_splits.remove(&fragment_id) {
376                    context.fragment_splits.insert(fragment_id, splits);
377                }
378            }
379
380            assert!(
381                context
382                    .job_map
383                    .insert(
384                        job_id,
385                        job_map
386                            .remove(&job_id)
387                            .expect("streaming job should exist for loaded context"),
388                    )
389                    .is_none(),
390                "duplicated streaming job"
391            );
392            assert!(
393                context.job_fragments.insert(job_id, fragments).is_none(),
394                "duplicated job fragments"
395            );
396            assert!(
397                context
398                    .streaming_job_databases
399                    .insert(job_id, database_id)
400                    .is_none(),
401                "duplicated job database mapping"
402            );
403        }
404
405        for ensemble in ensembles {
406            let Some(database_id) = ensemble
407                .components
408                .iter()
409                .find_map(|fragment_id| fragment_databases.get(fragment_id).copied())
410            else {
411                unresolved_ensembles += 1;
412                if unresolved_ensemble_sample.is_none() {
413                    unresolved_ensemble_sample =
414                        Some(ensemble.components.iter().copied().collect());
415                }
416                continue;
417            };
418
419            debug_assert!(
420                ensemble
421                    .components
422                    .iter()
423                    .all(|fragment_id| fragment_databases.get(fragment_id) == Some(&database_id)),
424                "ensemble {ensemble:?} should belong to a single database"
425            );
426
427            contexts
428                .get_mut(&database_id)
429                .expect("database context should exist for ensemble")
430                .ensembles
431                .push(ensemble);
432        }
433
434        if unresolved_ensembles > 0 {
435            tracing::warn!(
436                unresolved_ensembles,
437                ?unresolved_ensemble_sample,
438                known_fragments = fragment_databases.len(),
439                "skip ensembles without resolved database while splitting loaded context"
440            );
441        }
442        debug_assert_eq!(
443            unresolved_ensembles, 0,
444            "all ensembles should be mappable to a database"
445        );
446
447        contexts
448    }
449}
450
451/// Async load stage for fragment-scoped rendering. It resolves all metadata required to later
452/// render actor assignments with arbitrary worker sets.
453pub async fn load_fragment_context<C>(
454    txn: &C,
455    ensembles: Vec<NoShuffleEnsemble>,
456) -> MetaResult<LoadedFragmentContext>
457where
458    C: ConnectionTrait,
459{
460    if ensembles.is_empty() {
461        return Ok(LoadedFragmentContext::default());
462    }
463
464    let required_fragment_ids: HashSet<_> = ensembles
465        .iter()
466        .flat_map(|ensemble| ensemble.components.iter().copied())
467        .collect();
468
469    let fragment_models = Fragment::find()
470        .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
471        .all(txn)
472        .await?;
473
474    let found_fragment_ids: HashSet<_> = fragment_models
475        .iter()
476        .map(|fragment| fragment.fragment_id)
477        .collect();
478
479    if found_fragment_ids.len() != required_fragment_ids.len() {
480        let missing = required_fragment_ids
481            .difference(&found_fragment_ids)
482            .copied()
483            .collect_vec();
484        return Err(anyhow!("fragments {:?} not found", missing).into());
485    }
486
487    let fragment_models: HashMap<_, _> = fragment_models
488        .into_iter()
489        .map(|fragment| (fragment.fragment_id, fragment))
490        .collect();
491
492    let job_ids: HashSet<_> = fragment_models
493        .values()
494        .map(|fragment| fragment.job_id)
495        .collect();
496
497    if job_ids.is_empty() {
498        return Ok(LoadedFragmentContext::default());
499    }
500
501    let jobs: HashMap<_, _> = StreamingJob::find()
502        .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
503        .all(txn)
504        .await?
505        .into_iter()
506        .map(|job| (job.job_id, job))
507        .collect();
508
509    let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
510    if found_job_ids.len() != job_ids.len() {
511        let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
512        return Err(anyhow!("streaming jobs {:?} not found", missing).into());
513    }
514
515    build_loaded_context(txn, ensembles, fragment_models, jobs).await
516}
517
518/// Async load stage for job-scoped rendering. It collects all no-shuffle ensembles and the
519/// metadata required to render actor assignments later with a provided worker set.
520pub async fn load_fragment_context_for_jobs<C>(
521    txn: &C,
522    job_ids: HashSet<JobId>,
523) -> MetaResult<LoadedFragmentContext>
524where
525    C: ConnectionTrait,
526{
527    if job_ids.is_empty() {
528        return Ok(LoadedFragmentContext::default());
529    }
530
531    let excluded_fragments_query = FragmentRelation::find()
532        .select_only()
533        .column(fragment_relation::Column::TargetFragmentId)
534        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
535        .into_query();
536
537    let condition = Condition::all()
538        .add(fragment::Column::JobId.is_in(job_ids.clone()))
539        .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));
540
541    let fragments: Vec<FragmentId> = Fragment::find()
542        .select_only()
543        .column(fragment::Column::FragmentId)
544        .filter(condition)
545        .into_tuple()
546        .all(txn)
547        .await?;
548
549    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;
550
551    let fragments = Fragment::find()
552        .filter(
553            fragment::Column::FragmentId.is_in(
554                ensembles
555                    .iter()
556                    .flat_map(|graph| graph.components.iter())
557                    .cloned()
558                    .collect_vec(),
559            ),
560        )
561        .all(txn)
562        .await?;
563
564    let fragment_map: HashMap<_, _> = fragments
565        .into_iter()
566        .map(|fragment| (fragment.fragment_id, fragment))
567        .collect();
568
569    let job_ids = fragment_map
570        .values()
571        .map(|fragment| fragment.job_id)
572        .collect::<BTreeSet<_>>()
573        .into_iter()
574        .collect_vec();
575
576    let jobs: HashMap<_, _> = StreamingJob::find()
577        .filter(streaming_job::Column::JobId.is_in(job_ids))
578        .all(txn)
579        .await?
580        .into_iter()
581        .map(|job| (job.job_id, job))
582        .collect();
583
584    build_loaded_context(txn, ensembles, fragment_map, jobs).await
585}
586
587/// Sync render stage: uses loaded fragment context and current worker info
588/// to produce actor-to-worker assignments and vnode bitmaps.
589pub(crate) fn render_actor_assignments(
590    actor_id_counter: &AtomicU32,
591    worker_map: &HashMap<WorkerId, WorkerNode>,
592    loaded: &LoadedFragmentContext,
593) -> MetaResult<RenderedGraph> {
594    let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
595    render_actor_assignments_with_allocator(&mut actor_id_allocator, worker_map, loaded)
596}
597
598/// Render a graph with preview-only actor ids so callers can compare layouts
599/// without consuming the global actor id generator.
600pub(crate) fn preview_actor_assignments(
601    worker_map: &HashMap<WorkerId, WorkerNode>,
602    loaded: &LoadedFragmentContext,
603) -> MetaResult<RenderedGraph> {
604    let mut actor_id_allocator = RenderActorIdAllocator::Preview { next_actor_id: 0 };
605    render_actor_assignments_with_allocator(&mut actor_id_allocator, worker_map, loaded)
606}
607
608/// Replace preview actor ids with real actor ids after the caller has decided
609/// the rendered layout is not a no-op.
610pub(crate) fn materialize_actor_assignments(
611    actor_id_counter: &AtomicU32,
612    rendered: RenderedGraph,
613) -> RenderedGraph {
614    let RenderedGraph {
615        fragments,
616        ensembles,
617    } = rendered;
618    #[cfg(debug_assertions)]
619    let preview_slots = collect_relative_actor_slots_by_fragment(&fragments);
620
621    let mut ordered_fragments = fragments
622        .into_iter()
623        .flat_map(|(database_id, jobs)| {
624            jobs.into_iter().flat_map(move |(job_id, fragments)| {
625                fragments.into_iter().map(move |(fragment_id, fragment)| {
626                    (database_id, job_id, fragment_id, fragment)
627                })
628            })
629        })
630        .collect_vec();
631    // Preserve preview order when remapping ids so fragments from the same
632    // no-shuffle ensemble keep their relative slot alignment.
633    //
634    // This works because the preview allocator assigns actor ids with a single
635    // monotonically-increasing counter, so `min(actor_id)` per fragment faithfully
636    // reproduces the original allocation order. If the preview allocator ever
637    // changes to a non-monotonic scheme this sort key must be revisited.
638    //
639    // Empty fragments should not appear, but fragment_id keeps the ordering
640    // deterministic if they do.
641    ordered_fragments.sort_by_key(|(_, _, fragment_id, fragment)| {
642        (
643            fragment
644                .actors
645                .keys()
646                .copied()
647                .min()
648                .map(|actor_id| actor_id.as_raw_id())
649                .unwrap_or(u32::MAX),
650            *fragment_id,
651        )
652    });
653
654    let mut materialized = FragmentRenderMap::new();
655    for (database_id, job_id, fragment_id, fragment) in ordered_fragments {
656        materialized
657            .entry(database_id)
658            .or_default()
659            .entry(job_id)
660            .or_default()
661            .insert(
662                fragment_id,
663                materialize_fragment(fragment, actor_id_counter),
664            );
665    }
666
667    #[cfg(debug_assertions)]
668    assert_materialization_preserves_preview_slots(&preview_slots, &materialized);
669
670    RenderedGraph {
671        fragments: materialized,
672        ensembles,
673    }
674}
675
676#[cfg(debug_assertions)]
677type RelativeActorSlot = (u32, WorkerId, Option<Vec<usize>>, Vec<SplitId>);
678
679#[cfg(debug_assertions)]
680fn collect_relative_actor_slots(fragment: &InflightFragmentInfo) -> Vec<RelativeActorSlot> {
681    let base = fragment
682        .actors
683        .keys()
684        .copied()
685        .min()
686        .map(|actor_id| actor_id.as_raw_id())
687        .unwrap_or_default();
688
689    let mut entries = fragment
690        .actors
691        .iter()
692        .map(|(&actor_id, info)| {
693            let idx = actor_id.as_raw_id() - base;
694            let vnode_bitmap = info.vnode_bitmap.as_ref().map(|bitmap| {
695                bitmap
696                    .iter_vnodes()
697                    .map(|vnode| vnode.to_index())
698                    .collect_vec()
699            });
700            let mut splits = info.splits.iter().map(SplitMetaData::id).collect_vec();
701            splits.sort_unstable();
702            (idx, info.worker_id, vnode_bitmap, splits)
703        })
704        .collect_vec();
705    entries.sort_unstable_by_key(|(idx, ..)| *idx);
706    entries
707}
708
709#[cfg(debug_assertions)]
710fn collect_relative_actor_slots_by_fragment(
711    fragments: &FragmentRenderMap,
712) -> HashMap<FragmentId, Vec<RelativeActorSlot>> {
713    fragments
714        .values()
715        .flat_map(|jobs| jobs.values())
716        .flatten()
717        .map(|(fragment_id, fragment)| (*fragment_id, collect_relative_actor_slots(fragment)))
718        .collect()
719}
720
721#[cfg(debug_assertions)]
722fn assert_materialization_preserves_preview_slots(
723    preview_slots: &HashMap<FragmentId, Vec<RelativeActorSlot>>,
724    materialized_fragments: &FragmentRenderMap,
725) {
726    for (database_id, jobs) in materialized_fragments {
727        for (job_id, fragments) in jobs {
728            for (fragment_id, fragment) in fragments {
729                let expected_slots = preview_slots.get(fragment_id).unwrap_or_else(|| {
730                    panic!(
731                        "preview fragment {fragment_id} for database {database_id}, job {job_id} not found"
732                    )
733                });
734
735                assert_eq!(
736                    collect_relative_actor_slots(fragment),
737                    *expected_slots,
738                    "materialization changed preview slot ordering for database {database_id}, job {job_id}, fragment {fragment_id}"
739                );
740            }
741        }
742    }
743}
744
745fn render_actor_assignments_with_allocator(
746    actor_id_allocator: &mut RenderActorIdAllocator<'_>,
747    worker_map: &HashMap<WorkerId, WorkerNode>,
748    loaded: &LoadedFragmentContext,
749) -> MetaResult<RenderedGraph> {
750    if loaded.is_empty() {
751        return Ok(RenderedGraph::empty());
752    }
753
754    let render_context = RenderActorsContext {
755        fragment_source_ids: &loaded.fragment_source_ids,
756        fragment_splits: &loaded.fragment_splits,
757        streaming_job_databases: &loaded.streaming_job_databases,
758        database_map: &loaded.database_map,
759    };
760
761    let fragments = render_actors_with_allocator(
762        actor_id_allocator,
763        &loaded.ensembles,
764        &loaded.job_fragments,
765        &loaded.job_map,
766        worker_map,
767        render_context,
768    )?;
769
770    Ok(RenderedGraph {
771        fragments,
772        ensembles: loaded.ensembles.clone(),
773    })
774}
775
776fn materialize_fragment(
777    mut fragment: InflightFragmentInfo,
778    actor_id_counter: &AtomicU32,
779) -> InflightFragmentInfo {
780    let actor_count = u32::try_from(fragment.actors.len()).expect("actor count exceeds u32::MAX");
781    let actor_id_base: ActorId = actor_id_counter
782        .fetch_add(actor_count, Ordering::Relaxed)
783        .into();
784
785    let mut actors = fragment.actors.into_iter().collect_vec();
786    actors.sort_by_key(|(actor_id, _)| *actor_id);
787
788    fragment.actors = actors
789        .into_iter()
790        .enumerate()
791        .map(|(idx, (_, info))| {
792            let actor_offset = u32::try_from(idx).expect("actor index exceeds u32::MAX");
793            (actor_id_base + actor_offset, info)
794        })
795        .collect();
796
797    fragment
798}
799
800async fn build_loaded_context<C>(
801    txn: &C,
802    ensembles: Vec<NoShuffleEnsemble>,
803    fragment_models: HashMap<FragmentId, fragment::Model>,
804    job_map: HashMap<JobId, streaming_job::Model>,
805) -> MetaResult<LoadedFragmentContext>
806where
807    C: ConnectionTrait,
808{
809    if ensembles.is_empty() {
810        return Ok(LoadedFragmentContext::default());
811    }
812
813    let mut job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>> = HashMap::new();
814    for (fragment_id, model) in fragment_models {
815        job_fragments
816            .entry(model.job_id)
817            .or_default()
818            .try_insert(fragment_id, LoadedFragment::from(model))
819            .expect("duplicate fragment id for job");
820    }
821
822    #[cfg(debug_assertions)]
823    {
824        debug_sanity_check(&ensembles, &job_fragments, &job_map);
825    }
826
827    let (fragment_source_ids, fragment_splits) =
828        resolve_source_fragments(txn, &job_fragments).await?;
829
830    let job_ids = job_map.keys().copied().collect_vec();
831
832    let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
833        .select_only()
834        .column(streaming_job::Column::JobId)
835        .column(object::Column::DatabaseId)
836        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
837        .filter(streaming_job::Column::JobId.is_in(job_ids))
838        .into_tuple()
839        .all(txn)
840        .await?
841        .into_iter()
842        .collect();
843
844    let database_map: HashMap<_, _> = Database::find()
845        .filter(
846            database::Column::DatabaseId
847                .is_in(streaming_job_databases.values().copied().collect_vec()),
848        )
849        .all(txn)
850        .await?
851        .into_iter()
852        .map(|db| (db.database_id, db))
853        .collect();
854
855    Ok(LoadedFragmentContext {
856        ensembles,
857        job_fragments,
858        job_map,
859        streaming_job_databases,
860        database_map,
861        fragment_source_ids,
862        fragment_splits,
863    })
864}
865
866// Only metadata resolved asynchronously lives here so the renderer stays synchronous
867// and the call site keeps the runtime dependencies (maps, actor counter, etc.) explicit.
868struct RenderActorsContext<'a> {
869    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
870    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
871    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
872    database_map: &'a HashMap<DatabaseId, database::Model>,
873}
874
875enum RenderActorIdAllocator<'a> {
876    Persistent(&'a AtomicU32),
877    Preview { next_actor_id: u32 },
878}
879
880impl RenderActorIdAllocator<'_> {
881    fn allocate_block(&mut self, actor_count: u32) -> ActorId {
882        match self {
883            Self::Persistent(actor_id_counter) => {
884                let actor_id_base: ActorId = actor_id_counter
885                    .fetch_add(actor_count, Ordering::Relaxed)
886                    .into();
887                actor_id_base
888            }
889            Self::Preview { next_actor_id } => {
890                let actor_id_base = *next_actor_id;
891                *next_actor_id = next_actor_id
892                    .checked_add(actor_count)
893                    .expect("preview actor id overflow");
894                let actor_id_base: ActorId = actor_id_base.into();
895                actor_id_base
896            }
897        }
898    }
899}
900
/// Test-only entry point: renders actors while drawing ids from `actor_id_counter`.
///
/// Wraps the counter in a [`RenderActorIdAllocator::Persistent`] and forwards every other
/// argument unchanged to [`render_actors_with_allocator`].
#[cfg(test)]
fn render_actors(
    actor_id_counter: &AtomicU32,
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    let mut persistent_ids = RenderActorIdAllocator::Persistent(actor_id_counter);
    let allocator = &mut persistent_ids;

    render_actors_with_allocator(
        allocator,
        ensembles,
        job_fragments,
        job_map,
        worker_map,
        context,
    )
}
920
921fn render_actors_with_allocator(
922    actor_id_allocator: &mut RenderActorIdAllocator<'_>,
923    ensembles: &[NoShuffleEnsemble],
924    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
925    job_map: &HashMap<JobId, streaming_job::Model>,
926    worker_map: &HashMap<WorkerId, WorkerNode>,
927    context: RenderActorsContext<'_>,
928) -> MetaResult<FragmentRenderMap> {
929    let RenderActorsContext {
930        fragment_source_ids,
931        fragment_splits: fragment_splits_map,
932        streaming_job_databases,
933        database_map,
934    } = context;
935
936    let mut all_fragments: FragmentRenderMap = HashMap::new();
937    let fragment_lookup: HashMap<FragmentId, &LoadedFragment> = job_fragments
938        .values()
939        .flat_map(|fragments| fragments.iter())
940        .map(|(fragment_id, fragment)| (*fragment_id, fragment))
941        .collect();
942
943    for NoShuffleEnsemble {
944        entries,
945        components,
946    } in ensembles
947    {
948        tracing::debug!("rendering ensemble entries {:?}", entries);
949
950        let entry_fragments = entries
951            .iter()
952            .map(|fragment_id| fragment_lookup.get(fragment_id).unwrap())
953            .collect_vec();
954
955        let entry_fragment_parallelism = entry_fragments
956            .iter()
957            .map(|fragment| fragment.parallelism.clone())
958            .dedup()
959            .exactly_one()
960            .map_err(|_| {
961                anyhow!(
962                    "entry fragments {:?} have inconsistent parallelism settings",
963                    entries.iter().copied().collect_vec()
964                )
965            })?;
966
967        let (job_id, distribution_type, vnode_count) = entry_fragments
968            .iter()
969            .map(|f| (f.job_id, f.distribution_type, f.vnode_count))
970            .dedup()
971            .exactly_one()
972            .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;
973
974        let job = job_map
975            .get(&job_id)
976            .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;
977
978        let database_resource_group = streaming_job_databases
979            .get(&job_id)
980            .and_then(|database_id| database_map.get(database_id))
981            .unwrap()
982            .resource_group
983            .clone();
984
985        let source_entry_fragment = entry_fragments.iter().find(|f| {
986            let mask = f.fragment_type_mask;
987            if mask.contains(FragmentTypeFlag::Source) {
988                assert!(!mask.contains(FragmentTypeFlag::SourceScan))
989            }
990            mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
991        });
992
993        let actor_template = EnsembleActorTemplate::render_new(
994            job,
995            worker_map,
996            entry_fragment_parallelism,
997            database_resource_group,
998            distribution_type,
999            vnode_count,
1000        )?;
1001
1002        let source_splits = match source_entry_fragment {
1003            Some(entry_fragment) => {
1004                let source_id = fragment_source_ids
1005                    .get(&entry_fragment.fragment_id)
1006                    .ok_or_else(|| {
1007                        anyhow!(
1008                            "missing source id in source fragment {}",
1009                            entry_fragment.fragment_id
1010                        )
1011                    })?;
1012
1013                let entry_fragment_id = entry_fragment.fragment_id;
1014
1015                let splits = fragment_splits_map
1016                    .get(&entry_fragment_id)
1017                    .cloned()
1018                    .unwrap_or_default();
1019
1020                let splits: std::collections::BTreeMap<_, _> =
1021                    splits.into_iter().map(|s| (s.id(), s)).collect();
1022                let splits = actor_template.assign_splits(entry_fragment_id, splits);
1023                Some((splits, *source_id))
1024            }
1025            None => None,
1026        };
1027
1028        for component_fragment_id in components {
1029            let fragment = fragment_lookup.get(component_fragment_id).unwrap();
1030            let fragment_id = fragment.fragment_id;
1031            let job_id = fragment.job_id;
1032            let fragment_type_mask = fragment.fragment_type_mask;
1033            let distribution_type = fragment.distribution_type;
1034            let stream_node = &fragment.nodes;
1035            let state_table_ids = &fragment.state_table_ids;
1036            let vnode_count = fragment.vnode_count;
1037            let source_id = fragment_source_ids.get(&fragment_id).cloned();
1038
1039            let aligner = ComponentFragmentAligner::new(&actor_template, actor_id_allocator);
1040            let actors = aligner.align_component_actor(distribution_type);
1041            let mut splits = source_id
1042                .map(|source_id| {
1043                    let (fragment_splits, shared_source_id) = source_splits.as_ref().unwrap();
1044                    assert_eq!(*shared_source_id, source_id);
1045                    aligner.align_component_splits(fragment_splits)
1046                })
1047                .unwrap_or_default();
1048
1049            let actors: HashMap<ActorId, InflightActorInfo> = actors
1050                .into_iter()
1051                .map(|(actor_id, (worker_id, vnode_bitmap))| {
1052                    (
1053                        actor_id,
1054                        InflightActorInfo {
1055                            worker_id,
1056                            vnode_bitmap,
1057                            splits: splits.remove(&actor_id).unwrap_or_default(),
1058                        },
1059                    )
1060                })
1061                .collect();
1062
1063            let fragment = InflightFragmentInfo {
1064                fragment_id,
1065                distribution_type,
1066                fragment_type_mask,
1067                vnode_count,
1068                nodes: stream_node.clone(),
1069                actors,
1070                state_table_ids: state_table_ids.clone(),
1071            };
1072
1073            let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
1074                anyhow!("streaming job {job_id} not found in streaming_job_databases")
1075            })?;
1076
1077            all_fragments
1078                .entry(database_id)
1079                .or_default()
1080                .entry(job_id)
1081                .or_default()
1082                .insert(fragment_id, fragment);
1083        }
1084    }
1085
1086    Ok(all_fragments)
1087}
1088
/// Actor layout shared by the fragments of one no-shuffle ensemble.
///
/// Actors are identified by a template-local index rather than a real actor id; a
/// component fragment turns an index into an actor id by adding it to the base id of
/// the block it allocated.
pub(crate) struct EnsembleActorTemplate {
    /// Worker -> (actor index -> vnode bitmap). Templates built by `render_new` store
    /// `None` for `DistributionType::Single` and a bitmap for `DistributionType::Hash`.
    assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>>,
    /// Distribution type the template was built with.
    distribution_type: DistributionType,
    /// Number of actors in the template, i.e. how many ids a fragment must allocate.
    actor_count: u32,
}
1094
1095impl EnsembleActorTemplate {
1096    pub(crate) fn render_new(
1097        job: &streaming_job::Model,
1098        worker_map: &HashMap<WorkerId, WorkerNode>,
1099        entry_fragment_parallelism: Option<StreamingParallelism>,
1100        database_resource_group: String,
1101        distribution_type: DistributionType,
1102        vnode_count: usize,
1103    ) -> MetaResult<Self> {
1104        let job_id = job.job_id;
1105        let job_strategy = job
1106            .adaptive_parallelism_strategy
1107            .as_deref()
1108            .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
1109        let backfill_job_strategy = job
1110            .backfill_adaptive_parallelism_strategy
1111            .as_deref()
1112            .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
1113
1114        let resource_group = match &job.specific_resource_group {
1115            None => database_resource_group,
1116            Some(resource_group) => resource_group.clone(),
1117        };
1118
1119        let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
1120            .iter()
1121            .filter_map(|(worker_id, worker)| {
1122                if worker
1123                    .resource_group()
1124                    .as_deref()
1125                    .unwrap_or(DEFAULT_RESOURCE_GROUP)
1126                    == resource_group.as_str()
1127                {
1128                    Some((
1129                        *worker_id,
1130                        worker
1131                            .parallelism()
1132                            .expect("should have parallelism for compute node")
1133                            .try_into()
1134                            .expect("parallelism for compute node"),
1135                    ))
1136                } else {
1137                    None
1138                }
1139            })
1140            .collect();
1141
1142        let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();
1143
1144        let effective_job_parallelism = if job.job_status != JobStatus::Created {
1145            job.backfill_parallelism
1146                .as_ref()
1147                .unwrap_or(&job.parallelism)
1148        } else {
1149            &job.parallelism
1150        };
1151        let effective_job_strategy = if job.job_status != JobStatus::Created {
1152            backfill_job_strategy.or(job_strategy)
1153        } else {
1154            job_strategy
1155        };
1156
1157        let target_parallelism = match entry_fragment_parallelism
1158            .as_ref()
1159            .unwrap_or(effective_job_parallelism)
1160        {
1161            StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
1162                let effective_job_strategy = effective_job_strategy.unwrap_or_else(|| {
1163                    tracing::warn!(
1164                        job_id = %job_id,
1165                        ?effective_job_parallelism,
1166                        "adaptive/custom job is missing adaptive strategy in StreamContext; falling back to default"
1167                    );
1168                    AdaptiveParallelismStrategy::default()
1169                });
1170                effective_job_strategy.compute_target_parallelism(total_parallelism)
1171            }
1172            StreamingParallelism::Fixed(n) => *n,
1173        };
1174        let actual_parallelism = target_parallelism
1175            .min(vnode_count)
1176            .min(job.max_parallelism as usize);
1177        if actual_parallelism != target_parallelism {
1178            tracing::warn!(
1179                job_id = %job_id,
1180                target_parallelism,
1181                actual_parallelism,
1182                vnode_count,
1183                job_max_parallelism = job.max_parallelism,
1184                ?effective_job_parallelism,
1185                ?entry_fragment_parallelism,
1186                "streaming job parallelism was capped by vnode count or max parallelism"
1187            );
1188        }
1189
1190        tracing::debug!(
1191            "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
1192            job_id,
1193            actual_parallelism,
1194            job.parallelism,
1195            total_parallelism,
1196            job.max_parallelism,
1197            vnode_count,
1198            entry_fragment_parallelism
1199        );
1200
1201        let assigner = AssignerBuilder::new(job_id).build();
1202
1203        let actors = (0..(actual_parallelism as u32)).collect_vec();
1204        let vnodes = (0..vnode_count).collect_vec();
1205
1206        let raw_assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;
1207
1208        let assignment = raw_assignment
1209            .into_iter()
1210            .map(|(worker_id, actors)| {
1211                let actors = actors
1212                    .into_iter()
1213                    .map(|(actor_idx, vnodes)| {
1214                        let bitmap = match distribution_type {
1215                            DistributionType::Single => None,
1216                            DistributionType::Hash => {
1217                                Some(Bitmap::from_indices(vnode_count, &vnodes))
1218                            }
1219                        };
1220                        (actor_idx, bitmap)
1221                    })
1222                    .collect();
1223                (worker_id, actors)
1224            })
1225            .collect();
1226
1227        let actor_count = u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
1228
1229        Ok(Self {
1230            assignment,
1231            distribution_type,
1232            actor_count,
1233        })
1234    }
1235
1236    pub(crate) fn from_existing_inflight_fragment(fragment: &InflightFragmentInfo) -> Self {
1237        if fragment.actors.is_empty() {
1238            return Self {
1239                assignment: BTreeMap::new(),
1240                distribution_type: fragment.distribution_type,
1241                actor_count: 0,
1242            };
1243        }
1244
1245        let actor_count = fragment.actors.len() as u32;
1246
1247        let mut assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>> = BTreeMap::new();
1248        // Enumerate actors starting from 0 index, instead of deriving from actor_id.
1249        for (actor_idx, (&_actor_id, actor_info)) in fragment.actors.iter().enumerate() {
1250            let actor_idx = actor_idx as u32;
1251            assignment
1252                .entry(actor_info.worker_id)
1253                .or_default()
1254                .insert(actor_idx, actor_info.vnode_bitmap.clone());
1255        }
1256
1257        Self {
1258            assignment,
1259            distribution_type: fragment.distribution_type,
1260            actor_count,
1261        }
1262    }
1263
1264    /// Assert that two `EnsembleActorTemplate` are aligned: same distribution type,
1265    /// same actor count, and same vnode bitmap / worker placement. Used to verify that
1266    /// multiple existing fragments within the same no-shuffle ensemble are consistent.
1267    ///
1268    /// Internally calls [`resolve_no_shuffle_actor_mapping`] which asserts distribution
1269    /// type, count, and bitmap equality. Then additionally verifies worker placement.
1270    pub(crate) fn assert_aligned_with(
1271        &self,
1272        other: &Self,
1273        self_fragment_id: FragmentId,
1274        other_fragment_id: FragmentId,
1275    ) {
1276        let mapping = resolve_no_shuffle_actor_mapping(
1277            self.distribution_type,
1278            self.assignment
1279                .iter()
1280                .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
1281            other.distribution_type,
1282            other
1283                .assignment
1284                .iter()
1285                .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
1286        );
1287
1288        for ((self_worker, _self_idx), (other_worker, _other_idx)) in &mapping {
1289            assert_eq!(
1290                self_worker, other_worker,
1291                "fragments {} and {} disagree on worker placement: {:?}",
1292                self_fragment_id, other_fragment_id, mapping,
1293            );
1294        }
1295    }
1296
1297    fn assign_splits(
1298        &self,
1299        entry_fragment_id: FragmentId,
1300        splits: BTreeMap<SplitId, SplitImpl>,
1301    ) -> HashMap<u32, Vec<SplitImpl>> {
1302        {
1303            {
1304                let empty_actor_splits: HashMap<_, _> = self
1305                    .assignment
1306                    .values()
1307                    .flat_map(|actors| actors.keys())
1308                    .map(|actor_id| (*actor_id, vec![]))
1309                    .collect();
1310
1311                crate::stream::source_manager::reassign_splits(
1312                    entry_fragment_id,
1313                    empty_actor_splits,
1314                    &splits,
1315                    SplitDiffOptions::default(),
1316                )
1317                .unwrap_or_default()
1318            }
1319        }
1320    }
1321}
1322
1323pub(crate) struct ComponentFragmentAligner<'a> {
1324    actor_template: &'a EnsembleActorTemplate,
1325    actor_id_base: ActorId,
1326}
1327
1328impl<'a> ComponentFragmentAligner<'a> {
1329    fn new(
1330        actor_template: &'a EnsembleActorTemplate,
1331        actor_id_allocator: &mut RenderActorIdAllocator<'_>,
1332    ) -> Self {
1333        let actor_id_base = actor_id_allocator.allocate_block(actor_template.actor_count);
1334        Self {
1335            actor_template,
1336            actor_id_base,
1337        }
1338    }
1339
1340    pub(crate) fn new_persistent(
1341        actor_template: &'a EnsembleActorTemplate,
1342        actor_id_counter: &AtomicU32,
1343    ) -> Self {
1344        let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
1345        Self::new(actor_template, &mut actor_id_allocator)
1346    }
1347
1348    pub(crate) fn align_component_actor(
1349        &self,
1350        distribution_type: DistributionType,
1351    ) -> HashMap<ActorId, (WorkerId, Option<Bitmap>)> {
1352        let EnsembleActorTemplate {
1353            assignment,
1354            actor_count,
1355            distribution_type: _,
1356        } = &self.actor_template;
1357        let actor_id_base = self.actor_id_base;
1358        {
1359            assignment
1360                .iter()
1361                .flat_map(|(worker_id, actors)| {
1362                    actors
1363                        .iter()
1364                        .map(move |(actor_idx, bitmap)| (worker_id, actor_idx, bitmap))
1365                })
1366                .map(|(&worker_id, &actor_idx, bitmap)| {
1367                    if distribution_type == DistributionType::Single {
1368                        assert_eq!(*actor_count, 1);
1369                    }
1370
1371                    let actor_id = actor_id_base + actor_idx;
1372
1373                    (actor_id, (worker_id, bitmap.clone()))
1374                })
1375                .collect()
1376        }
1377    }
1378
1379    pub(crate) fn align_component_splits(
1380        &self,
1381        split_assignment: &HashMap<u32, Vec<SplitImpl>>,
1382    ) -> HashMap<ActorId, Vec<SplitImpl>> {
1383        (0..self.actor_template.actor_count)
1384            .filter_map(|actor_idx| {
1385                split_assignment
1386                    .get(&actor_idx)
1387                    .map(|splits| ((self.actor_id_base + actor_idx), splits.clone()))
1388            })
1389            .collect()
1390    }
1391}
1392
1393#[cfg(debug_assertions)]
1394fn debug_sanity_check(
1395    ensembles: &[NoShuffleEnsemble],
1396    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1397    jobs: &HashMap<JobId, streaming_job::Model>,
1398) {
1399    let fragment_lookup: HashMap<FragmentId, (&LoadedFragment, JobId)> = job_fragments
1400        .iter()
1401        .flat_map(|(job_id, fragments)| {
1402            fragments
1403                .iter()
1404                .map(move |(fragment_id, fragment)| (*fragment_id, (fragment, *job_id)))
1405        })
1406        .collect();
1407
1408    // Debug-only assertions to catch inconsistent ensemble metadata early.
1409    debug_assert!(
1410        ensembles
1411            .iter()
1412            .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
1413        "entries must be subset of components"
1414    );
1415
1416    let mut missing_fragments = BTreeSet::new();
1417    let mut missing_jobs = BTreeSet::new();
1418
1419    for fragment_id in ensembles
1420        .iter()
1421        .flat_map(|ensemble| ensemble.components.iter())
1422    {
1423        match fragment_lookup.get(fragment_id) {
1424            Some((fragment, job_id)) => {
1425                if !jobs.contains_key(&fragment.job_id) {
1426                    missing_jobs.insert(*job_id);
1427                }
1428            }
1429            None => {
1430                missing_fragments.insert(*fragment_id);
1431            }
1432        }
1433    }
1434
1435    debug_assert!(
1436        missing_fragments.is_empty(),
1437        "missing fragments in fragment_map: {:?}",
1438        missing_fragments
1439    );
1440
1441    debug_assert!(
1442        missing_jobs.is_empty(),
1443        "missing jobs for fragments' job_id: {:?}",
1444        missing_jobs
1445    );
1446
1447    for ensemble in ensembles {
1448        let unique_vnode_counts: Vec<_> = ensemble
1449            .components
1450            .iter()
1451            .flat_map(|fragment_id| {
1452                fragment_lookup
1453                    .get(fragment_id)
1454                    .map(|(fragment, _)| fragment.vnode_count)
1455            })
1456            .unique()
1457            .collect();
1458
1459        debug_assert!(
1460            unique_vnode_counts.len() <= 1,
1461            "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
1462            ensemble.components,
1463            unique_vnode_counts
1464        );
1465    }
1466}
1467
1468async fn resolve_source_fragments<C>(
1469    txn: &C,
1470    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1471) -> MetaResult<(
1472    HashMap<FragmentId, SourceId>,
1473    HashMap<FragmentId, Vec<SplitImpl>>,
1474)>
1475where
1476    C: ConnectionTrait,
1477{
1478    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1479    for (fragment_id, fragment) in job_fragments.values().flatten() {
1480        let mask = fragment.fragment_type_mask;
1481        if mask.contains(FragmentTypeFlag::Source)
1482            && let Some(source_id) = fragment.nodes.find_stream_source()
1483        {
1484            source_fragment_ids
1485                .entry(source_id)
1486                .or_default()
1487                .insert(*fragment_id);
1488        }
1489
1490        if mask.contains(FragmentTypeFlag::SourceScan)
1491            && let Some((source_id, _)) = fragment.nodes.find_source_backfill()
1492        {
1493            source_fragment_ids
1494                .entry(source_id)
1495                .or_default()
1496                .insert(*fragment_id);
1497        }
1498    }
1499
1500    let fragment_source_ids: HashMap<_, _> = source_fragment_ids
1501        .iter()
1502        .flat_map(|(source_id, fragment_ids)| {
1503            fragment_ids
1504                .iter()
1505                .map(|fragment_id| (*fragment_id, *source_id))
1506        })
1507        .collect();
1508
1509    let fragment_ids = fragment_source_ids.keys().copied().collect_vec();
1510
1511    let fragment_splits: Vec<_> = FragmentSplits::find()
1512        .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
1513        .all(txn)
1514        .await?;
1515
1516    let fragment_splits: HashMap<_, _> = fragment_splits
1517        .into_iter()
1518        .flat_map(|model| {
1519            model.splits.map(|splits| {
1520                (
1521                    model.fragment_id,
1522                    splits
1523                        .to_protobuf()
1524                        .splits
1525                        .iter()
1526                        .flat_map(SplitImpl::try_from)
1527                        .collect_vec(),
1528                )
1529            })
1530        })
1531        .collect();
1532
1533    Ok((fragment_source_ids, fragment_splits))
1534}
1535
// Helper struct to make the function signature cleaner and to properly bundle the required data.
//
// Both fields are shared borrows, so the graph view does not own or clone the maps.
#[derive(Debug)]
pub struct ActorGraph<'a> {
    /// Fragments by id, each paired with its list of stream actors.
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    /// Worker recorded for each actor id — presumably the worker hosting the actor;
    /// confirm against the call sites.
    pub locations: &'a HashMap<ActorId, WorkerId>,
}
1542
1543#[derive(Debug, Clone)]
1544pub struct NoShuffleEnsemble {
1545    entries: HashSet<FragmentId>,
1546    components: HashSet<FragmentId>,
1547}
1548
1549impl NoShuffleEnsemble {
1550    /// Create a single-fragment ensemble (for standalone fragments with no `NoShuffle` edges).
1551    pub(crate) fn singleton(fragment_id: FragmentId) -> Self {
1552        Self {
1553            entries: HashSet::from_iter([fragment_id]),
1554            components: HashSet::from_iter([fragment_id]),
1555        }
1556    }
1557
1558    #[cfg(test)]
1559    pub fn for_test(
1560        entries: impl IntoIterator<Item = FragmentId>,
1561        components: impl IntoIterator<Item = FragmentId>,
1562    ) -> Self {
1563        let entries = entries.into_iter().collect();
1564        let components = components.into_iter().collect();
1565        Self {
1566            entries,
1567            components,
1568        }
1569    }
1570
1571    pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1572        self.components.iter().cloned()
1573    }
1574
1575    pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1576        self.entries.iter().copied()
1577    }
1578
1579    pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1580        self.components.iter().copied()
1581    }
1582
1583    pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
1584        self.entries.contains(fragment_id)
1585    }
1586}
1587
1588pub async fn find_fragment_no_shuffle_dags_detailed(
1589    db: &impl ConnectionTrait,
1590    initial_fragment_ids: &[FragmentId],
1591) -> MetaResult<Vec<NoShuffleEnsemble>> {
1592    let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
1593        .columns([
1594            fragment_relation::Column::SourceFragmentId,
1595            fragment_relation::Column::TargetFragmentId,
1596        ])
1597        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
1598        .into_tuple()
1599        .all(db)
1600        .await?;
1601
1602    let (forward_edges, backward_edges) =
1603        build_no_shuffle_fragment_graph_edges(all_no_shuffle_relations);
1604
1605    find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
1606}
1607
1608pub(crate) fn build_no_shuffle_fragment_graph_edges(
1609    relations: impl IntoIterator<Item = (FragmentId, FragmentId)>,
1610) -> (
1611    HashMap<FragmentId, Vec<FragmentId>>,
1612    HashMap<FragmentId, Vec<FragmentId>>,
1613) {
1614    let mut forward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1615    let mut backward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1616
1617    for (src, dst) in relations {
1618        forward_edges.entry(src).or_default().insert(dst);
1619        backward_edges.entry(dst).or_default().insert(src);
1620    }
1621
1622    let forward_edges = forward_edges
1623        .into_iter()
1624        .map(|(src, dst_set)| (src, dst_set.into_iter().collect()))
1625        .collect();
1626    let backward_edges = backward_edges
1627        .into_iter()
1628        .map(|(dst, src_set)| (dst, src_set.into_iter().collect()))
1629        .collect();
1630
1631    (forward_edges, backward_edges)
1632}
1633
1634pub(crate) fn find_no_shuffle_graphs(
1635    initial_fragment_ids: &[impl Into<FragmentId> + Copy],
1636    forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1637    backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1638) -> MetaResult<Vec<NoShuffleEnsemble>> {
1639    let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
1640    let mut globally_visited: HashSet<FragmentId> = HashSet::new();
1641
1642    for &init_id in initial_fragment_ids {
1643        let init_id = init_id.into();
1644        if globally_visited.contains(&init_id) {
1645            continue;
1646        }
1647
1648        // Found a new component. Traverse it to find all its nodes.
1649        let mut components = HashSet::new();
1650        let mut queue: VecDeque<FragmentId> = VecDeque::new();
1651
1652        queue.push_back(init_id);
1653        globally_visited.insert(init_id);
1654
1655        while let Some(current_id) = queue.pop_front() {
1656            components.insert(current_id);
1657            let neighbors = forward_edges
1658                .get(&current_id)
1659                .into_iter()
1660                .flatten()
1661                .chain(backward_edges.get(&current_id).into_iter().flatten());
1662
1663            for &neighbor_id in neighbors {
1664                if globally_visited.insert(neighbor_id) {
1665                    queue.push_back(neighbor_id);
1666                }
1667            }
1668        }
1669
1670        // For the newly found component, identify its roots.
1671        let mut entries = HashSet::new();
1672        for &node_id in &components {
1673            let is_root = match backward_edges.get(&node_id) {
1674                Some(parents) => parents.iter().all(|p| !components.contains(p)),
1675                None => true,
1676            };
1677            if is_root {
1678                entries.insert(node_id);
1679            }
1680        }
1681
1682        // Store the detailed DAG structure (roots, all nodes in this DAG).
1683        if !entries.is_empty() {
1684            graphs.push(NoShuffleEnsemble {
1685                entries,
1686                components,
1687            });
1688        }
1689    }
1690
1691    Ok(graphs)
1692}
1693
1694#[cfg(test)]
1695mod tests {
1696    use std::collections::{BTreeSet, HashMap, HashSet};
1697    use std::sync::Arc;
1698
1699    use risingwave_connector::source::SplitImpl;
1700    use risingwave_connector::source::test_source::TestSourceSplit;
1701    use risingwave_meta_model::{CreateType, JobStatus};
1702    use risingwave_pb::common::WorkerType;
1703    use risingwave_pb::common::worker_node::Property as WorkerProperty;
1704    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1705
1706    use super::*;
1707
1708    // Helper type aliases for cleaner test code
1709    // Using the actual FragmentId type from the module
1710    type Edges = (
1711        HashMap<FragmentId, Vec<FragmentId>>,
1712        HashMap<FragmentId, Vec<FragmentId>>,
1713    );
1714
1715    /// A helper function to build forward and backward edge maps from a simple list of tuples.
1716    /// This reduces boilerplate in each test.
1717    fn build_edges(relations: &[(u32, u32)]) -> Edges {
1718        let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1719        let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1720        for &(src, dst) in relations {
1721            forward_edges
1722                .entry(src.into())
1723                .or_default()
1724                .push(dst.into());
1725            backward_edges
1726                .entry(dst.into())
1727                .or_default()
1728                .push(src.into());
1729        }
1730        (forward_edges, backward_edges)
1731    }
1732
1733    /// Helper function to create a `HashSet` from a slice easily.
1734    fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1735        ids.iter().map(|id| (*id).into()).collect()
1736    }
1737
1738    fn build_fragment(
1739        fragment_id: FragmentId,
1740        job_id: JobId,
1741        fragment_type_mask: i32,
1742        distribution_type: DistributionType,
1743        vnode_count: i32,
1744        parallelism: StreamingParallelism,
1745    ) -> LoadedFragment {
1746        LoadedFragment {
1747            fragment_id,
1748            job_id,
1749            fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
1750            distribution_type,
1751            vnode_count: vnode_count as usize,
1752            nodes: PbStreamNode::default(),
1753            state_table_ids: HashSet::new(),
1754            parallelism: Some(parallelism),
1755        }
1756    }
1757
1758    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1759
1760    fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1761        let base = fragment.actors.keys().copied().min().unwrap_or_default();
1762
1763        let mut entries: Vec<_> = fragment
1764            .actors
1765            .iter()
1766            .map(|(&actor_id, info)| {
1767                let idx = actor_id.as_raw_id() - base.as_raw_id();
1768                let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1769                    bitmap
1770                        .iter()
1771                        .enumerate()
1772                        .filter_map(|(pos, is_set)| is_set.then_some(pos))
1773                        .collect::<Vec<_>>()
1774                });
1775                let splits = info
1776                    .splits
1777                    .iter()
1778                    .map(|split| split.id().to_string())
1779                    .collect::<Vec<_>>();
1780                (idx.into(), info.worker_id, vnode_indices, splits)
1781            })
1782            .collect();
1783
1784        entries.sort_by_key(|(idx, _, _, _)| *idx);
1785        entries
1786    }
1787
1788    fn build_worker_node(
1789        id: impl Into<WorkerId>,
1790        parallelism: usize,
1791        resource_group: &str,
1792    ) -> WorkerNode {
1793        WorkerNode {
1794            id: id.into(),
1795            r#type: WorkerType::ComputeNode as i32,
1796            property: Some(WorkerProperty {
1797                is_streaming: true,
1798                parallelism: u32::try_from(parallelism).expect("parallelism fits into u32"),
1799                resource_group: Some(resource_group.to_owned()),
1800                ..Default::default()
1801            }),
1802            ..Default::default()
1803        }
1804    }
1805
1806    #[test]
1807    fn test_single_linear_chain() {
1808        // Scenario: A simple linear graph 1 -> 2 -> 3.
1809        // We start from the middle node (2).
1810        let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1811        let initial_ids = &[2];
1812
1813        // Act
1814        let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1815
1816        // Assert
1817        assert!(result.is_ok());
1818        let graphs = result.unwrap();
1819
1820        assert_eq!(graphs.len(), 1);
1821        let graph = &graphs[0];
1822        assert_eq!(graph.entries, to_hashset(&[1]));
1823        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1824    }
1825
1826    #[test]
1827    fn test_two_disconnected_graphs() {
1828        // Scenario: Two separate graphs: 1->2 and 10->11.
1829        // We start with one node from each graph.
1830        let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1831        let initial_ids = &[2, 10];
1832
1833        // Act
1834        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1835
1836        // Assert
1837        assert_eq!(graphs.len(), 2);
1838
1839        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1840        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1841
1842        // Graph 1
1843        assert_eq!(graphs[0].entries, to_hashset(&[1]));
1844        assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1845
1846        // Graph 2
1847        assert_eq!(graphs[1].entries, to_hashset(&[10]));
1848        assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1849    }
1850
1851    #[test]
1852    fn test_multiple_entries_in_one_graph() {
1853        // Scenario: A graph with two roots feeding into one node: 1->3, 2->3.
1854        let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1855        let initial_ids = &[3];
1856
1857        // Act
1858        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1859
1860        // Assert
1861        assert_eq!(graphs.len(), 1);
1862        let graph = &graphs[0];
1863        assert_eq!(graph.entries, to_hashset(&[1, 2]));
1864        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1865    }
1866
1867    #[test]
1868    fn test_diamond_shape_graph() {
1869        // Scenario: A diamond shape: 1->2, 1->3, 2->4, 3->4
1870        let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1871        let initial_ids = &[4];
1872
1873        // Act
1874        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1875
1876        // Assert
1877        assert_eq!(graphs.len(), 1);
1878        let graph = &graphs[0];
1879        assert_eq!(graph.entries, to_hashset(&[1]));
1880        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1881    }
1882
1883    #[test]
1884    fn test_starting_with_multiple_nodes_in_same_graph() {
1885        // Scenario: Start with two different nodes (2 and 4) from the same component.
1886        // Should only identify one graph, not two.
1887        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1888        let initial_ids = &[2, 4];
1889
1890        // Act
1891        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1892
1893        // Assert
1894        assert_eq!(graphs.len(), 1);
1895        let graph = &graphs[0];
1896        assert_eq!(graph.entries, to_hashset(&[1]));
1897        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1898    }
1899
1900    #[test]
1901    fn test_empty_initial_ids() {
1902        // Scenario: The initial ID list is empty.
1903        let (forward, backward) = build_edges(&[(1, 2)]);
1904        let initial_ids: &[u32] = &[];
1905
1906        // Act
1907        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1908
1909        // Assert
1910        assert!(graphs.is_empty());
1911    }
1912
1913    #[test]
1914    fn test_isolated_node_as_input() {
1915        // Scenario: Start with an ID that has no relations.
1916        let (forward, backward) = build_edges(&[(1, 2)]);
1917        let initial_ids = &[100];
1918
1919        // Act
1920        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1921
1922        // Assert
1923        assert_eq!(graphs.len(), 1);
1924        let graph = &graphs[0];
1925        assert_eq!(graph.entries, to_hashset(&[100]));
1926        assert_eq!(graph.components, to_hashset(&[100]));
1927    }
1928
1929    #[test]
1930    fn test_graph_with_a_cycle() {
1931        // Scenario: A graph with a cycle: 1 -> 2 -> 3 -> 1.
1932        // The algorithm should correctly identify all nodes in the component.
1933        // Crucially, NO node is a root because every node has a parent *within the component*.
1934        // Therefore, the `entries` set should be empty, and the graph should not be included in the results.
1935        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1936        let initial_ids = &[2];
1937
1938        // Act
1939        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1940
1941        // Assert
1942        assert!(
1943            graphs.is_empty(),
1944            "A graph with no entries should not be returned"
1945        );
1946    }
1947    #[test]
1948    fn test_custom_complex() {
1949        let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1950        let initial_ids = &[1, 2, 4, 6];
1951
1952        // Act
1953        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1954
1955        // Assert
1956        assert_eq!(graphs.len(), 2);
1957        // Sort results to make the test deterministic, as HashMap iteration order is not guaranteed.
1958        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1959
1960        // Graph 1
1961        assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1962        assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1963
1964        // Graph 2
1965        assert_eq!(graphs[1].entries, to_hashset(&[6]));
1966        assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1967    }
1968
1969    #[test]
1970    fn render_actors_increments_actor_counter() {
1971        let actor_id_counter = AtomicU32::new(100);
1972        let fragment_id: FragmentId = 1.into();
1973        let job_id: JobId = 10.into();
1974        let database_id: DatabaseId = DatabaseId::new(3);
1975
1976        let fragment_model = build_fragment(
1977            fragment_id,
1978            job_id,
1979            0,
1980            DistributionType::Single,
1981            1,
1982            StreamingParallelism::Fixed(1),
1983        );
1984
1985        let job_model = streaming_job::Model {
1986            job_id,
1987            job_status: JobStatus::Created,
1988            create_type: CreateType::Foreground,
1989            timezone: None,
1990            config_override: None,
1991            adaptive_parallelism_strategy: None,
1992            parallelism: StreamingParallelism::Fixed(1),
1993            backfill_parallelism: None,
1994            backfill_adaptive_parallelism_strategy: None,
1995            backfill_orders: None,
1996            max_parallelism: 1,
1997            specific_resource_group: None,
1998            is_serverless_backfill: false,
1999            refresh_interval_sec: None,
2000        };
2001
2002        let database_model = database::Model {
2003            database_id,
2004            name: "test_db".into(),
2005            resource_group: "rg-a".into(),
2006            barrier_interval_ms: None,
2007            checkpoint_frequency: None,
2008        };
2009
2010        let ensembles = vec![NoShuffleEnsemble {
2011            entries: HashSet::from([fragment_id]),
2012            components: HashSet::from([fragment_id]),
2013        }];
2014
2015        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
2016        let job_fragments = HashMap::from([(job_id, fragment_map)]);
2017        let job_map = HashMap::from([(job_id, job_model)]);
2018
2019        let worker_map: HashMap<WorkerId, WorkerNode> =
2020            HashMap::from([(1.into(), build_worker_node(1, 1, "rg-a"))]);
2021
2022        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2023        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2024        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2025        let database_map = HashMap::from([(database_id, database_model)]);
2026
2027        let context = RenderActorsContext {
2028            fragment_source_ids: &fragment_source_ids,
2029            fragment_splits: &fragment_splits,
2030            streaming_job_databases: &streaming_job_databases,
2031            database_map: &database_map,
2032        };
2033
2034        let result = render_actors(
2035            &actor_id_counter,
2036            &ensembles,
2037            &job_fragments,
2038            &job_map,
2039            &worker_map,
2040            context,
2041        )
2042        .expect("actor rendering succeeds");
2043
2044        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2045        assert_eq!(state.len(), 1);
2046        assert!(
2047            state[0].2.is_none(),
2048            "single distribution should not assign vnode bitmaps"
2049        );
2050        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
2051    }
2052
2053    #[test]
2054    fn render_actors_aligns_hash_vnode_bitmaps() {
2055        let actor_id_counter = AtomicU32::new(0);
2056        let entry_fragment_id: FragmentId = 1.into();
2057        let downstream_fragment_id: FragmentId = 2.into();
2058        let job_id: JobId = 20.into();
2059        let database_id: DatabaseId = DatabaseId::new(5);
2060
2061        let entry_fragment = build_fragment(
2062            entry_fragment_id,
2063            job_id,
2064            0,
2065            DistributionType::Hash,
2066            4,
2067            StreamingParallelism::Fixed(2),
2068        );
2069
2070        let downstream_fragment = build_fragment(
2071            downstream_fragment_id,
2072            job_id,
2073            0,
2074            DistributionType::Hash,
2075            4,
2076            StreamingParallelism::Fixed(2),
2077        );
2078
2079        let job_model = streaming_job::Model {
2080            job_id,
2081            job_status: JobStatus::Created,
2082            create_type: CreateType::Background,
2083            timezone: None,
2084            config_override: None,
2085            adaptive_parallelism_strategy: None,
2086            parallelism: StreamingParallelism::Fixed(2),
2087            backfill_parallelism: None,
2088            backfill_adaptive_parallelism_strategy: None,
2089            backfill_orders: None,
2090            max_parallelism: 2,
2091            specific_resource_group: None,
2092            is_serverless_backfill: false,
2093            refresh_interval_sec: None,
2094        };
2095
2096        let database_model = database::Model {
2097            database_id,
2098            name: "test_db_hash".into(),
2099            resource_group: "rg-hash".into(),
2100            barrier_interval_ms: None,
2101            checkpoint_frequency: None,
2102        };
2103
2104        let ensembles = vec![NoShuffleEnsemble {
2105            entries: HashSet::from([entry_fragment_id]),
2106            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2107        }];
2108
2109        let fragment_map = HashMap::from([
2110            (entry_fragment_id, entry_fragment),
2111            (downstream_fragment_id, downstream_fragment),
2112        ]);
2113        let job_fragments = HashMap::from([(job_id, fragment_map)]);
2114        let job_map = HashMap::from([(job_id, job_model)]);
2115
2116        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2117            (1.into(), build_worker_node(1, 1, "rg-hash")),
2118            (2.into(), build_worker_node(2, 1, "rg-hash")),
2119        ]);
2120
2121        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2122        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2123        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2124        let database_map = HashMap::from([(database_id, database_model)]);
2125
2126        let context = RenderActorsContext {
2127            fragment_source_ids: &fragment_source_ids,
2128            fragment_splits: &fragment_splits,
2129            streaming_job_databases: &streaming_job_databases,
2130            database_map: &database_map,
2131        };
2132
2133        let result = render_actors(
2134            &actor_id_counter,
2135            &ensembles,
2136            &job_fragments,
2137            &job_map,
2138            &worker_map,
2139            context,
2140        )
2141        .expect("actor rendering succeeds");
2142
2143        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
2144        let downstream_state =
2145            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
2146
2147        assert_eq!(entry_state.len(), 2);
2148        assert_eq!(entry_state, downstream_state);
2149
2150        let assigned_vnodes: BTreeSet<_> = entry_state
2151            .iter()
2152            .flat_map(|(_, _, vnodes, _)| {
2153                vnodes
2154                    .as_ref()
2155                    .expect("hash distribution should populate vnode bitmap")
2156                    .iter()
2157                    .copied()
2158            })
2159            .collect();
2160        assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
2161        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
2162    }
2163
2164    #[test]
2165    fn render_actors_propagates_source_splits() {
2166        let actor_id_counter = AtomicU32::new(0);
2167        let entry_fragment_id: FragmentId = 11.into();
2168        let downstream_fragment_id: FragmentId = 12.into();
2169        let job_id: JobId = 30.into();
2170        let database_id: DatabaseId = DatabaseId::new(7);
2171        let source_id: SourceId = 99.into();
2172
2173        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
2174        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;
2175
2176        let entry_fragment = build_fragment(
2177            entry_fragment_id,
2178            job_id,
2179            source_mask,
2180            DistributionType::Hash,
2181            4,
2182            StreamingParallelism::Fixed(2),
2183        );
2184
2185        let downstream_fragment = build_fragment(
2186            downstream_fragment_id,
2187            job_id,
2188            source_scan_mask,
2189            DistributionType::Hash,
2190            4,
2191            StreamingParallelism::Fixed(2),
2192        );
2193
2194        let job_model = streaming_job::Model {
2195            job_id,
2196            job_status: JobStatus::Created,
2197            create_type: CreateType::Background,
2198            timezone: None,
2199            config_override: None,
2200            adaptive_parallelism_strategy: None,
2201            parallelism: StreamingParallelism::Fixed(2),
2202            backfill_parallelism: None,
2203            backfill_adaptive_parallelism_strategy: None,
2204            backfill_orders: None,
2205            max_parallelism: 2,
2206            specific_resource_group: None,
2207            is_serverless_backfill: false,
2208            refresh_interval_sec: None,
2209        };
2210
2211        let database_model = database::Model {
2212            database_id,
2213            name: "split_db".into(),
2214            resource_group: "rg-source".into(),
2215            barrier_interval_ms: None,
2216            checkpoint_frequency: None,
2217        };
2218
2219        let ensembles = vec![NoShuffleEnsemble {
2220            entries: HashSet::from([entry_fragment_id]),
2221            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2222        }];
2223
2224        let fragment_map = HashMap::from([
2225            (entry_fragment_id, entry_fragment),
2226            (downstream_fragment_id, downstream_fragment),
2227        ]);
2228        let job_fragments = HashMap::from([(job_id, fragment_map)]);
2229        let job_map = HashMap::from([(job_id, job_model)]);
2230
2231        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2232            (1.into(), build_worker_node(1, 1, "rg-source")),
2233            (2.into(), build_worker_node(2, 1, "rg-source")),
2234        ]);
2235
2236        let split_a = SplitImpl::Test(TestSourceSplit {
2237            id: Arc::<str>::from("split-a"),
2238            properties: HashMap::new(),
2239            offset: "0".into(),
2240        });
2241        let split_b = SplitImpl::Test(TestSourceSplit {
2242            id: Arc::<str>::from("split-b"),
2243            properties: HashMap::new(),
2244            offset: "0".into(),
2245        });
2246
2247        let fragment_source_ids = HashMap::from([
2248            (entry_fragment_id, source_id),
2249            (downstream_fragment_id, source_id),
2250        ]);
2251        let fragment_splits =
2252            HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
2253        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2254        let database_map = HashMap::from([(database_id, database_model)]);
2255
2256        let context = RenderActorsContext {
2257            fragment_source_ids: &fragment_source_ids,
2258            fragment_splits: &fragment_splits,
2259            streaming_job_databases: &streaming_job_databases,
2260            database_map: &database_map,
2261        };
2262
2263        let result = render_actors(
2264            &actor_id_counter,
2265            &ensembles,
2266            &job_fragments,
2267            &job_map,
2268            &worker_map,
2269            context,
2270        )
2271        .expect("actor rendering succeeds");
2272
2273        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
2274        let downstream_state =
2275            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
2276
2277        assert_eq!(entry_state, downstream_state);
2278
2279        let split_ids: BTreeSet<_> = entry_state
2280            .iter()
2281            .flat_map(|(_, _, _, splits)| splits.iter().cloned())
2282            .collect();
2283        assert_eq!(
2284            split_ids,
2285            BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
2286        );
2287        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
2288    }
2289
2290    #[test]
2291    fn preview_actor_assignments_defers_actor_id_allocation() {
2292        let actor_id_counter = AtomicU32::new(100);
2293        let entry_fragment_id: FragmentId = 11.into();
2294        let downstream_fragment_id: FragmentId = 12.into();
2295        let job_id: JobId = 30.into();
2296        let database_id: DatabaseId = DatabaseId::new(7);
2297        let source_id: SourceId = 99.into();
2298
2299        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
2300        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;
2301
2302        let entry_fragment = build_fragment(
2303            entry_fragment_id,
2304            job_id,
2305            source_mask,
2306            DistributionType::Hash,
2307            4,
2308            StreamingParallelism::Fixed(2),
2309        );
2310
2311        let downstream_fragment = build_fragment(
2312            downstream_fragment_id,
2313            job_id,
2314            source_scan_mask,
2315            DistributionType::Hash,
2316            4,
2317            StreamingParallelism::Fixed(2),
2318        );
2319
2320        let job_model = streaming_job::Model {
2321            job_id,
2322            job_status: JobStatus::Created,
2323            create_type: CreateType::Background,
2324            timezone: None,
2325            config_override: None,
2326            adaptive_parallelism_strategy: None,
2327            parallelism: StreamingParallelism::Fixed(2),
2328            backfill_parallelism: None,
2329            backfill_adaptive_parallelism_strategy: None,
2330            backfill_orders: None,
2331            max_parallelism: 2,
2332            specific_resource_group: None,
2333            is_serverless_backfill: false,
2334            refresh_interval_sec: None,
2335        };
2336
2337        let database_model = database::Model {
2338            database_id,
2339            name: "preview_db".into(),
2340            resource_group: "rg-source".into(),
2341            barrier_interval_ms: None,
2342            checkpoint_frequency: None,
2343        };
2344
2345        let ensembles = vec![NoShuffleEnsemble {
2346            entries: HashSet::from([entry_fragment_id]),
2347            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2348        }];
2349
2350        let fragment_map = HashMap::from([
2351            (entry_fragment_id, entry_fragment),
2352            (downstream_fragment_id, downstream_fragment),
2353        ]);
2354        let job_fragments = HashMap::from([(job_id, fragment_map)]);
2355        let job_map = HashMap::from([(job_id, job_model)]);
2356
2357        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2358            (1.into(), build_worker_node(1, 1, "rg-source")),
2359            (2.into(), build_worker_node(2, 1, "rg-source")),
2360        ]);
2361
2362        let split_a = SplitImpl::Test(TestSourceSplit {
2363            id: Arc::<str>::from("split-a"),
2364            properties: HashMap::new(),
2365            offset: "0".into(),
2366        });
2367        let split_b = SplitImpl::Test(TestSourceSplit {
2368            id: Arc::<str>::from("split-b"),
2369            properties: HashMap::new(),
2370            offset: "0".into(),
2371        });
2372
2373        let loaded = LoadedFragmentContext {
2374            ensembles,
2375            job_fragments,
2376            job_map,
2377            streaming_job_databases: HashMap::from([(job_id, database_id)]),
2378            database_map: HashMap::from([(database_id, database_model)]),
2379            fragment_source_ids: HashMap::from([
2380                (entry_fragment_id, source_id),
2381                (downstream_fragment_id, source_id),
2382            ]),
2383            fragment_splits: HashMap::from([(entry_fragment_id, vec![split_a, split_b])]),
2384        };
2385
2386        let preview =
2387            preview_actor_assignments(&worker_map, &loaded).expect("preview rendering succeeds");
2388        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 100);
2389
2390        let preview_entry_state =
2391            collect_actor_state(&preview.fragments[&database_id][&job_id][&entry_fragment_id]);
2392        let preview_downstream_state =
2393            collect_actor_state(&preview.fragments[&database_id][&job_id][&downstream_fragment_id]);
2394        assert_eq!(preview_entry_state, preview_downstream_state);
2395
2396        let materialized = materialize_actor_assignments(&actor_id_counter, preview);
2397
2398        let materialized_entry_state =
2399            collect_actor_state(&materialized.fragments[&database_id][&job_id][&entry_fragment_id]);
2400        let materialized_downstream_state = collect_actor_state(
2401            &materialized.fragments[&database_id][&job_id][&downstream_fragment_id],
2402        );
2403
2404        assert_eq!(materialized_entry_state, materialized_downstream_state);
2405        assert_eq!(materialized_entry_state, preview_entry_state);
2406        assert_eq!(materialized_downstream_state, preview_downstream_state);
2407        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 104);
2408    }
2409
2410    /// Test that job-level strategy overrides global strategy for Adaptive parallelism.
2411    #[test]
2412    fn render_actors_job_strategy_overrides_global() {
2413        let actor_id_counter = AtomicU32::new(0);
2414        let fragment_id: FragmentId = 1.into();
2415        let job_id: JobId = 100.into();
2416        let database_id: DatabaseId = DatabaseId::new(10);
2417
2418        // Fragment with Adaptive parallelism, vnode_count = 8
2419        let fragment_model = build_fragment(
2420            fragment_id,
2421            job_id,
2422            0,
2423            DistributionType::Hash,
2424            8,
2425            StreamingParallelism::Adaptive,
2426        );
2427
2428        // Job has custom strategy: BOUNDED(2)
2429        let job_model = streaming_job::Model {
2430            job_id,
2431            job_status: JobStatus::Created,
2432            create_type: CreateType::Foreground,
2433            timezone: None,
2434            config_override: None,
2435            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2436            parallelism: StreamingParallelism::Adaptive,
2437            backfill_parallelism: None,
2438            backfill_adaptive_parallelism_strategy: None,
2439            backfill_orders: None,
2440            max_parallelism: 8,
2441            specific_resource_group: None,
2442            is_serverless_backfill: false,
2443            refresh_interval_sec: None,
2444        };
2445
2446        let database_model = database::Model {
2447            database_id,
2448            name: "test_db".into(),
2449            resource_group: "default".into(),
2450            barrier_interval_ms: None,
2451            checkpoint_frequency: None,
2452        };
2453
2454        let ensembles = vec![NoShuffleEnsemble {
2455            entries: HashSet::from([fragment_id]),
2456            components: HashSet::from([fragment_id]),
2457        }];
2458
2459        let fragment_map =
2460            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2461        let job_map = HashMap::from([(job_id, job_model)]);
2462
2463        // 4 workers with 1 parallelism each = total 4 parallelism
2464        let worker_map = HashMap::from([
2465            (1.into(), build_worker_node(1, 1, "default")),
2466            (2.into(), build_worker_node(2, 1, "default")),
2467            (3.into(), build_worker_node(3, 1, "default")),
2468            (4.into(), build_worker_node(4, 1, "default")),
2469        ]);
2470
2471        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2472        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2473        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2474        let database_map = HashMap::from([(database_id, database_model)]);
2475
2476        let context = RenderActorsContext {
2477            fragment_source_ids: &fragment_source_ids,
2478            fragment_splits: &fragment_splits,
2479            streaming_job_databases: &streaming_job_databases,
2480            database_map: &database_map,
2481        };
2482
2483        // Global strategy is FULL (would give 4 actors), but job strategy is BOUNDED(2)
2484        let result = render_actors(
2485            &actor_id_counter,
2486            &ensembles,
2487            &fragment_map,
2488            &job_map,
2489            &worker_map,
2490            context,
2491        )
2492        .expect("actor rendering succeeds");
2493
2494        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2495        // Job strategy BOUNDED(2) should limit to 2 actors, not 4 (global FULL)
2496        assert_eq!(
2497            state.len(),
2498            2,
2499            "Job strategy BOUNDED(2) should override global FULL"
2500        );
2501    }
2502
2503    /// Test that adaptive jobs without a persisted strategy fall back to the default adaptive
2504    /// strategy instead of failing.
2505    #[test]
2506    fn render_actors_falls_back_to_default_when_adaptive_job_has_no_strategy() {
2507        let actor_id_counter = AtomicU32::new(0);
2508        let fragment_id: FragmentId = 1.into();
2509        let job_id: JobId = 101.into();
2510        let database_id: DatabaseId = DatabaseId::new(11);
2511
2512        let fragment_model = build_fragment(
2513            fragment_id,
2514            job_id,
2515            0,
2516            DistributionType::Hash,
2517            8,
2518            StreamingParallelism::Adaptive,
2519        );
2520
2521        // Job has NO custom strategy (None)
2522        let job_model = streaming_job::Model {
2523            job_id,
2524            job_status: JobStatus::Created,
2525            create_type: CreateType::Foreground,
2526            timezone: None,
2527            config_override: None,
2528            adaptive_parallelism_strategy: None, // No custom strategy
2529            parallelism: StreamingParallelism::Adaptive,
2530            backfill_parallelism: None,
2531            backfill_adaptive_parallelism_strategy: None,
2532            backfill_orders: None,
2533            max_parallelism: 8,
2534            specific_resource_group: None,
2535            is_serverless_backfill: false,
2536            refresh_interval_sec: None,
2537        };
2538
2539        let database_model = database::Model {
2540            database_id,
2541            name: "test_db".into(),
2542            resource_group: "default".into(),
2543            barrier_interval_ms: None,
2544            checkpoint_frequency: None,
2545        };
2546
2547        let ensembles = vec![NoShuffleEnsemble {
2548            entries: HashSet::from([fragment_id]),
2549            components: HashSet::from([fragment_id]),
2550        }];
2551
2552        let fragment_map =
2553            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2554        let job_map = HashMap::from([(job_id, job_model)]);
2555
2556        // 4 workers = total 4 parallelism
2557        let worker_map = HashMap::from([
2558            (1.into(), build_worker_node(1, 1, "default")),
2559            (2.into(), build_worker_node(2, 1, "default")),
2560            (3.into(), build_worker_node(3, 1, "default")),
2561            (4.into(), build_worker_node(4, 1, "default")),
2562        ]);
2563
2564        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2565        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2566        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2567        let database_map = HashMap::from([(database_id, database_model)]);
2568
2569        let context = RenderActorsContext {
2570            fragment_source_ids: &fragment_source_ids,
2571            fragment_splits: &fragment_splits,
2572            streaming_job_databases: &streaming_job_databases,
2573            database_map: &database_map,
2574        };
2575
2576        let result = render_actors(
2577            &actor_id_counter,
2578            &ensembles,
2579            &fragment_map,
2580            &job_map,
2581            &worker_map,
2582            context,
2583        )
2584        .expect("actor rendering succeeds");
2585
2586        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2587        assert_eq!(
2588            state.len(),
2589            4,
2590            "default adaptive strategy should use the full available parallelism"
2591        );
2592    }
2593
2594    /// Test that Fixed parallelism ignores strategy entirely.
2595    #[test]
2596    fn render_actors_fixed_parallelism_ignores_strategy() {
2597        let actor_id_counter = AtomicU32::new(0);
2598        let fragment_id: FragmentId = 1.into();
2599        let job_id: JobId = 102.into();
2600        let database_id: DatabaseId = DatabaseId::new(12);
2601
2602        // Fragment with FIXED parallelism
2603        let fragment_model = build_fragment(
2604            fragment_id,
2605            job_id,
2606            0,
2607            DistributionType::Hash,
2608            8,
2609            StreamingParallelism::Fixed(5),
2610        );
2611
2612        // Job has custom strategy, but it should be ignored for Fixed parallelism
2613        let job_model = streaming_job::Model {
2614            job_id,
2615            job_status: JobStatus::Created,
2616            create_type: CreateType::Foreground,
2617            timezone: None,
2618            config_override: None,
2619            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2620            parallelism: StreamingParallelism::Fixed(5),
2621            backfill_parallelism: None,
2622            backfill_adaptive_parallelism_strategy: None,
2623            backfill_orders: None,
2624            max_parallelism: 8,
2625            specific_resource_group: None,
2626            is_serverless_backfill: false,
2627            refresh_interval_sec: None,
2628        };
2629
2630        let database_model = database::Model {
2631            database_id,
2632            name: "test_db".into(),
2633            resource_group: "default".into(),
2634            barrier_interval_ms: None,
2635            checkpoint_frequency: None,
2636        };
2637
2638        let ensembles = vec![NoShuffleEnsemble {
2639            entries: HashSet::from([fragment_id]),
2640            components: HashSet::from([fragment_id]),
2641        }];
2642
2643        let fragment_map =
2644            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2645        let job_map = HashMap::from([(job_id, job_model)]);
2646
2647        // 6 workers = total 6 parallelism
2648        let worker_map = HashMap::from([
2649            (1.into(), build_worker_node(1, 1, "default")),
2650            (2.into(), build_worker_node(2, 1, "default")),
2651            (3.into(), build_worker_node(3, 1, "default")),
2652            (4.into(), build_worker_node(4, 1, "default")),
2653            (5.into(), build_worker_node(5, 1, "default")),
2654            (6.into(), build_worker_node(6, 1, "default")),
2655        ]);
2656
2657        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2658        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2659        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2660        let database_map = HashMap::from([(database_id, database_model)]);
2661
2662        let context = RenderActorsContext {
2663            fragment_source_ids: &fragment_source_ids,
2664            fragment_splits: &fragment_splits,
2665            streaming_job_databases: &streaming_job_databases,
2666            database_map: &database_map,
2667        };
2668
2669        let result = render_actors(
2670            &actor_id_counter,
2671            &ensembles,
2672            &fragment_map,
2673            &job_map,
2674            &worker_map,
2675            context,
2676        )
2677        .expect("actor rendering succeeds");
2678
2679        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2680        // Fixed(5) should be used, ignoring both job strategy BOUNDED(2) and global FULL
2681        assert_eq!(
2682            state.len(),
2683            5,
2684            "Fixed parallelism should ignore all strategies"
2685        );
2686    }
2687
2688    /// Test RATIO strategy calculation.
2689    #[test]
2690    fn render_actors_ratio_strategy() {
2691        let actor_id_counter = AtomicU32::new(0);
2692        let fragment_id: FragmentId = 1.into();
2693        let job_id: JobId = 103.into();
2694        let database_id: DatabaseId = DatabaseId::new(13);
2695
2696        let fragment_model = build_fragment(
2697            fragment_id,
2698            job_id,
2699            0,
2700            DistributionType::Hash,
2701            16,
2702            StreamingParallelism::Adaptive,
2703        );
2704
2705        // Job has RATIO(0.5) strategy
2706        let job_model = streaming_job::Model {
2707            job_id,
2708            job_status: JobStatus::Created,
2709            create_type: CreateType::Foreground,
2710            timezone: None,
2711            config_override: None,
2712            adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
2713            parallelism: StreamingParallelism::Adaptive,
2714            backfill_parallelism: None,
2715            backfill_adaptive_parallelism_strategy: None,
2716            backfill_orders: None,
2717            max_parallelism: 16,
2718            specific_resource_group: None,
2719            is_serverless_backfill: false,
2720            refresh_interval_sec: None,
2721        };
2722
2723        let database_model = database::Model {
2724            database_id,
2725            name: "test_db".into(),
2726            resource_group: "default".into(),
2727            barrier_interval_ms: None,
2728            checkpoint_frequency: None,
2729        };
2730
2731        let ensembles = vec![NoShuffleEnsemble {
2732            entries: HashSet::from([fragment_id]),
2733            components: HashSet::from([fragment_id]),
2734        }];
2735
2736        let fragment_map =
2737            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2738        let job_map = HashMap::from([(job_id, job_model)]);
2739
2740        // 8 workers = total 8 parallelism
2741        let worker_map = HashMap::from([
2742            (1.into(), build_worker_node(1, 1, "default")),
2743            (2.into(), build_worker_node(2, 1, "default")),
2744            (3.into(), build_worker_node(3, 1, "default")),
2745            (4.into(), build_worker_node(4, 1, "default")),
2746            (5.into(), build_worker_node(5, 1, "default")),
2747            (6.into(), build_worker_node(6, 1, "default")),
2748            (7.into(), build_worker_node(7, 1, "default")),
2749            (8.into(), build_worker_node(8, 1, "default")),
2750        ]);
2751
2752        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2753        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2754        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2755        let database_map = HashMap::from([(database_id, database_model)]);
2756
2757        let context = RenderActorsContext {
2758            fragment_source_ids: &fragment_source_ids,
2759            fragment_splits: &fragment_splits,
2760            streaming_job_databases: &streaming_job_databases,
2761            database_map: &database_map,
2762        };
2763
2764        let result = render_actors(
2765            &actor_id_counter,
2766            &ensembles,
2767            &fragment_map,
2768            &job_map,
2769            &worker_map,
2770            context,
2771        )
2772        .expect("actor rendering succeeds");
2773
2774        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2775        // RATIO(0.5) of 8 = 4
2776        assert_eq!(
2777            state.len(),
2778            4,
2779            "RATIO(0.5) of 8 workers should give 4 actors"
2780        );
2781    }
2782
2783    #[test]
2784    fn render_actors_backfill_strategy_overrides_job_strategy() {
2785        let actor_id_counter = AtomicU32::new(0);
2786        let fragment_id: FragmentId = 1.into();
2787        let job_id: JobId = 104.into();
2788        let database_id: DatabaseId = DatabaseId::new(14);
2789
2790        let fragment_model = build_fragment(
2791            fragment_id,
2792            job_id,
2793            0,
2794            DistributionType::Hash,
2795            16,
2796            StreamingParallelism::Adaptive,
2797        );
2798
2799        let job_model = streaming_job::Model {
2800            job_id,
2801            job_status: JobStatus::Creating,
2802            create_type: CreateType::Background,
2803            timezone: None,
2804            config_override: None,
2805            adaptive_parallelism_strategy: Some("BOUNDED(4)".to_owned()),
2806            parallelism: StreamingParallelism::Adaptive,
2807            backfill_parallelism: Some(StreamingParallelism::Adaptive),
2808            backfill_adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2809            backfill_orders: None,
2810            max_parallelism: 16,
2811            specific_resource_group: None,
2812            is_serverless_backfill: false,
2813            refresh_interval_sec: None,
2814        };
2815
2816        let database_model = database::Model {
2817            database_id,
2818            name: "test_db".into(),
2819            resource_group: "default".into(),
2820            barrier_interval_ms: None,
2821            checkpoint_frequency: None,
2822        };
2823
2824        let ensembles = vec![NoShuffleEnsemble {
2825            entries: HashSet::from([fragment_id]),
2826            components: HashSet::from([fragment_id]),
2827        }];
2828
2829        let fragment_map =
2830            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2831        let job_map = HashMap::from([(job_id, job_model)]);
2832
2833        let worker_map = HashMap::from([
2834            (1.into(), build_worker_node(1, 1, "default")),
2835            (2.into(), build_worker_node(2, 1, "default")),
2836            (3.into(), build_worker_node(3, 1, "default")),
2837            (4.into(), build_worker_node(4, 1, "default")),
2838        ]);
2839
2840        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2841        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2842        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2843        let database_map = HashMap::from([(database_id, database_model)]);
2844        let context = RenderActorsContext {
2845            fragment_source_ids: &fragment_source_ids,
2846            fragment_splits: &fragment_splits,
2847            streaming_job_databases: &streaming_job_databases,
2848            database_map: &database_map,
2849        };
2850
2851        let result = render_actors(
2852            &actor_id_counter,
2853            &ensembles,
2854            &fragment_map,
2855            &job_map,
2856            &worker_map,
2857            context,
2858        )
2859        .expect("actor rendering succeeds");
2860
2861        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2862        assert_eq!(
2863            state.len(),
2864            2,
2865            "Backfill strategy BOUNDED(2) should override the job strategy during backfill"
2866        );
2867    }
2868
2869    #[test]
2870    fn render_actors_created_job_ignores_backfill_strategy() {
2871        let actor_id_counter = AtomicU32::new(0);
2872        let fragment_id: FragmentId = 1.into();
2873        let job_id: JobId = 105.into();
2874        let database_id: DatabaseId = DatabaseId::new(15);
2875
2876        let fragment_model = build_fragment(
2877            fragment_id,
2878            job_id,
2879            0,
2880            DistributionType::Hash,
2881            16,
2882            StreamingParallelism::Adaptive,
2883        );
2884
2885        let job_model = streaming_job::Model {
2886            job_id,
2887            job_status: JobStatus::Created,
2888            create_type: CreateType::Foreground,
2889            timezone: None,
2890            config_override: None,
2891            adaptive_parallelism_strategy: Some("BOUNDED(4)".to_owned()),
2892            parallelism: StreamingParallelism::Adaptive,
2893            backfill_parallelism: Some(StreamingParallelism::Adaptive),
2894            backfill_adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2895            backfill_orders: None,
2896            max_parallelism: 16,
2897            specific_resource_group: None,
2898            is_serverless_backfill: false,
2899            refresh_interval_sec: None,
2900        };
2901
2902        let database_model = database::Model {
2903            database_id,
2904            name: "test_db".into(),
2905            resource_group: "default".into(),
2906            barrier_interval_ms: None,
2907            checkpoint_frequency: None,
2908        };
2909
2910        let ensembles = vec![NoShuffleEnsemble {
2911            entries: HashSet::from([fragment_id]),
2912            components: HashSet::from([fragment_id]),
2913        }];
2914
2915        let fragment_map =
2916            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2917        let job_map = HashMap::from([(job_id, job_model)]);
2918
2919        let worker_map = HashMap::from([
2920            (1.into(), build_worker_node(1, 1, "default")),
2921            (2.into(), build_worker_node(2, 1, "default")),
2922            (3.into(), build_worker_node(3, 1, "default")),
2923            (4.into(), build_worker_node(4, 1, "default")),
2924        ]);
2925
2926        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2927        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2928        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2929        let database_map = HashMap::from([(database_id, database_model)]);
2930        let context = RenderActorsContext {
2931            fragment_source_ids: &fragment_source_ids,
2932            fragment_splits: &fragment_splits,
2933            streaming_job_databases: &streaming_job_databases,
2934            database_map: &database_map,
2935        };
2936
2937        let result = render_actors(
2938            &actor_id_counter,
2939            &ensembles,
2940            &fragment_map,
2941            &job_map,
2942            &worker_map,
2943            context,
2944        )
2945        .expect("actor rendering succeeds");
2946
2947        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2948        assert_eq!(
2949            state.len(),
2950            4,
2951            "created jobs should use the main job strategy before any backfill override applies"
2952        );
2953    }
2954}