1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
23#[cfg(debug_assertions)]
24use risingwave_common::hash::VnodeBitmapExt;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_meta_model::prelude::{
32 Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
33};
34use risingwave_meta_model::{
35 DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism, WorkerId,
36 database, fragment, fragment_relation, fragment_splits, object, sink, source, streaming_job,
37 table,
38};
39use risingwave_meta_model_migration::Condition;
40use risingwave_pb::common::WorkerNode;
41use risingwave_pb::stream_plan::PbStreamNode;
42use sea_orm::{
43 ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
44 RelationTrait,
45};
46
47use crate::MetaResult;
48use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
49use crate::controller::utils::resolve_no_shuffle_actor_mapping;
50use crate::manager::ActiveStreamingWorkerNodes;
51use crate::model::{ActorId, StreamActor};
52use crate::stream::{AssignerBuilder, SplitDiffOptions};
53
54pub(crate) async fn resolve_streaming_job_definition<C>(
55 txn: &C,
56 job_ids: &HashSet<JobId>,
57) -> MetaResult<HashMap<JobId, String>>
58where
59 C: ConnectionTrait,
60{
61 let job_ids = job_ids.iter().cloned().collect_vec();
62
63 let common_job_definitions: Vec<(JobId, String)> = Table::find()
65 .select_only()
66 .columns([
67 table::Column::TableId,
68 #[cfg(not(debug_assertions))]
69 table::Column::Name,
70 #[cfg(debug_assertions)]
71 table::Column::Definition,
72 ])
73 .filter(table::Column::TableId.is_in(job_ids.clone()))
74 .into_tuple()
75 .all(txn)
76 .await?;
77
78 let sink_definitions: Vec<(JobId, String)> = Sink::find()
79 .select_only()
80 .columns([
81 sink::Column::SinkId,
82 #[cfg(not(debug_assertions))]
83 sink::Column::Name,
84 #[cfg(debug_assertions)]
85 sink::Column::Definition,
86 ])
87 .filter(sink::Column::SinkId.is_in(job_ids.clone()))
88 .into_tuple()
89 .all(txn)
90 .await?;
91
92 let source_definitions: Vec<(JobId, String)> = Source::find()
93 .select_only()
94 .columns([
95 source::Column::SourceId,
96 #[cfg(not(debug_assertions))]
97 source::Column::Name,
98 #[cfg(debug_assertions)]
99 source::Column::Definition,
100 ])
101 .filter(source::Column::SourceId.is_in(job_ids.clone()))
102 .into_tuple()
103 .all(txn)
104 .await?;
105
106 let definitions: HashMap<JobId, String> = common_job_definitions
107 .into_iter()
108 .chain(sink_definitions.into_iter())
109 .chain(source_definitions.into_iter())
110 .collect();
111
112 Ok(definitions)
113}
114
115pub async fn load_fragment_info<C>(
116 txn: &C,
117 actor_id_counter: &AtomicU32,
118 database_id: Option<DatabaseId>,
119 worker_nodes: &ActiveStreamingWorkerNodes,
120) -> MetaResult<FragmentRenderMap>
121where
122 C: ConnectionTrait,
123{
124 let mut query = StreamingJob::find()
125 .select_only()
126 .column(streaming_job::Column::JobId);
127
128 if let Some(database_id) = database_id {
129 query = query
130 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
131 .filter(object::Column::DatabaseId.eq(database_id));
132 }
133
134 let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
135
136 if jobs.is_empty() {
137 return Ok(HashMap::new());
138 }
139
140 let jobs: HashSet<JobId> = jobs.into_iter().collect();
141
142 let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
143
144 if loaded.is_empty() {
145 return Ok(HashMap::new());
146 }
147
148 let RenderedGraph { fragments, .. } =
149 render_actor_assignments(actor_id_counter, worker_nodes.current(), &loaded)?;
150
151 Ok(fragments)
152}
153
/// A resource placement target: an optional resource group plus a streaming parallelism.
#[derive(Debug)]
pub struct TargetResourcePolicy {
    /// Resource group to place the job in.
    // NOTE(review): presumably `None` means "keep/inherit the current group" — confirm with
    // callers (no use of this type is visible in this part of the file).
    pub resource_group: Option<String>,
    /// Requested parallelism setting.
    pub parallelism: StreamingParallelism,
}
159
/// Minimal description of a worker: its (non-zero) parallelism and optional resource group.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Number of parallel slots the worker offers; guaranteed non-zero by the type.
    pub parallelism: NonZeroUsize,
    /// Resource group of the worker.
    // NOTE(review): presumably `None` means the default resource group — confirm with callers.
    pub resource_group: Option<String>,
}
165
/// Rendered in-flight fragments, nested by database id, then job id, then fragment id.
pub type FragmentRenderMap =
    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;
168
/// Output of actor rendering: the rendered fragments together with the no-shuffle ensembles
/// they were rendered from.
#[derive(Default)]
pub struct RenderedGraph {
    /// Rendered fragments grouped by database and job.
    pub fragments: FragmentRenderMap,
    /// The ensembles that produced `fragments`.
    pub ensembles: Vec<NoShuffleEnsemble>,
}
174
175impl RenderedGraph {
176 pub fn empty() -> Self {
177 Self::default()
178 }
179}
180
/// Catalog data of one fragment that is needed to render its actors.
///
/// Built from a [`fragment::Model`] row via the `From` impl.
#[derive(Clone, Debug)]
pub struct LoadedFragment {
    pub fragment_id: FragmentId,
    /// Streaming job owning this fragment.
    pub job_id: JobId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: DistributionType,
    pub vnode_count: usize,
    /// Stream plan of the fragment, decoded to protobuf.
    pub nodes: PbStreamNode,
    pub state_table_ids: HashSet<TableId>,
    /// Fragment-level parallelism override; when `None`, rendering falls back to the job's
    /// parallelism.
    pub parallelism: Option<StreamingParallelism>,
}
195
196impl From<fragment::Model> for LoadedFragment {
197 fn from(model: fragment::Model) -> Self {
198 Self {
199 fragment_id: model.fragment_id,
200 job_id: model.job_id,
201 fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
202 distribution_type: model.distribution_type,
203 vnode_count: model.vnode_count as usize,
204 nodes: model.stream_node.to_protobuf(),
205 state_table_ids: model.state_table_ids.into_inner().into_iter().collect(),
206 parallelism: model.parallelism,
207 }
208 }
209}
210
/// Everything loaded from the catalog that is required to render the actors of a set of
/// no-shuffle ensembles.
#[derive(Default, Debug, Clone)]
pub struct LoadedFragmentContext {
    /// No-shuffle ensembles to render.
    pub ensembles: Vec<NoShuffleEnsemble>,
    /// Loaded fragments grouped by owning streaming job.
    pub job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    /// Streaming job rows keyed by job id.
    pub job_map: HashMap<JobId, streaming_job::Model>,
    /// Database owning each streaming job.
    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
    /// Database rows for the databases referenced by `streaming_job_databases`.
    pub database_map: HashMap<DatabaseId, database::Model>,
    /// Source id of each source-related fragment.
    // NOTE(review): populated by `resolve_source_fragments` (not visible here) — confirm exactly
    // which fragment kinds are included.
    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
    /// Splits recorded for each fragment, as returned by `resolve_source_fragments`.
    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
}
221
impl LoadedFragmentContext {
    /// Returns `true` when the context holds no ensembles.
    ///
    /// # Panics
    /// Panics if there are no ensembles but `job_fragments` is non-empty, i.e. fragments were
    /// loaded that belong to no ensemble.
    pub fn is_empty(&self) -> bool {
        if self.ensembles.is_empty() {
            assert!(
                self.job_fragments.is_empty(),
                "non-empty job fragments for empty ensembles: {:?}",
                self.job_fragments
            );
            true
        } else {
            false
        }
    }

    /// Builds a new context restricted to the streaming jobs of `database_id`, cloning the
    /// relevant entries out of `self`.
    ///
    /// Returns `None` when no job in `streaming_job_databases` belongs to `database_id`.
    ///
    /// # Panics
    /// - if a job of the database has no entry in `job_fragments`;
    /// - if the database's jobs have no fragments at all;
    /// - if an ensemble has only some of its component fragments in this database;
    /// - if no ensemble touches this database;
    /// - if `database_map` has no row for `database_id`.
    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
        // Jobs owned by the requested database.
        let job_ids: HashSet<JobId> = self
            .streaming_job_databases
            .iter()
            .filter_map(|(job_id, db_id)| (*db_id == database_id).then_some(*job_id))
            .collect();

        if job_ids.is_empty() {
            return None;
        }

        let job_fragments: HashMap<_, _> = job_ids
            .iter()
            .map(|job_id| (*job_id, self.job_fragments[job_id].clone()))
            .collect();

        let fragment_ids: HashSet<_> = job_fragments
            .values()
            .flat_map(|fragments| fragments.keys().copied())
            .collect();

        assert!(
            !fragment_ids.is_empty(),
            "empty fragments for non-empty database {database_id} with jobs {job_ids:?}"
        );

        // Keep ensembles touching this database; an ensemble must lie entirely inside it.
        let ensembles: Vec<NoShuffleEnsemble> = self
            .ensembles
            .iter()
            .filter(|ensemble| {
                if ensemble
                    .components
                    .iter()
                    .any(|fragment_id| fragment_ids.contains(fragment_id))
                {
                    assert!(
                        ensemble
                            .components
                            .iter()
                            .all(|fragment_id| fragment_ids.contains(fragment_id)),
                        "ensemble {ensemble:?} partially exists in database {database_id} with fragments {job_fragments:?}"
                    );
                    true
                } else {
                    false
                }
            })
            .cloned()
            .collect();

        assert!(
            !ensembles.is_empty(),
            "empty ensembles for non-empty database {database_id} with jobs {job_fragments:?}"
        );

        let job_map = job_ids
            .iter()
            .filter_map(|job_id| self.job_map.get(job_id).map(|job| (*job_id, job.clone())))
            .collect();

        let streaming_job_databases = job_ids
            .iter()
            .filter_map(|job_id| {
                self.streaming_job_databases
                    .get(job_id)
                    .map(|db_id| (*job_id, *db_id))
            })
            .collect();

        let database_model = self.database_map[&database_id].clone();
        let database_map = HashMap::from([(database_id, database_model)]);

        // Source ids and splits are keyed by fragment; keep only this database's fragments.
        let fragment_source_ids = self
            .fragment_source_ids
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, source_id)| (*fragment_id, *source_id))
            .collect();

        let fragment_splits = self
            .fragment_splits
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, splits)| (*fragment_id, splits.clone()))
            .collect();

        Some(Self {
            ensembles,
            job_fragments,
            job_map,
            streaming_job_databases,
            database_map,
            fragment_source_ids,
            fragment_splits,
        })
    }

    /// Consumes the context and splits it into one context per database.
    ///
    /// Jobs are distributed according to `streaming_job_databases`. Each ensemble goes to the
    /// database of the first of its component fragments that maps to one. Entries of
    /// `job_fragments`, `job_map`, `database_map`, `fragment_source_ids` and `fragment_splits`
    /// that are not reachable from `streaming_job_databases` are dropped.
    ///
    /// Ensembles whose fragments map to no database are skipped with a warning in release
    /// builds. In debug builds they trigger an assertion failure.
    ///
    /// # Panics
    /// Panics if a job in `streaming_job_databases` has no database row, no fragments, or no
    /// job row, or if a job is seen twice.
    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
        let Self {
            ensembles,
            mut job_fragments,
            mut job_map,
            streaming_job_databases,
            mut database_map,
            mut fragment_source_ids,
            mut fragment_splits,
        } = self;

        let mut contexts = HashMap::<DatabaseId, Self>::new();
        // Reverse index used afterwards to route ensembles to their database.
        let mut fragment_databases = HashMap::<FragmentId, DatabaseId>::new();
        let mut unresolved_ensembles = 0usize;
        let mut unresolved_ensemble_sample: Option<Vec<FragmentId>> = None;

        for (job_id, database_id) in streaming_job_databases {
            // Create the per-database context lazily, moving the database row into it.
            let context = contexts.entry(database_id).or_insert_with(|| {
                let database_model = database_map
                    .remove(&database_id)
                    .expect("database should exist for streaming job");
                Self {
                    ensembles: Vec::new(),
                    job_fragments: HashMap::new(),
                    job_map: HashMap::new(),
                    streaming_job_databases: HashMap::new(),
                    database_map: HashMap::from([(database_id, database_model)]),
                    fragment_source_ids: HashMap::new(),
                    fragment_splits: HashMap::new(),
                }
            });

            let fragments = job_fragments
                .remove(&job_id)
                .expect("job fragments should exist for streaming job");
            for fragment_id in fragments.keys().copied() {
                fragment_databases.insert(fragment_id, database_id);
                if let Some(source_id) = fragment_source_ids.remove(&fragment_id) {
                    context.fragment_source_ids.insert(fragment_id, source_id);
                }
                if let Some(splits) = fragment_splits.remove(&fragment_id) {
                    context.fragment_splits.insert(fragment_id, splits);
                }
            }

            // `assert!` (not `debug_assert!`) so the inserts below run in every build profile.
            assert!(
                context
                    .job_map
                    .insert(
                        job_id,
                        job_map
                            .remove(&job_id)
                            .expect("streaming job should exist for loaded context"),
                    )
                    .is_none(),
                "duplicated streaming job"
            );
            assert!(
                context.job_fragments.insert(job_id, fragments).is_none(),
                "duplicated job fragments"
            );
            assert!(
                context
                    .streaming_job_databases
                    .insert(job_id, database_id)
                    .is_none(),
                "duplicated job database mapping"
            );
        }

        for ensemble in ensembles {
            let Some(database_id) = ensemble
                .components
                .iter()
                .find_map(|fragment_id| fragment_databases.get(fragment_id).copied())
            else {
                // Remember one example for the warning below, then skip the ensemble.
                unresolved_ensembles += 1;
                if unresolved_ensemble_sample.is_none() {
                    unresolved_ensemble_sample =
                        Some(ensemble.components.iter().copied().collect());
                }
                continue;
            };

            debug_assert!(
                ensemble
                    .components
                    .iter()
                    .all(|fragment_id| fragment_databases.get(fragment_id) == Some(&database_id)),
                "ensemble {ensemble:?} should belong to a single database"
            );

            contexts
                .get_mut(&database_id)
                .expect("database context should exist for ensemble")
                .ensembles
                .push(ensemble);
        }

        if unresolved_ensembles > 0 {
            tracing::warn!(
                unresolved_ensembles,
                ?unresolved_ensemble_sample,
                known_fragments = fragment_databases.len(),
                "skip ensembles without resolved database while splitting loaded context"
            );
        }
        debug_assert_eq!(
            unresolved_ensembles, 0,
            "all ensembles should be mappable to a database"
        );

        contexts
    }
}
450
451pub async fn load_fragment_context<C>(
454 txn: &C,
455 ensembles: Vec<NoShuffleEnsemble>,
456) -> MetaResult<LoadedFragmentContext>
457where
458 C: ConnectionTrait,
459{
460 if ensembles.is_empty() {
461 return Ok(LoadedFragmentContext::default());
462 }
463
464 let required_fragment_ids: HashSet<_> = ensembles
465 .iter()
466 .flat_map(|ensemble| ensemble.components.iter().copied())
467 .collect();
468
469 let fragment_models = Fragment::find()
470 .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
471 .all(txn)
472 .await?;
473
474 let found_fragment_ids: HashSet<_> = fragment_models
475 .iter()
476 .map(|fragment| fragment.fragment_id)
477 .collect();
478
479 if found_fragment_ids.len() != required_fragment_ids.len() {
480 let missing = required_fragment_ids
481 .difference(&found_fragment_ids)
482 .copied()
483 .collect_vec();
484 return Err(anyhow!("fragments {:?} not found", missing).into());
485 }
486
487 let fragment_models: HashMap<_, _> = fragment_models
488 .into_iter()
489 .map(|fragment| (fragment.fragment_id, fragment))
490 .collect();
491
492 let job_ids: HashSet<_> = fragment_models
493 .values()
494 .map(|fragment| fragment.job_id)
495 .collect();
496
497 if job_ids.is_empty() {
498 return Ok(LoadedFragmentContext::default());
499 }
500
501 let jobs: HashMap<_, _> = StreamingJob::find()
502 .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
503 .all(txn)
504 .await?
505 .into_iter()
506 .map(|job| (job.job_id, job))
507 .collect();
508
509 let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
510 if found_job_ids.len() != job_ids.len() {
511 let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
512 return Err(anyhow!("streaming jobs {:?} not found", missing).into());
513 }
514
515 build_loaded_context(txn, ensembles, fragment_models, jobs).await
516}
517
518pub async fn load_fragment_context_for_jobs<C>(
521 txn: &C,
522 job_ids: HashSet<JobId>,
523) -> MetaResult<LoadedFragmentContext>
524where
525 C: ConnectionTrait,
526{
527 if job_ids.is_empty() {
528 return Ok(LoadedFragmentContext::default());
529 }
530
531 let excluded_fragments_query = FragmentRelation::find()
532 .select_only()
533 .column(fragment_relation::Column::TargetFragmentId)
534 .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
535 .into_query();
536
537 let condition = Condition::all()
538 .add(fragment::Column::JobId.is_in(job_ids.clone()))
539 .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));
540
541 let fragments: Vec<FragmentId> = Fragment::find()
542 .select_only()
543 .column(fragment::Column::FragmentId)
544 .filter(condition)
545 .into_tuple()
546 .all(txn)
547 .await?;
548
549 let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;
550
551 let fragments = Fragment::find()
552 .filter(
553 fragment::Column::FragmentId.is_in(
554 ensembles
555 .iter()
556 .flat_map(|graph| graph.components.iter())
557 .cloned()
558 .collect_vec(),
559 ),
560 )
561 .all(txn)
562 .await?;
563
564 let fragment_map: HashMap<_, _> = fragments
565 .into_iter()
566 .map(|fragment| (fragment.fragment_id, fragment))
567 .collect();
568
569 let job_ids = fragment_map
570 .values()
571 .map(|fragment| fragment.job_id)
572 .collect::<BTreeSet<_>>()
573 .into_iter()
574 .collect_vec();
575
576 let jobs: HashMap<_, _> = StreamingJob::find()
577 .filter(streaming_job::Column::JobId.is_in(job_ids))
578 .all(txn)
579 .await?
580 .into_iter()
581 .map(|job| (job.job_id, job))
582 .collect();
583
584 build_loaded_context(txn, ensembles, fragment_map, jobs).await
585}
586
587pub(crate) fn render_actor_assignments(
590 actor_id_counter: &AtomicU32,
591 worker_map: &HashMap<WorkerId, WorkerNode>,
592 loaded: &LoadedFragmentContext,
593) -> MetaResult<RenderedGraph> {
594 let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
595 render_actor_assignments_with_allocator(&mut actor_id_allocator, worker_map, loaded)
596}
597
598pub(crate) fn preview_actor_assignments(
601 worker_map: &HashMap<WorkerId, WorkerNode>,
602 loaded: &LoadedFragmentContext,
603) -> MetaResult<RenderedGraph> {
604 let mut actor_id_allocator = RenderActorIdAllocator::Preview { next_actor_id: 0 };
605 render_actor_assignments_with_allocator(&mut actor_id_allocator, worker_map, loaded)
606}
607
608pub(crate) fn materialize_actor_assignments(
611 actor_id_counter: &AtomicU32,
612 rendered: RenderedGraph,
613) -> RenderedGraph {
614 let RenderedGraph {
615 fragments,
616 ensembles,
617 } = rendered;
618 #[cfg(debug_assertions)]
619 let preview_slots = collect_relative_actor_slots_by_fragment(&fragments);
620
621 let mut ordered_fragments = fragments
622 .into_iter()
623 .flat_map(|(database_id, jobs)| {
624 jobs.into_iter().flat_map(move |(job_id, fragments)| {
625 fragments.into_iter().map(move |(fragment_id, fragment)| {
626 (database_id, job_id, fragment_id, fragment)
627 })
628 })
629 })
630 .collect_vec();
631 ordered_fragments.sort_by_key(|(_, _, fragment_id, fragment)| {
642 (
643 fragment
644 .actors
645 .keys()
646 .copied()
647 .min()
648 .map(|actor_id| actor_id.as_raw_id())
649 .unwrap_or(u32::MAX),
650 *fragment_id,
651 )
652 });
653
654 let mut materialized = FragmentRenderMap::new();
655 for (database_id, job_id, fragment_id, fragment) in ordered_fragments {
656 materialized
657 .entry(database_id)
658 .or_default()
659 .entry(job_id)
660 .or_default()
661 .insert(
662 fragment_id,
663 materialize_fragment(fragment, actor_id_counter),
664 );
665 }
666
667 #[cfg(debug_assertions)]
668 assert_materialization_preserves_preview_slots(&preview_slots, &materialized);
669
670 RenderedGraph {
671 fragments: materialized,
672 ensembles,
673 }
674}
675
/// Debug-only, id-independent description of one actor:
/// `(actor id minus the fragment's lowest actor id, worker id, vnode indices of the bitmap if
/// any, sorted split ids)`.
#[cfg(debug_assertions)]
type RelativeActorSlot = (u32, WorkerId, Option<Vec<usize>>, Vec<SplitId>);
678
679#[cfg(debug_assertions)]
680fn collect_relative_actor_slots(fragment: &InflightFragmentInfo) -> Vec<RelativeActorSlot> {
681 let base = fragment
682 .actors
683 .keys()
684 .copied()
685 .min()
686 .map(|actor_id| actor_id.as_raw_id())
687 .unwrap_or_default();
688
689 let mut entries = fragment
690 .actors
691 .iter()
692 .map(|(&actor_id, info)| {
693 let idx = actor_id.as_raw_id() - base;
694 let vnode_bitmap = info.vnode_bitmap.as_ref().map(|bitmap| {
695 bitmap
696 .iter_vnodes()
697 .map(|vnode| vnode.to_index())
698 .collect_vec()
699 });
700 let mut splits = info.splits.iter().map(SplitMetaData::id).collect_vec();
701 splits.sort_unstable();
702 (idx, info.worker_id, vnode_bitmap, splits)
703 })
704 .collect_vec();
705 entries.sort_unstable_by_key(|(idx, ..)| *idx);
706 entries
707}
708
709#[cfg(debug_assertions)]
710fn collect_relative_actor_slots_by_fragment(
711 fragments: &FragmentRenderMap,
712) -> HashMap<FragmentId, Vec<RelativeActorSlot>> {
713 fragments
714 .values()
715 .flat_map(|jobs| jobs.values())
716 .flatten()
717 .map(|(fragment_id, fragment)| (*fragment_id, collect_relative_actor_slots(fragment)))
718 .collect()
719}
720
721#[cfg(debug_assertions)]
722fn assert_materialization_preserves_preview_slots(
723 preview_slots: &HashMap<FragmentId, Vec<RelativeActorSlot>>,
724 materialized_fragments: &FragmentRenderMap,
725) {
726 for (database_id, jobs) in materialized_fragments {
727 for (job_id, fragments) in jobs {
728 for (fragment_id, fragment) in fragments {
729 let expected_slots = preview_slots.get(fragment_id).unwrap_or_else(|| {
730 panic!(
731 "preview fragment {fragment_id} for database {database_id}, job {job_id} not found"
732 )
733 });
734
735 assert_eq!(
736 collect_relative_actor_slots(fragment),
737 *expected_slots,
738 "materialization changed preview slot ordering for database {database_id}, job {job_id}, fragment {fragment_id}"
739 );
740 }
741 }
742 }
743}
744
745fn render_actor_assignments_with_allocator(
746 actor_id_allocator: &mut RenderActorIdAllocator<'_>,
747 worker_map: &HashMap<WorkerId, WorkerNode>,
748 loaded: &LoadedFragmentContext,
749) -> MetaResult<RenderedGraph> {
750 if loaded.is_empty() {
751 return Ok(RenderedGraph::empty());
752 }
753
754 let render_context = RenderActorsContext {
755 fragment_source_ids: &loaded.fragment_source_ids,
756 fragment_splits: &loaded.fragment_splits,
757 streaming_job_databases: &loaded.streaming_job_databases,
758 database_map: &loaded.database_map,
759 };
760
761 let fragments = render_actors_with_allocator(
762 actor_id_allocator,
763 &loaded.ensembles,
764 &loaded.job_fragments,
765 &loaded.job_map,
766 worker_map,
767 render_context,
768 )?;
769
770 Ok(RenderedGraph {
771 fragments,
772 ensembles: loaded.ensembles.clone(),
773 })
774}
775
776fn materialize_fragment(
777 mut fragment: InflightFragmentInfo,
778 actor_id_counter: &AtomicU32,
779) -> InflightFragmentInfo {
780 let actor_count = u32::try_from(fragment.actors.len()).expect("actor count exceeds u32::MAX");
781 let actor_id_base: ActorId = actor_id_counter
782 .fetch_add(actor_count, Ordering::Relaxed)
783 .into();
784
785 let mut actors = fragment.actors.into_iter().collect_vec();
786 actors.sort_by_key(|(actor_id, _)| *actor_id);
787
788 fragment.actors = actors
789 .into_iter()
790 .enumerate()
791 .map(|(idx, (_, info))| {
792 let actor_offset = u32::try_from(idx).expect("actor index exceeds u32::MAX");
793 (actor_id_base + actor_offset, info)
794 })
795 .collect();
796
797 fragment
798}
799
800async fn build_loaded_context<C>(
801 txn: &C,
802 ensembles: Vec<NoShuffleEnsemble>,
803 fragment_models: HashMap<FragmentId, fragment::Model>,
804 job_map: HashMap<JobId, streaming_job::Model>,
805) -> MetaResult<LoadedFragmentContext>
806where
807 C: ConnectionTrait,
808{
809 if ensembles.is_empty() {
810 return Ok(LoadedFragmentContext::default());
811 }
812
813 let mut job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>> = HashMap::new();
814 for (fragment_id, model) in fragment_models {
815 job_fragments
816 .entry(model.job_id)
817 .or_default()
818 .try_insert(fragment_id, LoadedFragment::from(model))
819 .expect("duplicate fragment id for job");
820 }
821
822 #[cfg(debug_assertions)]
823 {
824 debug_sanity_check(&ensembles, &job_fragments, &job_map);
825 }
826
827 let (fragment_source_ids, fragment_splits) =
828 resolve_source_fragments(txn, &job_fragments).await?;
829
830 let job_ids = job_map.keys().copied().collect_vec();
831
832 let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
833 .select_only()
834 .column(streaming_job::Column::JobId)
835 .column(object::Column::DatabaseId)
836 .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
837 .filter(streaming_job::Column::JobId.is_in(job_ids))
838 .into_tuple()
839 .all(txn)
840 .await?
841 .into_iter()
842 .collect();
843
844 let database_map: HashMap<_, _> = Database::find()
845 .filter(
846 database::Column::DatabaseId
847 .is_in(streaming_job_databases.values().copied().collect_vec()),
848 )
849 .all(txn)
850 .await?
851 .into_iter()
852 .map(|db| (db.database_id, db))
853 .collect();
854
855 Ok(LoadedFragmentContext {
856 ensembles,
857 job_fragments,
858 job_map,
859 streaming_job_databases,
860 database_map,
861 fragment_source_ids,
862 fragment_splits,
863 })
864}
865
/// Borrowed lookup tables from a [`LoadedFragmentContext`] that actor rendering needs in
/// addition to the ensembles, fragments and jobs.
struct RenderActorsContext<'a> {
    /// Source id of source-related fragments.
    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
    /// Splits recorded per fragment; read for the source entry fragment of an ensemble.
    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
    /// Database owning each streaming job.
    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
    /// Database rows; used to read the database resource group.
    database_map: &'a HashMap<DatabaseId, database::Model>,
}
874
/// Source of actor ids used while rendering.
enum RenderActorIdAllocator<'a> {
    /// Allocates real ids by advancing the shared actor id counter.
    Persistent(&'a AtomicU32),
    /// Allocates throw-away ids from a local counter, leaving the shared counter untouched.
    Preview { next_actor_id: u32 },
}
879
880impl RenderActorIdAllocator<'_> {
881 fn allocate_block(&mut self, actor_count: u32) -> ActorId {
882 match self {
883 Self::Persistent(actor_id_counter) => {
884 let actor_id_base: ActorId = actor_id_counter
885 .fetch_add(actor_count, Ordering::Relaxed)
886 .into();
887 actor_id_base
888 }
889 Self::Preview { next_actor_id } => {
890 let actor_id_base = *next_actor_id;
891 *next_actor_id = next_actor_id
892 .checked_add(actor_count)
893 .expect("preview actor id overflow");
894 let actor_id_base: ActorId = actor_id_base.into();
895 actor_id_base
896 }
897 }
898 }
899}
900
/// Test helper: renders `ensembles` using real actor ids from `actor_id_counter`.
#[cfg(test)]
fn render_actors(
    actor_id_counter: &AtomicU32,
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    render_actors_with_allocator(
        &mut RenderActorIdAllocator::Persistent(actor_id_counter),
        ensembles,
        job_fragments,
        job_map,
        worker_map,
        context,
    )
}
920
921fn render_actors_with_allocator(
922 actor_id_allocator: &mut RenderActorIdAllocator<'_>,
923 ensembles: &[NoShuffleEnsemble],
924 job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
925 job_map: &HashMap<JobId, streaming_job::Model>,
926 worker_map: &HashMap<WorkerId, WorkerNode>,
927 context: RenderActorsContext<'_>,
928) -> MetaResult<FragmentRenderMap> {
929 let RenderActorsContext {
930 fragment_source_ids,
931 fragment_splits: fragment_splits_map,
932 streaming_job_databases,
933 database_map,
934 } = context;
935
936 let mut all_fragments: FragmentRenderMap = HashMap::new();
937 let fragment_lookup: HashMap<FragmentId, &LoadedFragment> = job_fragments
938 .values()
939 .flat_map(|fragments| fragments.iter())
940 .map(|(fragment_id, fragment)| (*fragment_id, fragment))
941 .collect();
942
943 for NoShuffleEnsemble {
944 entries,
945 components,
946 } in ensembles
947 {
948 tracing::debug!("rendering ensemble entries {:?}", entries);
949
950 let entry_fragments = entries
951 .iter()
952 .map(|fragment_id| fragment_lookup.get(fragment_id).unwrap())
953 .collect_vec();
954
955 let entry_fragment_parallelism = entry_fragments
956 .iter()
957 .map(|fragment| fragment.parallelism.clone())
958 .dedup()
959 .exactly_one()
960 .map_err(|_| {
961 anyhow!(
962 "entry fragments {:?} have inconsistent parallelism settings",
963 entries.iter().copied().collect_vec()
964 )
965 })?;
966
967 let (job_id, distribution_type, vnode_count) = entry_fragments
968 .iter()
969 .map(|f| (f.job_id, f.distribution_type, f.vnode_count))
970 .dedup()
971 .exactly_one()
972 .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;
973
974 let job = job_map
975 .get(&job_id)
976 .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;
977
978 let database_resource_group = streaming_job_databases
979 .get(&job_id)
980 .and_then(|database_id| database_map.get(database_id))
981 .unwrap()
982 .resource_group
983 .clone();
984
985 let source_entry_fragment = entry_fragments.iter().find(|f| {
986 let mask = f.fragment_type_mask;
987 if mask.contains(FragmentTypeFlag::Source) {
988 assert!(!mask.contains(FragmentTypeFlag::SourceScan))
989 }
990 mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
991 });
992
993 let actor_template = EnsembleActorTemplate::render_new(
994 job,
995 worker_map,
996 entry_fragment_parallelism,
997 database_resource_group,
998 distribution_type,
999 vnode_count,
1000 )?;
1001
1002 let source_splits = match source_entry_fragment {
1003 Some(entry_fragment) => {
1004 let source_id = fragment_source_ids
1005 .get(&entry_fragment.fragment_id)
1006 .ok_or_else(|| {
1007 anyhow!(
1008 "missing source id in source fragment {}",
1009 entry_fragment.fragment_id
1010 )
1011 })?;
1012
1013 let entry_fragment_id = entry_fragment.fragment_id;
1014
1015 let splits = fragment_splits_map
1016 .get(&entry_fragment_id)
1017 .cloned()
1018 .unwrap_or_default();
1019
1020 let splits: std::collections::BTreeMap<_, _> =
1021 splits.into_iter().map(|s| (s.id(), s)).collect();
1022 let splits = actor_template.assign_splits(entry_fragment_id, splits);
1023 Some((splits, *source_id))
1024 }
1025 None => None,
1026 };
1027
1028 for component_fragment_id in components {
1029 let fragment = fragment_lookup.get(component_fragment_id).unwrap();
1030 let fragment_id = fragment.fragment_id;
1031 let job_id = fragment.job_id;
1032 let fragment_type_mask = fragment.fragment_type_mask;
1033 let distribution_type = fragment.distribution_type;
1034 let stream_node = &fragment.nodes;
1035 let state_table_ids = &fragment.state_table_ids;
1036 let vnode_count = fragment.vnode_count;
1037 let source_id = fragment_source_ids.get(&fragment_id).cloned();
1038
1039 let aligner = ComponentFragmentAligner::new(&actor_template, actor_id_allocator);
1040 let actors = aligner.align_component_actor(distribution_type);
1041 let mut splits = source_id
1042 .map(|source_id| {
1043 let (fragment_splits, shared_source_id) = source_splits.as_ref().unwrap();
1044 assert_eq!(*shared_source_id, source_id);
1045 aligner.align_component_splits(fragment_splits)
1046 })
1047 .unwrap_or_default();
1048
1049 let actors: HashMap<ActorId, InflightActorInfo> = actors
1050 .into_iter()
1051 .map(|(actor_id, (worker_id, vnode_bitmap))| {
1052 (
1053 actor_id,
1054 InflightActorInfo {
1055 worker_id,
1056 vnode_bitmap,
1057 splits: splits.remove(&actor_id).unwrap_or_default(),
1058 },
1059 )
1060 })
1061 .collect();
1062
1063 let fragment = InflightFragmentInfo {
1064 fragment_id,
1065 distribution_type,
1066 fragment_type_mask,
1067 vnode_count,
1068 nodes: stream_node.clone(),
1069 actors,
1070 state_table_ids: state_table_ids.clone(),
1071 };
1072
1073 let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
1074 anyhow!("streaming job {job_id} not found in streaming_job_databases")
1075 })?;
1076
1077 all_fragments
1078 .entry(database_id)
1079 .or_default()
1080 .entry(job_id)
1081 .or_default()
1082 .insert(fragment_id, fragment);
1083 }
1084 }
1085
1086 Ok(all_fragments)
1087}
1088
/// Actor layout shared by all fragments of one no-shuffle ensemble, built by `render_new`.
pub(crate) struct EnsembleActorTemplate {
    /// Per-worker actors of the template.
    // NOTE(review): presumably keyed by relative actor index with each actor's vnode bitmap
    // (None for non-hash distributions) — confirm against `render_new`, which is not fully
    // visible here.
    assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>>,
    /// Distribution type of the ensemble's entry fragments.
    distribution_type: DistributionType,
    /// Number of actors in the template.
    actor_count: u32,
}
1094
1095impl EnsembleActorTemplate {
1096 pub(crate) fn render_new(
1097 job: &streaming_job::Model,
1098 worker_map: &HashMap<WorkerId, WorkerNode>,
1099 entry_fragment_parallelism: Option<StreamingParallelism>,
1100 database_resource_group: String,
1101 distribution_type: DistributionType,
1102 vnode_count: usize,
1103 ) -> MetaResult<Self> {
1104 let job_id = job.job_id;
1105 let job_strategy = job
1106 .adaptive_parallelism_strategy
1107 .as_deref()
1108 .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
1109 let backfill_job_strategy = job
1110 .backfill_adaptive_parallelism_strategy
1111 .as_deref()
1112 .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
1113
1114 let resource_group = match &job.specific_resource_group {
1115 None => database_resource_group,
1116 Some(resource_group) => resource_group.clone(),
1117 };
1118
1119 let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
1120 .iter()
1121 .filter_map(|(worker_id, worker)| {
1122 if worker
1123 .resource_group()
1124 .as_deref()
1125 .unwrap_or(DEFAULT_RESOURCE_GROUP)
1126 == resource_group.as_str()
1127 {
1128 Some((
1129 *worker_id,
1130 worker
1131 .parallelism()
1132 .expect("should have parallelism for compute node")
1133 .try_into()
1134 .expect("parallelism for compute node"),
1135 ))
1136 } else {
1137 None
1138 }
1139 })
1140 .collect();
1141
1142 let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();
1143
1144 let effective_job_parallelism = if job.job_status != JobStatus::Created {
1145 job.backfill_parallelism
1146 .as_ref()
1147 .unwrap_or(&job.parallelism)
1148 } else {
1149 &job.parallelism
1150 };
1151 let effective_job_strategy = if job.job_status != JobStatus::Created {
1152 backfill_job_strategy.or(job_strategy)
1153 } else {
1154 job_strategy
1155 };
1156
1157 let target_parallelism = match entry_fragment_parallelism
1158 .as_ref()
1159 .unwrap_or(effective_job_parallelism)
1160 {
1161 StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
1162 let effective_job_strategy = effective_job_strategy.unwrap_or_else(|| {
1163 tracing::warn!(
1164 job_id = %job_id,
1165 ?effective_job_parallelism,
1166 "adaptive/custom job is missing adaptive strategy in StreamContext; falling back to default"
1167 );
1168 AdaptiveParallelismStrategy::default()
1169 });
1170 effective_job_strategy.compute_target_parallelism(total_parallelism)
1171 }
1172 StreamingParallelism::Fixed(n) => *n,
1173 };
1174 let actual_parallelism = target_parallelism
1175 .min(vnode_count)
1176 .min(job.max_parallelism as usize);
1177 if actual_parallelism != target_parallelism {
1178 tracing::warn!(
1179 job_id = %job_id,
1180 target_parallelism,
1181 actual_parallelism,
1182 vnode_count,
1183 job_max_parallelism = job.max_parallelism,
1184 ?effective_job_parallelism,
1185 ?entry_fragment_parallelism,
1186 "streaming job parallelism was capped by vnode count or max parallelism"
1187 );
1188 }
1189
1190 tracing::debug!(
1191 "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
1192 job_id,
1193 actual_parallelism,
1194 job.parallelism,
1195 total_parallelism,
1196 job.max_parallelism,
1197 vnode_count,
1198 entry_fragment_parallelism
1199 );
1200
1201 let assigner = AssignerBuilder::new(job_id).build();
1202
1203 let actors = (0..(actual_parallelism as u32)).collect_vec();
1204 let vnodes = (0..vnode_count).collect_vec();
1205
1206 let raw_assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;
1207
1208 let assignment = raw_assignment
1209 .into_iter()
1210 .map(|(worker_id, actors)| {
1211 let actors = actors
1212 .into_iter()
1213 .map(|(actor_idx, vnodes)| {
1214 let bitmap = match distribution_type {
1215 DistributionType::Single => None,
1216 DistributionType::Hash => {
1217 Some(Bitmap::from_indices(vnode_count, &vnodes))
1218 }
1219 };
1220 (actor_idx, bitmap)
1221 })
1222 .collect();
1223 (worker_id, actors)
1224 })
1225 .collect();
1226
1227 let actor_count = u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
1228
1229 Ok(Self {
1230 assignment,
1231 distribution_type,
1232 actor_count,
1233 })
1234 }
1235
1236 pub(crate) fn from_existing_inflight_fragment(fragment: &InflightFragmentInfo) -> Self {
1237 if fragment.actors.is_empty() {
1238 return Self {
1239 assignment: BTreeMap::new(),
1240 distribution_type: fragment.distribution_type,
1241 actor_count: 0,
1242 };
1243 }
1244
1245 let actor_count = fragment.actors.len() as u32;
1246
1247 let mut assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>> = BTreeMap::new();
1248 for (actor_idx, (&_actor_id, actor_info)) in fragment.actors.iter().enumerate() {
1250 let actor_idx = actor_idx as u32;
1251 assignment
1252 .entry(actor_info.worker_id)
1253 .or_default()
1254 .insert(actor_idx, actor_info.vnode_bitmap.clone());
1255 }
1256
1257 Self {
1258 assignment,
1259 distribution_type: fragment.distribution_type,
1260 actor_count,
1261 }
1262 }
1263
1264 pub(crate) fn assert_aligned_with(
1271 &self,
1272 other: &Self,
1273 self_fragment_id: FragmentId,
1274 other_fragment_id: FragmentId,
1275 ) {
1276 let mapping = resolve_no_shuffle_actor_mapping(
1277 self.distribution_type,
1278 self.assignment
1279 .iter()
1280 .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
1281 other.distribution_type,
1282 other
1283 .assignment
1284 .iter()
1285 .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
1286 );
1287
1288 for ((self_worker, _self_idx), (other_worker, _other_idx)) in &mapping {
1289 assert_eq!(
1290 self_worker, other_worker,
1291 "fragments {} and {} disagree on worker placement: {:?}",
1292 self_fragment_id, other_fragment_id, mapping,
1293 );
1294 }
1295 }
1296
1297 fn assign_splits(
1298 &self,
1299 entry_fragment_id: FragmentId,
1300 splits: BTreeMap<SplitId, SplitImpl>,
1301 ) -> HashMap<u32, Vec<SplitImpl>> {
1302 {
1303 {
1304 let empty_actor_splits: HashMap<_, _> = self
1305 .assignment
1306 .values()
1307 .flat_map(|actors| actors.keys())
1308 .map(|actor_id| (*actor_id, vec![]))
1309 .collect();
1310
1311 crate::stream::source_manager::reassign_splits(
1312 entry_fragment_id,
1313 empty_actor_splits,
1314 &splits,
1315 SplitDiffOptions::default(),
1316 )
1317 .unwrap_or_default()
1318 }
1319 }
1320 }
1321}
1322
/// Projects an `EnsembleActorTemplate` onto one component fragment of the ensemble.
///
/// Each template-local actor index `i` becomes actor id `actor_id_base + i`.
pub(crate) struct ComponentFragmentAligner<'a> {
    /// Shared layout of the ensemble.
    actor_template: &'a EnsembleActorTemplate,
    /// First actor id of the block reserved for this component fragment.
    actor_id_base: ActorId,
}
1327
1328impl<'a> ComponentFragmentAligner<'a> {
1329 fn new(
1330 actor_template: &'a EnsembleActorTemplate,
1331 actor_id_allocator: &mut RenderActorIdAllocator<'_>,
1332 ) -> Self {
1333 let actor_id_base = actor_id_allocator.allocate_block(actor_template.actor_count);
1334 Self {
1335 actor_template,
1336 actor_id_base,
1337 }
1338 }
1339
1340 pub(crate) fn new_persistent(
1341 actor_template: &'a EnsembleActorTemplate,
1342 actor_id_counter: &AtomicU32,
1343 ) -> Self {
1344 let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
1345 Self::new(actor_template, &mut actor_id_allocator)
1346 }
1347
1348 pub(crate) fn align_component_actor(
1349 &self,
1350 distribution_type: DistributionType,
1351 ) -> HashMap<ActorId, (WorkerId, Option<Bitmap>)> {
1352 let EnsembleActorTemplate {
1353 assignment,
1354 actor_count,
1355 distribution_type: _,
1356 } = &self.actor_template;
1357 let actor_id_base = self.actor_id_base;
1358 {
1359 assignment
1360 .iter()
1361 .flat_map(|(worker_id, actors)| {
1362 actors
1363 .iter()
1364 .map(move |(actor_idx, bitmap)| (worker_id, actor_idx, bitmap))
1365 })
1366 .map(|(&worker_id, &actor_idx, bitmap)| {
1367 if distribution_type == DistributionType::Single {
1368 assert_eq!(*actor_count, 1);
1369 }
1370
1371 let actor_id = actor_id_base + actor_idx;
1372
1373 (actor_id, (worker_id, bitmap.clone()))
1374 })
1375 .collect()
1376 }
1377 }
1378
1379 pub(crate) fn align_component_splits(
1380 &self,
1381 split_assignment: &HashMap<u32, Vec<SplitImpl>>,
1382 ) -> HashMap<ActorId, Vec<SplitImpl>> {
1383 (0..self.actor_template.actor_count)
1384 .filter_map(|actor_idx| {
1385 split_assignment
1386 .get(&actor_idx)
1387 .map(|splits| ((self.actor_id_base + actor_idx), splits.clone()))
1388 })
1389 .collect()
1390 }
1391}
1392
1393#[cfg(debug_assertions)]
1394fn debug_sanity_check(
1395 ensembles: &[NoShuffleEnsemble],
1396 job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1397 jobs: &HashMap<JobId, streaming_job::Model>,
1398) {
1399 let fragment_lookup: HashMap<FragmentId, (&LoadedFragment, JobId)> = job_fragments
1400 .iter()
1401 .flat_map(|(job_id, fragments)| {
1402 fragments
1403 .iter()
1404 .map(move |(fragment_id, fragment)| (*fragment_id, (fragment, *job_id)))
1405 })
1406 .collect();
1407
1408 debug_assert!(
1410 ensembles
1411 .iter()
1412 .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
1413 "entries must be subset of components"
1414 );
1415
1416 let mut missing_fragments = BTreeSet::new();
1417 let mut missing_jobs = BTreeSet::new();
1418
1419 for fragment_id in ensembles
1420 .iter()
1421 .flat_map(|ensemble| ensemble.components.iter())
1422 {
1423 match fragment_lookup.get(fragment_id) {
1424 Some((fragment, job_id)) => {
1425 if !jobs.contains_key(&fragment.job_id) {
1426 missing_jobs.insert(*job_id);
1427 }
1428 }
1429 None => {
1430 missing_fragments.insert(*fragment_id);
1431 }
1432 }
1433 }
1434
1435 debug_assert!(
1436 missing_fragments.is_empty(),
1437 "missing fragments in fragment_map: {:?}",
1438 missing_fragments
1439 );
1440
1441 debug_assert!(
1442 missing_jobs.is_empty(),
1443 "missing jobs for fragments' job_id: {:?}",
1444 missing_jobs
1445 );
1446
1447 for ensemble in ensembles {
1448 let unique_vnode_counts: Vec<_> = ensemble
1449 .components
1450 .iter()
1451 .flat_map(|fragment_id| {
1452 fragment_lookup
1453 .get(fragment_id)
1454 .map(|(fragment, _)| fragment.vnode_count)
1455 })
1456 .unique()
1457 .collect();
1458
1459 debug_assert!(
1460 unique_vnode_counts.len() <= 1,
1461 "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
1462 ensemble.components,
1463 unique_vnode_counts
1464 );
1465 }
1466}
1467
1468async fn resolve_source_fragments<C>(
1469 txn: &C,
1470 job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1471) -> MetaResult<(
1472 HashMap<FragmentId, SourceId>,
1473 HashMap<FragmentId, Vec<SplitImpl>>,
1474)>
1475where
1476 C: ConnectionTrait,
1477{
1478 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1479 for (fragment_id, fragment) in job_fragments.values().flatten() {
1480 let mask = fragment.fragment_type_mask;
1481 if mask.contains(FragmentTypeFlag::Source)
1482 && let Some(source_id) = fragment.nodes.find_stream_source()
1483 {
1484 source_fragment_ids
1485 .entry(source_id)
1486 .or_default()
1487 .insert(*fragment_id);
1488 }
1489
1490 if mask.contains(FragmentTypeFlag::SourceScan)
1491 && let Some((source_id, _)) = fragment.nodes.find_source_backfill()
1492 {
1493 source_fragment_ids
1494 .entry(source_id)
1495 .or_default()
1496 .insert(*fragment_id);
1497 }
1498 }
1499
1500 let fragment_source_ids: HashMap<_, _> = source_fragment_ids
1501 .iter()
1502 .flat_map(|(source_id, fragment_ids)| {
1503 fragment_ids
1504 .iter()
1505 .map(|fragment_id| (*fragment_id, *source_id))
1506 })
1507 .collect();
1508
1509 let fragment_ids = fragment_source_ids.keys().copied().collect_vec();
1510
1511 let fragment_splits: Vec<_> = FragmentSplits::find()
1512 .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
1513 .all(txn)
1514 .await?;
1515
1516 let fragment_splits: HashMap<_, _> = fragment_splits
1517 .into_iter()
1518 .flat_map(|model| {
1519 model.splits.map(|splits| {
1520 (
1521 model.fragment_id,
1522 splits
1523 .to_protobuf()
1524 .splits
1525 .iter()
1526 .flat_map(SplitImpl::try_from)
1527 .collect_vec(),
1528 )
1529 })
1530 })
1531 .collect();
1532
1533 Ok((fragment_source_ids, fragment_splits))
1534}
1535
#[derive(Debug)]
/// Borrowed view of rendered fragments together with the worker location of each actor.
pub struct ActorGraph<'a> {
    /// Fragment id -> (fragment model, its stream actors).
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    /// Actor id -> id of the worker hosting it.
    pub locations: &'a HashMap<ActorId, WorkerId>,
}
1542
#[derive(Debug, Clone)]
/// A connected group of fragments linked by no-shuffle edges.
pub struct NoShuffleEnsemble {
    /// Root fragments: members with no no-shuffle upstream inside the ensemble.
    entries: HashSet<FragmentId>,
    /// All member fragments, entries included.
    components: HashSet<FragmentId>,
}
1548
1549impl NoShuffleEnsemble {
1550 pub(crate) fn singleton(fragment_id: FragmentId) -> Self {
1552 Self {
1553 entries: HashSet::from_iter([fragment_id]),
1554 components: HashSet::from_iter([fragment_id]),
1555 }
1556 }
1557
1558 #[cfg(test)]
1559 pub fn for_test(
1560 entries: impl IntoIterator<Item = FragmentId>,
1561 components: impl IntoIterator<Item = FragmentId>,
1562 ) -> Self {
1563 let entries = entries.into_iter().collect();
1564 let components = components.into_iter().collect();
1565 Self {
1566 entries,
1567 components,
1568 }
1569 }
1570
1571 pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1572 self.components.iter().cloned()
1573 }
1574
1575 pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1576 self.entries.iter().copied()
1577 }
1578
1579 pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1580 self.components.iter().copied()
1581 }
1582
1583 pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
1584 self.entries.contains(fragment_id)
1585 }
1586}
1587
1588pub async fn find_fragment_no_shuffle_dags_detailed(
1589 db: &impl ConnectionTrait,
1590 initial_fragment_ids: &[FragmentId],
1591) -> MetaResult<Vec<NoShuffleEnsemble>> {
1592 let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
1593 .columns([
1594 fragment_relation::Column::SourceFragmentId,
1595 fragment_relation::Column::TargetFragmentId,
1596 ])
1597 .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
1598 .into_tuple()
1599 .all(db)
1600 .await?;
1601
1602 let (forward_edges, backward_edges) =
1603 build_no_shuffle_fragment_graph_edges(all_no_shuffle_relations);
1604
1605 find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
1606}
1607
1608pub(crate) fn build_no_shuffle_fragment_graph_edges(
1609 relations: impl IntoIterator<Item = (FragmentId, FragmentId)>,
1610) -> (
1611 HashMap<FragmentId, Vec<FragmentId>>,
1612 HashMap<FragmentId, Vec<FragmentId>>,
1613) {
1614 let mut forward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1615 let mut backward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1616
1617 for (src, dst) in relations {
1618 forward_edges.entry(src).or_default().insert(dst);
1619 backward_edges.entry(dst).or_default().insert(src);
1620 }
1621
1622 let forward_edges = forward_edges
1623 .into_iter()
1624 .map(|(src, dst_set)| (src, dst_set.into_iter().collect()))
1625 .collect();
1626 let backward_edges = backward_edges
1627 .into_iter()
1628 .map(|(dst, src_set)| (dst, src_set.into_iter().collect()))
1629 .collect();
1630
1631 (forward_edges, backward_edges)
1632}
1633
1634pub(crate) fn find_no_shuffle_graphs(
1635 initial_fragment_ids: &[impl Into<FragmentId> + Copy],
1636 forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1637 backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1638) -> MetaResult<Vec<NoShuffleEnsemble>> {
1639 let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
1640 let mut globally_visited: HashSet<FragmentId> = HashSet::new();
1641
1642 for &init_id in initial_fragment_ids {
1643 let init_id = init_id.into();
1644 if globally_visited.contains(&init_id) {
1645 continue;
1646 }
1647
1648 let mut components = HashSet::new();
1650 let mut queue: VecDeque<FragmentId> = VecDeque::new();
1651
1652 queue.push_back(init_id);
1653 globally_visited.insert(init_id);
1654
1655 while let Some(current_id) = queue.pop_front() {
1656 components.insert(current_id);
1657 let neighbors = forward_edges
1658 .get(¤t_id)
1659 .into_iter()
1660 .flatten()
1661 .chain(backward_edges.get(¤t_id).into_iter().flatten());
1662
1663 for &neighbor_id in neighbors {
1664 if globally_visited.insert(neighbor_id) {
1665 queue.push_back(neighbor_id);
1666 }
1667 }
1668 }
1669
1670 let mut entries = HashSet::new();
1672 for &node_id in &components {
1673 let is_root = match backward_edges.get(&node_id) {
1674 Some(parents) => parents.iter().all(|p| !components.contains(p)),
1675 None => true,
1676 };
1677 if is_root {
1678 entries.insert(node_id);
1679 }
1680 }
1681
1682 if !entries.is_empty() {
1684 graphs.push(NoShuffleEnsemble {
1685 entries,
1686 components,
1687 });
1688 }
1689 }
1690
1691 Ok(graphs)
1692}
1693
1694#[cfg(test)]
1695mod tests {
1696 use std::collections::{BTreeSet, HashMap, HashSet};
1697 use std::sync::Arc;
1698
1699 use risingwave_connector::source::SplitImpl;
1700 use risingwave_connector::source::test_source::TestSourceSplit;
1701 use risingwave_meta_model::{CreateType, JobStatus};
1702 use risingwave_pb::common::WorkerType;
1703 use risingwave_pb::common::worker_node::Property as WorkerProperty;
1704 use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1705
1706 use super::*;
1707
1708 type Edges = (
1711 HashMap<FragmentId, Vec<FragmentId>>,
1712 HashMap<FragmentId, Vec<FragmentId>>,
1713 );
1714
1715 fn build_edges(relations: &[(u32, u32)]) -> Edges {
1718 let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1719 let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1720 for &(src, dst) in relations {
1721 forward_edges
1722 .entry(src.into())
1723 .or_default()
1724 .push(dst.into());
1725 backward_edges
1726 .entry(dst.into())
1727 .or_default()
1728 .push(src.into());
1729 }
1730 (forward_edges, backward_edges)
1731 }
1732
1733 fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1735 ids.iter().map(|id| (*id).into()).collect()
1736 }
1737
1738 fn build_fragment(
1739 fragment_id: FragmentId,
1740 job_id: JobId,
1741 fragment_type_mask: i32,
1742 distribution_type: DistributionType,
1743 vnode_count: i32,
1744 parallelism: StreamingParallelism,
1745 ) -> LoadedFragment {
1746 LoadedFragment {
1747 fragment_id,
1748 job_id,
1749 fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
1750 distribution_type,
1751 vnode_count: vnode_count as usize,
1752 nodes: PbStreamNode::default(),
1753 state_table_ids: HashSet::new(),
1754 parallelism: Some(parallelism),
1755 }
1756 }
1757
1758 type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1759
1760 fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1761 let base = fragment.actors.keys().copied().min().unwrap_or_default();
1762
1763 let mut entries: Vec<_> = fragment
1764 .actors
1765 .iter()
1766 .map(|(&actor_id, info)| {
1767 let idx = actor_id.as_raw_id() - base.as_raw_id();
1768 let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1769 bitmap
1770 .iter()
1771 .enumerate()
1772 .filter_map(|(pos, is_set)| is_set.then_some(pos))
1773 .collect::<Vec<_>>()
1774 });
1775 let splits = info
1776 .splits
1777 .iter()
1778 .map(|split| split.id().to_string())
1779 .collect::<Vec<_>>();
1780 (idx.into(), info.worker_id, vnode_indices, splits)
1781 })
1782 .collect();
1783
1784 entries.sort_by_key(|(idx, _, _, _)| *idx);
1785 entries
1786 }
1787
1788 fn build_worker_node(
1789 id: impl Into<WorkerId>,
1790 parallelism: usize,
1791 resource_group: &str,
1792 ) -> WorkerNode {
1793 WorkerNode {
1794 id: id.into(),
1795 r#type: WorkerType::ComputeNode as i32,
1796 property: Some(WorkerProperty {
1797 is_streaming: true,
1798 parallelism: u32::try_from(parallelism).expect("parallelism fits into u32"),
1799 resource_group: Some(resource_group.to_owned()),
1800 ..Default::default()
1801 }),
1802 ..Default::default()
1803 }
1804 }
1805
1806 #[test]
1807 fn test_single_linear_chain() {
1808 let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1811 let initial_ids = &[2];
1812
1813 let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1815
1816 assert!(result.is_ok());
1818 let graphs = result.unwrap();
1819
1820 assert_eq!(graphs.len(), 1);
1821 let graph = &graphs[0];
1822 assert_eq!(graph.entries, to_hashset(&[1]));
1823 assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1824 }
1825
1826 #[test]
1827 fn test_two_disconnected_graphs() {
1828 let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1831 let initial_ids = &[2, 10];
1832
1833 let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1835
1836 assert_eq!(graphs.len(), 2);
1838
1839 graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1841
1842 assert_eq!(graphs[0].entries, to_hashset(&[1]));
1844 assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1845
1846 assert_eq!(graphs[1].entries, to_hashset(&[10]));
1848 assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1849 }
1850
1851 #[test]
1852 fn test_multiple_entries_in_one_graph() {
1853 let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1855 let initial_ids = &[3];
1856
1857 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1859
1860 assert_eq!(graphs.len(), 1);
1862 let graph = &graphs[0];
1863 assert_eq!(graph.entries, to_hashset(&[1, 2]));
1864 assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1865 }
1866
1867 #[test]
1868 fn test_diamond_shape_graph() {
1869 let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1871 let initial_ids = &[4];
1872
1873 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1875
1876 assert_eq!(graphs.len(), 1);
1878 let graph = &graphs[0];
1879 assert_eq!(graph.entries, to_hashset(&[1]));
1880 assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1881 }
1882
1883 #[test]
1884 fn test_starting_with_multiple_nodes_in_same_graph() {
1885 let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1888 let initial_ids = &[2, 4];
1889
1890 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1892
1893 assert_eq!(graphs.len(), 1);
1895 let graph = &graphs[0];
1896 assert_eq!(graph.entries, to_hashset(&[1]));
1897 assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1898 }
1899
1900 #[test]
1901 fn test_empty_initial_ids() {
1902 let (forward, backward) = build_edges(&[(1, 2)]);
1904 let initial_ids: &[u32] = &[];
1905
1906 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1908
1909 assert!(graphs.is_empty());
1911 }
1912
1913 #[test]
1914 fn test_isolated_node_as_input() {
1915 let (forward, backward) = build_edges(&[(1, 2)]);
1917 let initial_ids = &[100];
1918
1919 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1921
1922 assert_eq!(graphs.len(), 1);
1924 let graph = &graphs[0];
1925 assert_eq!(graph.entries, to_hashset(&[100]));
1926 assert_eq!(graph.components, to_hashset(&[100]));
1927 }
1928
1929 #[test]
1930 fn test_graph_with_a_cycle() {
1931 let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1936 let initial_ids = &[2];
1937
1938 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1940
1941 assert!(
1943 graphs.is_empty(),
1944 "A graph with no entries should not be returned"
1945 );
1946 }
1947 #[test]
1948 fn test_custom_complex() {
1949 let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1950 let initial_ids = &[1, 2, 4, 6];
1951
1952 let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1954
1955 assert_eq!(graphs.len(), 2);
1957 graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1959
1960 assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1962 assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1963
1964 assert_eq!(graphs[1].entries, to_hashset(&[6]));
1966 assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1967 }
1968
1969 #[test]
1970 fn render_actors_increments_actor_counter() {
1971 let actor_id_counter = AtomicU32::new(100);
1972 let fragment_id: FragmentId = 1.into();
1973 let job_id: JobId = 10.into();
1974 let database_id: DatabaseId = DatabaseId::new(3);
1975
1976 let fragment_model = build_fragment(
1977 fragment_id,
1978 job_id,
1979 0,
1980 DistributionType::Single,
1981 1,
1982 StreamingParallelism::Fixed(1),
1983 );
1984
1985 let job_model = streaming_job::Model {
1986 job_id,
1987 job_status: JobStatus::Created,
1988 create_type: CreateType::Foreground,
1989 timezone: None,
1990 config_override: None,
1991 adaptive_parallelism_strategy: None,
1992 parallelism: StreamingParallelism::Fixed(1),
1993 backfill_parallelism: None,
1994 backfill_adaptive_parallelism_strategy: None,
1995 backfill_orders: None,
1996 max_parallelism: 1,
1997 specific_resource_group: None,
1998 is_serverless_backfill: false,
1999 refresh_interval_sec: None,
2000 };
2001
2002 let database_model = database::Model {
2003 database_id,
2004 name: "test_db".into(),
2005 resource_group: "rg-a".into(),
2006 barrier_interval_ms: None,
2007 checkpoint_frequency: None,
2008 };
2009
2010 let ensembles = vec![NoShuffleEnsemble {
2011 entries: HashSet::from([fragment_id]),
2012 components: HashSet::from([fragment_id]),
2013 }];
2014
2015 let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
2016 let job_fragments = HashMap::from([(job_id, fragment_map)]);
2017 let job_map = HashMap::from([(job_id, job_model)]);
2018
2019 let worker_map: HashMap<WorkerId, WorkerNode> =
2020 HashMap::from([(1.into(), build_worker_node(1, 1, "rg-a"))]);
2021
2022 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2023 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2024 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2025 let database_map = HashMap::from([(database_id, database_model)]);
2026
2027 let context = RenderActorsContext {
2028 fragment_source_ids: &fragment_source_ids,
2029 fragment_splits: &fragment_splits,
2030 streaming_job_databases: &streaming_job_databases,
2031 database_map: &database_map,
2032 };
2033
2034 let result = render_actors(
2035 &actor_id_counter,
2036 &ensembles,
2037 &job_fragments,
2038 &job_map,
2039 &worker_map,
2040 context,
2041 )
2042 .expect("actor rendering succeeds");
2043
2044 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2045 assert_eq!(state.len(), 1);
2046 assert!(
2047 state[0].2.is_none(),
2048 "single distribution should not assign vnode bitmaps"
2049 );
2050 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
2051 }
2052
2053 #[test]
2054 fn render_actors_aligns_hash_vnode_bitmaps() {
2055 let actor_id_counter = AtomicU32::new(0);
2056 let entry_fragment_id: FragmentId = 1.into();
2057 let downstream_fragment_id: FragmentId = 2.into();
2058 let job_id: JobId = 20.into();
2059 let database_id: DatabaseId = DatabaseId::new(5);
2060
2061 let entry_fragment = build_fragment(
2062 entry_fragment_id,
2063 job_id,
2064 0,
2065 DistributionType::Hash,
2066 4,
2067 StreamingParallelism::Fixed(2),
2068 );
2069
2070 let downstream_fragment = build_fragment(
2071 downstream_fragment_id,
2072 job_id,
2073 0,
2074 DistributionType::Hash,
2075 4,
2076 StreamingParallelism::Fixed(2),
2077 );
2078
2079 let job_model = streaming_job::Model {
2080 job_id,
2081 job_status: JobStatus::Created,
2082 create_type: CreateType::Background,
2083 timezone: None,
2084 config_override: None,
2085 adaptive_parallelism_strategy: None,
2086 parallelism: StreamingParallelism::Fixed(2),
2087 backfill_parallelism: None,
2088 backfill_adaptive_parallelism_strategy: None,
2089 backfill_orders: None,
2090 max_parallelism: 2,
2091 specific_resource_group: None,
2092 is_serverless_backfill: false,
2093 refresh_interval_sec: None,
2094 };
2095
2096 let database_model = database::Model {
2097 database_id,
2098 name: "test_db_hash".into(),
2099 resource_group: "rg-hash".into(),
2100 barrier_interval_ms: None,
2101 checkpoint_frequency: None,
2102 };
2103
2104 let ensembles = vec![NoShuffleEnsemble {
2105 entries: HashSet::from([entry_fragment_id]),
2106 components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2107 }];
2108
2109 let fragment_map = HashMap::from([
2110 (entry_fragment_id, entry_fragment),
2111 (downstream_fragment_id, downstream_fragment),
2112 ]);
2113 let job_fragments = HashMap::from([(job_id, fragment_map)]);
2114 let job_map = HashMap::from([(job_id, job_model)]);
2115
2116 let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2117 (1.into(), build_worker_node(1, 1, "rg-hash")),
2118 (2.into(), build_worker_node(2, 1, "rg-hash")),
2119 ]);
2120
2121 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2122 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2123 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2124 let database_map = HashMap::from([(database_id, database_model)]);
2125
2126 let context = RenderActorsContext {
2127 fragment_source_ids: &fragment_source_ids,
2128 fragment_splits: &fragment_splits,
2129 streaming_job_databases: &streaming_job_databases,
2130 database_map: &database_map,
2131 };
2132
2133 let result = render_actors(
2134 &actor_id_counter,
2135 &ensembles,
2136 &job_fragments,
2137 &job_map,
2138 &worker_map,
2139 context,
2140 )
2141 .expect("actor rendering succeeds");
2142
2143 let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
2144 let downstream_state =
2145 collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
2146
2147 assert_eq!(entry_state.len(), 2);
2148 assert_eq!(entry_state, downstream_state);
2149
2150 let assigned_vnodes: BTreeSet<_> = entry_state
2151 .iter()
2152 .flat_map(|(_, _, vnodes, _)| {
2153 vnodes
2154 .as_ref()
2155 .expect("hash distribution should populate vnode bitmap")
2156 .iter()
2157 .copied()
2158 })
2159 .collect();
2160 assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
2161 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
2162 }
2163
2164 #[test]
2165 fn render_actors_propagates_source_splits() {
2166 let actor_id_counter = AtomicU32::new(0);
2167 let entry_fragment_id: FragmentId = 11.into();
2168 let downstream_fragment_id: FragmentId = 12.into();
2169 let job_id: JobId = 30.into();
2170 let database_id: DatabaseId = DatabaseId::new(7);
2171 let source_id: SourceId = 99.into();
2172
2173 let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
2174 let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;
2175
2176 let entry_fragment = build_fragment(
2177 entry_fragment_id,
2178 job_id,
2179 source_mask,
2180 DistributionType::Hash,
2181 4,
2182 StreamingParallelism::Fixed(2),
2183 );
2184
2185 let downstream_fragment = build_fragment(
2186 downstream_fragment_id,
2187 job_id,
2188 source_scan_mask,
2189 DistributionType::Hash,
2190 4,
2191 StreamingParallelism::Fixed(2),
2192 );
2193
2194 let job_model = streaming_job::Model {
2195 job_id,
2196 job_status: JobStatus::Created,
2197 create_type: CreateType::Background,
2198 timezone: None,
2199 config_override: None,
2200 adaptive_parallelism_strategy: None,
2201 parallelism: StreamingParallelism::Fixed(2),
2202 backfill_parallelism: None,
2203 backfill_adaptive_parallelism_strategy: None,
2204 backfill_orders: None,
2205 max_parallelism: 2,
2206 specific_resource_group: None,
2207 is_serverless_backfill: false,
2208 refresh_interval_sec: None,
2209 };
2210
2211 let database_model = database::Model {
2212 database_id,
2213 name: "split_db".into(),
2214 resource_group: "rg-source".into(),
2215 barrier_interval_ms: None,
2216 checkpoint_frequency: None,
2217 };
2218
2219 let ensembles = vec![NoShuffleEnsemble {
2220 entries: HashSet::from([entry_fragment_id]),
2221 components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2222 }];
2223
2224 let fragment_map = HashMap::from([
2225 (entry_fragment_id, entry_fragment),
2226 (downstream_fragment_id, downstream_fragment),
2227 ]);
2228 let job_fragments = HashMap::from([(job_id, fragment_map)]);
2229 let job_map = HashMap::from([(job_id, job_model)]);
2230
2231 let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2232 (1.into(), build_worker_node(1, 1, "rg-source")),
2233 (2.into(), build_worker_node(2, 1, "rg-source")),
2234 ]);
2235
2236 let split_a = SplitImpl::Test(TestSourceSplit {
2237 id: Arc::<str>::from("split-a"),
2238 properties: HashMap::new(),
2239 offset: "0".into(),
2240 });
2241 let split_b = SplitImpl::Test(TestSourceSplit {
2242 id: Arc::<str>::from("split-b"),
2243 properties: HashMap::new(),
2244 offset: "0".into(),
2245 });
2246
2247 let fragment_source_ids = HashMap::from([
2248 (entry_fragment_id, source_id),
2249 (downstream_fragment_id, source_id),
2250 ]);
2251 let fragment_splits =
2252 HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
2253 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2254 let database_map = HashMap::from([(database_id, database_model)]);
2255
2256 let context = RenderActorsContext {
2257 fragment_source_ids: &fragment_source_ids,
2258 fragment_splits: &fragment_splits,
2259 streaming_job_databases: &streaming_job_databases,
2260 database_map: &database_map,
2261 };
2262
2263 let result = render_actors(
2264 &actor_id_counter,
2265 &ensembles,
2266 &job_fragments,
2267 &job_map,
2268 &worker_map,
2269 context,
2270 )
2271 .expect("actor rendering succeeds");
2272
2273 let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
2274 let downstream_state =
2275 collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
2276
2277 assert_eq!(entry_state, downstream_state);
2278
2279 let split_ids: BTreeSet<_> = entry_state
2280 .iter()
2281 .flat_map(|(_, _, _, splits)| splits.iter().cloned())
2282 .collect();
2283 assert_eq!(
2284 split_ids,
2285 BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
2286 );
2287 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
2288 }
2289
2290 #[test]
2291 fn preview_actor_assignments_defers_actor_id_allocation() {
2292 let actor_id_counter = AtomicU32::new(100);
2293 let entry_fragment_id: FragmentId = 11.into();
2294 let downstream_fragment_id: FragmentId = 12.into();
2295 let job_id: JobId = 30.into();
2296 let database_id: DatabaseId = DatabaseId::new(7);
2297 let source_id: SourceId = 99.into();
2298
2299 let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
2300 let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;
2301
2302 let entry_fragment = build_fragment(
2303 entry_fragment_id,
2304 job_id,
2305 source_mask,
2306 DistributionType::Hash,
2307 4,
2308 StreamingParallelism::Fixed(2),
2309 );
2310
2311 let downstream_fragment = build_fragment(
2312 downstream_fragment_id,
2313 job_id,
2314 source_scan_mask,
2315 DistributionType::Hash,
2316 4,
2317 StreamingParallelism::Fixed(2),
2318 );
2319
2320 let job_model = streaming_job::Model {
2321 job_id,
2322 job_status: JobStatus::Created,
2323 create_type: CreateType::Background,
2324 timezone: None,
2325 config_override: None,
2326 adaptive_parallelism_strategy: None,
2327 parallelism: StreamingParallelism::Fixed(2),
2328 backfill_parallelism: None,
2329 backfill_adaptive_parallelism_strategy: None,
2330 backfill_orders: None,
2331 max_parallelism: 2,
2332 specific_resource_group: None,
2333 is_serverless_backfill: false,
2334 refresh_interval_sec: None,
2335 };
2336
2337 let database_model = database::Model {
2338 database_id,
2339 name: "preview_db".into(),
2340 resource_group: "rg-source".into(),
2341 barrier_interval_ms: None,
2342 checkpoint_frequency: None,
2343 };
2344
2345 let ensembles = vec![NoShuffleEnsemble {
2346 entries: HashSet::from([entry_fragment_id]),
2347 components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2348 }];
2349
2350 let fragment_map = HashMap::from([
2351 (entry_fragment_id, entry_fragment),
2352 (downstream_fragment_id, downstream_fragment),
2353 ]);
2354 let job_fragments = HashMap::from([(job_id, fragment_map)]);
2355 let job_map = HashMap::from([(job_id, job_model)]);
2356
2357 let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2358 (1.into(), build_worker_node(1, 1, "rg-source")),
2359 (2.into(), build_worker_node(2, 1, "rg-source")),
2360 ]);
2361
2362 let split_a = SplitImpl::Test(TestSourceSplit {
2363 id: Arc::<str>::from("split-a"),
2364 properties: HashMap::new(),
2365 offset: "0".into(),
2366 });
2367 let split_b = SplitImpl::Test(TestSourceSplit {
2368 id: Arc::<str>::from("split-b"),
2369 properties: HashMap::new(),
2370 offset: "0".into(),
2371 });
2372
2373 let loaded = LoadedFragmentContext {
2374 ensembles,
2375 job_fragments,
2376 job_map,
2377 streaming_job_databases: HashMap::from([(job_id, database_id)]),
2378 database_map: HashMap::from([(database_id, database_model)]),
2379 fragment_source_ids: HashMap::from([
2380 (entry_fragment_id, source_id),
2381 (downstream_fragment_id, source_id),
2382 ]),
2383 fragment_splits: HashMap::from([(entry_fragment_id, vec![split_a, split_b])]),
2384 };
2385
2386 let preview =
2387 preview_actor_assignments(&worker_map, &loaded).expect("preview rendering succeeds");
2388 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 100);
2389
2390 let preview_entry_state =
2391 collect_actor_state(&preview.fragments[&database_id][&job_id][&entry_fragment_id]);
2392 let preview_downstream_state =
2393 collect_actor_state(&preview.fragments[&database_id][&job_id][&downstream_fragment_id]);
2394 assert_eq!(preview_entry_state, preview_downstream_state);
2395
2396 let materialized = materialize_actor_assignments(&actor_id_counter, preview);
2397
2398 let materialized_entry_state =
2399 collect_actor_state(&materialized.fragments[&database_id][&job_id][&entry_fragment_id]);
2400 let materialized_downstream_state = collect_actor_state(
2401 &materialized.fragments[&database_id][&job_id][&downstream_fragment_id],
2402 );
2403
2404 assert_eq!(materialized_entry_state, materialized_downstream_state);
2405 assert_eq!(materialized_entry_state, preview_entry_state);
2406 assert_eq!(materialized_downstream_state, preview_downstream_state);
2407 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 104);
2408 }
2409
2410 #[test]
2412 fn render_actors_job_strategy_overrides_global() {
2413 let actor_id_counter = AtomicU32::new(0);
2414 let fragment_id: FragmentId = 1.into();
2415 let job_id: JobId = 100.into();
2416 let database_id: DatabaseId = DatabaseId::new(10);
2417
2418 let fragment_model = build_fragment(
2420 fragment_id,
2421 job_id,
2422 0,
2423 DistributionType::Hash,
2424 8,
2425 StreamingParallelism::Adaptive,
2426 );
2427
2428 let job_model = streaming_job::Model {
2430 job_id,
2431 job_status: JobStatus::Created,
2432 create_type: CreateType::Foreground,
2433 timezone: None,
2434 config_override: None,
2435 adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2436 parallelism: StreamingParallelism::Adaptive,
2437 backfill_parallelism: None,
2438 backfill_adaptive_parallelism_strategy: None,
2439 backfill_orders: None,
2440 max_parallelism: 8,
2441 specific_resource_group: None,
2442 is_serverless_backfill: false,
2443 refresh_interval_sec: None,
2444 };
2445
2446 let database_model = database::Model {
2447 database_id,
2448 name: "test_db".into(),
2449 resource_group: "default".into(),
2450 barrier_interval_ms: None,
2451 checkpoint_frequency: None,
2452 };
2453
2454 let ensembles = vec![NoShuffleEnsemble {
2455 entries: HashSet::from([fragment_id]),
2456 components: HashSet::from([fragment_id]),
2457 }];
2458
2459 let fragment_map =
2460 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2461 let job_map = HashMap::from([(job_id, job_model)]);
2462
2463 let worker_map = HashMap::from([
2465 (1.into(), build_worker_node(1, 1, "default")),
2466 (2.into(), build_worker_node(2, 1, "default")),
2467 (3.into(), build_worker_node(3, 1, "default")),
2468 (4.into(), build_worker_node(4, 1, "default")),
2469 ]);
2470
2471 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2472 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2473 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2474 let database_map = HashMap::from([(database_id, database_model)]);
2475
2476 let context = RenderActorsContext {
2477 fragment_source_ids: &fragment_source_ids,
2478 fragment_splits: &fragment_splits,
2479 streaming_job_databases: &streaming_job_databases,
2480 database_map: &database_map,
2481 };
2482
2483 let result = render_actors(
2485 &actor_id_counter,
2486 &ensembles,
2487 &fragment_map,
2488 &job_map,
2489 &worker_map,
2490 context,
2491 )
2492 .expect("actor rendering succeeds");
2493
2494 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2495 assert_eq!(
2497 state.len(),
2498 2,
2499 "Job strategy BOUNDED(2) should override global FULL"
2500 );
2501 }
2502
2503 #[test]
2506 fn render_actors_falls_back_to_default_when_adaptive_job_has_no_strategy() {
2507 let actor_id_counter = AtomicU32::new(0);
2508 let fragment_id: FragmentId = 1.into();
2509 let job_id: JobId = 101.into();
2510 let database_id: DatabaseId = DatabaseId::new(11);
2511
2512 let fragment_model = build_fragment(
2513 fragment_id,
2514 job_id,
2515 0,
2516 DistributionType::Hash,
2517 8,
2518 StreamingParallelism::Adaptive,
2519 );
2520
2521 let job_model = streaming_job::Model {
2523 job_id,
2524 job_status: JobStatus::Created,
2525 create_type: CreateType::Foreground,
2526 timezone: None,
2527 config_override: None,
2528 adaptive_parallelism_strategy: None, parallelism: StreamingParallelism::Adaptive,
2530 backfill_parallelism: None,
2531 backfill_adaptive_parallelism_strategy: None,
2532 backfill_orders: None,
2533 max_parallelism: 8,
2534 specific_resource_group: None,
2535 is_serverless_backfill: false,
2536 refresh_interval_sec: None,
2537 };
2538
2539 let database_model = database::Model {
2540 database_id,
2541 name: "test_db".into(),
2542 resource_group: "default".into(),
2543 barrier_interval_ms: None,
2544 checkpoint_frequency: None,
2545 };
2546
2547 let ensembles = vec![NoShuffleEnsemble {
2548 entries: HashSet::from([fragment_id]),
2549 components: HashSet::from([fragment_id]),
2550 }];
2551
2552 let fragment_map =
2553 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2554 let job_map = HashMap::from([(job_id, job_model)]);
2555
2556 let worker_map = HashMap::from([
2558 (1.into(), build_worker_node(1, 1, "default")),
2559 (2.into(), build_worker_node(2, 1, "default")),
2560 (3.into(), build_worker_node(3, 1, "default")),
2561 (4.into(), build_worker_node(4, 1, "default")),
2562 ]);
2563
2564 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2565 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2566 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2567 let database_map = HashMap::from([(database_id, database_model)]);
2568
2569 let context = RenderActorsContext {
2570 fragment_source_ids: &fragment_source_ids,
2571 fragment_splits: &fragment_splits,
2572 streaming_job_databases: &streaming_job_databases,
2573 database_map: &database_map,
2574 };
2575
2576 let result = render_actors(
2577 &actor_id_counter,
2578 &ensembles,
2579 &fragment_map,
2580 &job_map,
2581 &worker_map,
2582 context,
2583 )
2584 .expect("actor rendering succeeds");
2585
2586 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2587 assert_eq!(
2588 state.len(),
2589 4,
2590 "default adaptive strategy should use the full available parallelism"
2591 );
2592 }
2593
2594 #[test]
2596 fn render_actors_fixed_parallelism_ignores_strategy() {
2597 let actor_id_counter = AtomicU32::new(0);
2598 let fragment_id: FragmentId = 1.into();
2599 let job_id: JobId = 102.into();
2600 let database_id: DatabaseId = DatabaseId::new(12);
2601
2602 let fragment_model = build_fragment(
2604 fragment_id,
2605 job_id,
2606 0,
2607 DistributionType::Hash,
2608 8,
2609 StreamingParallelism::Fixed(5),
2610 );
2611
2612 let job_model = streaming_job::Model {
2614 job_id,
2615 job_status: JobStatus::Created,
2616 create_type: CreateType::Foreground,
2617 timezone: None,
2618 config_override: None,
2619 adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2620 parallelism: StreamingParallelism::Fixed(5),
2621 backfill_parallelism: None,
2622 backfill_adaptive_parallelism_strategy: None,
2623 backfill_orders: None,
2624 max_parallelism: 8,
2625 specific_resource_group: None,
2626 is_serverless_backfill: false,
2627 refresh_interval_sec: None,
2628 };
2629
2630 let database_model = database::Model {
2631 database_id,
2632 name: "test_db".into(),
2633 resource_group: "default".into(),
2634 barrier_interval_ms: None,
2635 checkpoint_frequency: None,
2636 };
2637
2638 let ensembles = vec![NoShuffleEnsemble {
2639 entries: HashSet::from([fragment_id]),
2640 components: HashSet::from([fragment_id]),
2641 }];
2642
2643 let fragment_map =
2644 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2645 let job_map = HashMap::from([(job_id, job_model)]);
2646
2647 let worker_map = HashMap::from([
2649 (1.into(), build_worker_node(1, 1, "default")),
2650 (2.into(), build_worker_node(2, 1, "default")),
2651 (3.into(), build_worker_node(3, 1, "default")),
2652 (4.into(), build_worker_node(4, 1, "default")),
2653 (5.into(), build_worker_node(5, 1, "default")),
2654 (6.into(), build_worker_node(6, 1, "default")),
2655 ]);
2656
2657 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2658 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2659 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2660 let database_map = HashMap::from([(database_id, database_model)]);
2661
2662 let context = RenderActorsContext {
2663 fragment_source_ids: &fragment_source_ids,
2664 fragment_splits: &fragment_splits,
2665 streaming_job_databases: &streaming_job_databases,
2666 database_map: &database_map,
2667 };
2668
2669 let result = render_actors(
2670 &actor_id_counter,
2671 &ensembles,
2672 &fragment_map,
2673 &job_map,
2674 &worker_map,
2675 context,
2676 )
2677 .expect("actor rendering succeeds");
2678
2679 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2680 assert_eq!(
2682 state.len(),
2683 5,
2684 "Fixed parallelism should ignore all strategies"
2685 );
2686 }
2687
2688 #[test]
2690 fn render_actors_ratio_strategy() {
2691 let actor_id_counter = AtomicU32::new(0);
2692 let fragment_id: FragmentId = 1.into();
2693 let job_id: JobId = 103.into();
2694 let database_id: DatabaseId = DatabaseId::new(13);
2695
2696 let fragment_model = build_fragment(
2697 fragment_id,
2698 job_id,
2699 0,
2700 DistributionType::Hash,
2701 16,
2702 StreamingParallelism::Adaptive,
2703 );
2704
2705 let job_model = streaming_job::Model {
2707 job_id,
2708 job_status: JobStatus::Created,
2709 create_type: CreateType::Foreground,
2710 timezone: None,
2711 config_override: None,
2712 adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
2713 parallelism: StreamingParallelism::Adaptive,
2714 backfill_parallelism: None,
2715 backfill_adaptive_parallelism_strategy: None,
2716 backfill_orders: None,
2717 max_parallelism: 16,
2718 specific_resource_group: None,
2719 is_serverless_backfill: false,
2720 refresh_interval_sec: None,
2721 };
2722
2723 let database_model = database::Model {
2724 database_id,
2725 name: "test_db".into(),
2726 resource_group: "default".into(),
2727 barrier_interval_ms: None,
2728 checkpoint_frequency: None,
2729 };
2730
2731 let ensembles = vec![NoShuffleEnsemble {
2732 entries: HashSet::from([fragment_id]),
2733 components: HashSet::from([fragment_id]),
2734 }];
2735
2736 let fragment_map =
2737 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2738 let job_map = HashMap::from([(job_id, job_model)]);
2739
2740 let worker_map = HashMap::from([
2742 (1.into(), build_worker_node(1, 1, "default")),
2743 (2.into(), build_worker_node(2, 1, "default")),
2744 (3.into(), build_worker_node(3, 1, "default")),
2745 (4.into(), build_worker_node(4, 1, "default")),
2746 (5.into(), build_worker_node(5, 1, "default")),
2747 (6.into(), build_worker_node(6, 1, "default")),
2748 (7.into(), build_worker_node(7, 1, "default")),
2749 (8.into(), build_worker_node(8, 1, "default")),
2750 ]);
2751
2752 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2753 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2754 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2755 let database_map = HashMap::from([(database_id, database_model)]);
2756
2757 let context = RenderActorsContext {
2758 fragment_source_ids: &fragment_source_ids,
2759 fragment_splits: &fragment_splits,
2760 streaming_job_databases: &streaming_job_databases,
2761 database_map: &database_map,
2762 };
2763
2764 let result = render_actors(
2765 &actor_id_counter,
2766 &ensembles,
2767 &fragment_map,
2768 &job_map,
2769 &worker_map,
2770 context,
2771 )
2772 .expect("actor rendering succeeds");
2773
2774 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2775 assert_eq!(
2777 state.len(),
2778 4,
2779 "RATIO(0.5) of 8 workers should give 4 actors"
2780 );
2781 }
2782
2783 #[test]
2784 fn render_actors_backfill_strategy_overrides_job_strategy() {
2785 let actor_id_counter = AtomicU32::new(0);
2786 let fragment_id: FragmentId = 1.into();
2787 let job_id: JobId = 104.into();
2788 let database_id: DatabaseId = DatabaseId::new(14);
2789
2790 let fragment_model = build_fragment(
2791 fragment_id,
2792 job_id,
2793 0,
2794 DistributionType::Hash,
2795 16,
2796 StreamingParallelism::Adaptive,
2797 );
2798
2799 let job_model = streaming_job::Model {
2800 job_id,
2801 job_status: JobStatus::Creating,
2802 create_type: CreateType::Background,
2803 timezone: None,
2804 config_override: None,
2805 adaptive_parallelism_strategy: Some("BOUNDED(4)".to_owned()),
2806 parallelism: StreamingParallelism::Adaptive,
2807 backfill_parallelism: Some(StreamingParallelism::Adaptive),
2808 backfill_adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2809 backfill_orders: None,
2810 max_parallelism: 16,
2811 specific_resource_group: None,
2812 is_serverless_backfill: false,
2813 refresh_interval_sec: None,
2814 };
2815
2816 let database_model = database::Model {
2817 database_id,
2818 name: "test_db".into(),
2819 resource_group: "default".into(),
2820 barrier_interval_ms: None,
2821 checkpoint_frequency: None,
2822 };
2823
2824 let ensembles = vec![NoShuffleEnsemble {
2825 entries: HashSet::from([fragment_id]),
2826 components: HashSet::from([fragment_id]),
2827 }];
2828
2829 let fragment_map =
2830 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2831 let job_map = HashMap::from([(job_id, job_model)]);
2832
2833 let worker_map = HashMap::from([
2834 (1.into(), build_worker_node(1, 1, "default")),
2835 (2.into(), build_worker_node(2, 1, "default")),
2836 (3.into(), build_worker_node(3, 1, "default")),
2837 (4.into(), build_worker_node(4, 1, "default")),
2838 ]);
2839
2840 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2841 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2842 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2843 let database_map = HashMap::from([(database_id, database_model)]);
2844 let context = RenderActorsContext {
2845 fragment_source_ids: &fragment_source_ids,
2846 fragment_splits: &fragment_splits,
2847 streaming_job_databases: &streaming_job_databases,
2848 database_map: &database_map,
2849 };
2850
2851 let result = render_actors(
2852 &actor_id_counter,
2853 &ensembles,
2854 &fragment_map,
2855 &job_map,
2856 &worker_map,
2857 context,
2858 )
2859 .expect("actor rendering succeeds");
2860
2861 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2862 assert_eq!(
2863 state.len(),
2864 2,
2865 "Backfill strategy BOUNDED(2) should override the job strategy during backfill"
2866 );
2867 }
2868
2869 #[test]
2870 fn render_actors_created_job_ignores_backfill_strategy() {
2871 let actor_id_counter = AtomicU32::new(0);
2872 let fragment_id: FragmentId = 1.into();
2873 let job_id: JobId = 105.into();
2874 let database_id: DatabaseId = DatabaseId::new(15);
2875
2876 let fragment_model = build_fragment(
2877 fragment_id,
2878 job_id,
2879 0,
2880 DistributionType::Hash,
2881 16,
2882 StreamingParallelism::Adaptive,
2883 );
2884
2885 let job_model = streaming_job::Model {
2886 job_id,
2887 job_status: JobStatus::Created,
2888 create_type: CreateType::Foreground,
2889 timezone: None,
2890 config_override: None,
2891 adaptive_parallelism_strategy: Some("BOUNDED(4)".to_owned()),
2892 parallelism: StreamingParallelism::Adaptive,
2893 backfill_parallelism: Some(StreamingParallelism::Adaptive),
2894 backfill_adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2895 backfill_orders: None,
2896 max_parallelism: 16,
2897 specific_resource_group: None,
2898 is_serverless_backfill: false,
2899 refresh_interval_sec: None,
2900 };
2901
2902 let database_model = database::Model {
2903 database_id,
2904 name: "test_db".into(),
2905 resource_group: "default".into(),
2906 barrier_interval_ms: None,
2907 checkpoint_frequency: None,
2908 };
2909
2910 let ensembles = vec![NoShuffleEnsemble {
2911 entries: HashSet::from([fragment_id]),
2912 components: HashSet::from([fragment_id]),
2913 }];
2914
2915 let fragment_map =
2916 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2917 let job_map = HashMap::from([(job_id, job_model)]);
2918
2919 let worker_map = HashMap::from([
2920 (1.into(), build_worker_node(1, 1, "default")),
2921 (2.into(), build_worker_node(2, 1, "default")),
2922 (3.into(), build_worker_node(3, 1, "default")),
2923 (4.into(), build_worker_node(4, 1, "default")),
2924 ]);
2925
2926 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2927 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2928 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2929 let database_map = HashMap::from([(database_id, database_model)]);
2930 let context = RenderActorsContext {
2931 fragment_source_ids: &fragment_source_ids,
2932 fragment_splits: &fragment_splits,
2933 streaming_job_databases: &streaming_job_databases,
2934 database_map: &database_map,
2935 };
2936
2937 let result = render_actors(
2938 &actor_id_counter,
2939 &ensembles,
2940 &fragment_map,
2941 &job_map,
2942 &worker_map,
2943 context,
2944 )
2945 .expect("actor rendering succeeds");
2946
2947 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2948 assert_eq!(
2949 state.len(),
2950 4,
2951 "created jobs should use the main job strategy before any backfill override applies"
2952 );
2953 }
2954}