1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
23#[cfg(debug_assertions)]
24use risingwave_common::hash::VnodeBitmapExt;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_meta_model::prelude::{
32 Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
33};
34use risingwave_meta_model::{
35 DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism, WorkerId,
36 database, fragment, fragment_relation, fragment_splits, object, sink, source, streaming_job,
37 table,
38};
39use risingwave_meta_model_migration::Condition;
40use risingwave_pb::common::WorkerNode;
41use risingwave_pb::stream_plan::PbStreamNode;
42use sea_orm::{
43 ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
44 RelationTrait,
45};
46
47use crate::MetaResult;
48use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
49use crate::controller::utils::resolve_no_shuffle_actor_mapping;
50use crate::manager::ActiveStreamingWorkerNodes;
51use crate::model::{ActorId, StreamActor};
52use crate::stream::{AssignerBuilder, SplitDiffOptions};
53
54pub(crate) async fn resolve_streaming_job_definition<C>(
55 txn: &C,
56 job_ids: &HashSet<JobId>,
57) -> MetaResult<HashMap<JobId, String>>
58where
59 C: ConnectionTrait,
60{
61 let job_ids = job_ids.iter().cloned().collect_vec();
62
63 let common_job_definitions: Vec<(JobId, String)> = Table::find()
65 .select_only()
66 .columns([
67 table::Column::TableId,
68 #[cfg(not(debug_assertions))]
69 table::Column::Name,
70 #[cfg(debug_assertions)]
71 table::Column::Definition,
72 ])
73 .filter(table::Column::TableId.is_in(job_ids.clone()))
74 .into_tuple()
75 .all(txn)
76 .await?;
77
78 let sink_definitions: Vec<(JobId, String)> = Sink::find()
79 .select_only()
80 .columns([
81 sink::Column::SinkId,
82 #[cfg(not(debug_assertions))]
83 sink::Column::Name,
84 #[cfg(debug_assertions)]
85 sink::Column::Definition,
86 ])
87 .filter(sink::Column::SinkId.is_in(job_ids.clone()))
88 .into_tuple()
89 .all(txn)
90 .await?;
91
92 let source_definitions: Vec<(JobId, String)> = Source::find()
93 .select_only()
94 .columns([
95 source::Column::SourceId,
96 #[cfg(not(debug_assertions))]
97 source::Column::Name,
98 #[cfg(debug_assertions)]
99 source::Column::Definition,
100 ])
101 .filter(source::Column::SourceId.is_in(job_ids.clone()))
102 .into_tuple()
103 .all(txn)
104 .await?;
105
106 let definitions: HashMap<JobId, String> = common_job_definitions
107 .into_iter()
108 .chain(sink_definitions)
109 .chain(source_definitions)
110 .collect();
111
112 Ok(definitions)
113}
114
115pub async fn load_fragment_info<C>(
116 txn: &C,
117 actor_id_counter: &AtomicU32,
118 database_id: Option<DatabaseId>,
119 worker_nodes: &ActiveStreamingWorkerNodes,
120) -> MetaResult<FragmentRenderMap>
121where
122 C: ConnectionTrait,
123{
124 let mut query = StreamingJob::find()
125 .select_only()
126 .column(streaming_job::Column::JobId);
127
128 if let Some(database_id) = database_id {
129 query = query
130 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
131 .filter(object::Column::DatabaseId.eq(database_id));
132 }
133
134 let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
135
136 if jobs.is_empty() {
137 return Ok(HashMap::new());
138 }
139
140 let jobs: HashSet<JobId> = jobs.into_iter().collect();
141
142 let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
143
144 if loaded.is_empty() {
145 return Ok(HashMap::new());
146 }
147
148 let RenderedGraph { fragments, .. } =
149 render_actor_assignments(actor_id_counter, worker_nodes.current(), &loaded)?;
150
151 Ok(fragments)
152}
153
154#[derive(Debug)]
155pub struct TargetResourcePolicy {
156 pub resource_group: Option<String>,
157 pub parallelism: StreamingParallelism,
158}
159
/// Summary of a worker: its parallelism and, optionally, its resource group.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Parallelism of the worker; the type guarantees it is never zero.
    pub parallelism: NonZeroUsize,
    /// Resource group of the worker, if one is set.
    pub resource_group: Option<String>,
}
165
166pub type FragmentRenderMap =
167 HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;
168
169#[derive(Default)]
170pub struct RenderedGraph {
171 pub fragments: FragmentRenderMap,
172 pub ensembles: Vec<NoShuffleEnsemble>,
173}
174
175impl RenderedGraph {
176 pub fn empty() -> Self {
177 Self::default()
178 }
179}
180
181#[derive(Clone, Debug)]
185pub struct LoadedFragment {
186 pub fragment_id: FragmentId,
187 pub job_id: JobId,
188 pub fragment_type_mask: FragmentTypeMask,
189 pub distribution_type: DistributionType,
190 pub vnode_count: usize,
191 pub nodes: PbStreamNode,
192 pub state_table_ids: HashSet<TableId>,
193 pub parallelism: Option<StreamingParallelism>,
194}
195
196impl From<fragment::Model> for LoadedFragment {
197 fn from(model: fragment::Model) -> Self {
198 Self {
199 fragment_id: model.fragment_id,
200 job_id: model.job_id,
201 fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
202 distribution_type: model.distribution_type,
203 vnode_count: model.vnode_count as usize,
204 nodes: model.stream_node.to_protobuf(),
205 state_table_ids: model.state_table_ids.into_inner().into_iter().collect(),
206 parallelism: model.parallelism,
207 }
208 }
209}
210
211#[derive(Default, Debug, Clone)]
212pub struct LoadedFragmentContext {
213 pub ensembles: Vec<NoShuffleEnsemble>,
214 pub job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
215 pub job_map: HashMap<JobId, streaming_job::Model>,
216 pub streaming_job_databases: HashMap<JobId, DatabaseId>,
217 pub database_map: HashMap<DatabaseId, database::Model>,
218 pub fragment_source_ids: HashMap<FragmentId, SourceId>,
219 pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
220}
221
222impl LoadedFragmentContext {
223 pub fn is_empty(&self) -> bool {
224 if self.ensembles.is_empty() {
225 assert!(
226 self.job_fragments.is_empty(),
227 "non-empty job fragments for empty ensembles: {:?}",
228 self.job_fragments
229 );
230 true
231 } else {
232 false
233 }
234 }
235
236 pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
237 let job_ids: HashSet<JobId> = self
238 .streaming_job_databases
239 .iter()
240 .filter_map(|(job_id, db_id)| (*db_id == database_id).then_some(*job_id))
241 .collect();
242
243 if job_ids.is_empty() {
244 return None;
245 }
246
247 let job_fragments: HashMap<_, _> = job_ids
248 .iter()
249 .map(|job_id| (*job_id, self.job_fragments[job_id].clone()))
250 .collect();
251
252 let fragment_ids: HashSet<_> = job_fragments
253 .values()
254 .flat_map(|fragments| fragments.keys().copied())
255 .collect();
256
257 assert!(
258 !fragment_ids.is_empty(),
259 "empty fragments for non-empty database {database_id} with jobs {job_ids:?}"
260 );
261
262 let ensembles: Vec<NoShuffleEnsemble> = self
263 .ensembles
264 .iter()
265 .filter(|ensemble| {
266 if ensemble
267 .components
268 .iter()
269 .any(|fragment_id| fragment_ids.contains(fragment_id))
270 {
271 assert!(
272 ensemble
273 .components
274 .iter()
275 .all(|fragment_id| fragment_ids.contains(fragment_id)),
276 "ensemble {ensemble:?} partially exists in database {database_id} with fragments {job_fragments:?}"
277 );
278 true
279 } else {
280 false
281 }
282 })
283 .cloned()
284 .collect();
285
286 assert!(
287 !ensembles.is_empty(),
288 "empty ensembles for non-empty database {database_id} with jobs {job_fragments:?}"
289 );
290
291 let job_map = job_ids
292 .iter()
293 .filter_map(|job_id| self.job_map.get(job_id).map(|job| (*job_id, job.clone())))
294 .collect();
295
296 let streaming_job_databases = job_ids
297 .iter()
298 .filter_map(|job_id| {
299 self.streaming_job_databases
300 .get(job_id)
301 .map(|db_id| (*job_id, *db_id))
302 })
303 .collect();
304
305 let database_model = self.database_map[&database_id].clone();
306 let database_map = HashMap::from([(database_id, database_model)]);
307
308 let fragment_source_ids = self
309 .fragment_source_ids
310 .iter()
311 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
312 .map(|(fragment_id, source_id)| (*fragment_id, *source_id))
313 .collect();
314
315 let fragment_splits = self
316 .fragment_splits
317 .iter()
318 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
319 .map(|(fragment_id, splits)| (*fragment_id, splits.clone()))
320 .collect();
321
322 Some(Self {
323 ensembles,
324 job_fragments,
325 job_map,
326 streaming_job_databases,
327 database_map,
328 fragment_source_ids,
329 fragment_splits,
330 })
331 }
332
333 pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
336 let Self {
337 ensembles,
338 mut job_fragments,
339 mut job_map,
340 streaming_job_databases,
341 mut database_map,
342 mut fragment_source_ids,
343 mut fragment_splits,
344 } = self;
345
346 let mut contexts = HashMap::<DatabaseId, Self>::new();
347 let mut fragment_databases = HashMap::<FragmentId, DatabaseId>::new();
348 let mut unresolved_ensembles = 0usize;
349 let mut unresolved_ensemble_sample: Option<Vec<FragmentId>> = None;
350
351 for (job_id, database_id) in streaming_job_databases {
352 let context = contexts.entry(database_id).or_insert_with(|| {
353 let database_model = database_map
354 .remove(&database_id)
355 .expect("database should exist for streaming job");
356 Self {
357 ensembles: Vec::new(),
358 job_fragments: HashMap::new(),
359 job_map: HashMap::new(),
360 streaming_job_databases: HashMap::new(),
361 database_map: HashMap::from([(database_id, database_model)]),
362 fragment_source_ids: HashMap::new(),
363 fragment_splits: HashMap::new(),
364 }
365 });
366
367 let fragments = job_fragments
368 .remove(&job_id)
369 .expect("job fragments should exist for streaming job");
370 for fragment_id in fragments.keys().copied() {
371 fragment_databases.insert(fragment_id, database_id);
372 if let Some(source_id) = fragment_source_ids.remove(&fragment_id) {
373 context.fragment_source_ids.insert(fragment_id, source_id);
374 }
375 if let Some(splits) = fragment_splits.remove(&fragment_id) {
376 context.fragment_splits.insert(fragment_id, splits);
377 }
378 }
379
380 assert!(
381 context
382 .job_map
383 .insert(
384 job_id,
385 job_map
386 .remove(&job_id)
387 .expect("streaming job should exist for loaded context"),
388 )
389 .is_none(),
390 "duplicated streaming job"
391 );
392 assert!(
393 context.job_fragments.insert(job_id, fragments).is_none(),
394 "duplicated job fragments"
395 );
396 assert!(
397 context
398 .streaming_job_databases
399 .insert(job_id, database_id)
400 .is_none(),
401 "duplicated job database mapping"
402 );
403 }
404
405 for ensemble in ensembles {
406 let Some(database_id) = ensemble
407 .components
408 .iter()
409 .find_map(|fragment_id| fragment_databases.get(fragment_id).copied())
410 else {
411 unresolved_ensembles += 1;
412 if unresolved_ensemble_sample.is_none() {
413 unresolved_ensemble_sample =
414 Some(ensemble.components.iter().copied().collect());
415 }
416 continue;
417 };
418
419 debug_assert!(
420 ensemble
421 .components
422 .iter()
423 .all(|fragment_id| fragment_databases.get(fragment_id) == Some(&database_id)),
424 "ensemble {ensemble:?} should belong to a single database"
425 );
426
427 contexts
428 .get_mut(&database_id)
429 .expect("database context should exist for ensemble")
430 .ensembles
431 .push(ensemble);
432 }
433
434 if unresolved_ensembles > 0 {
435 tracing::warn!(
436 unresolved_ensembles,
437 ?unresolved_ensemble_sample,
438 known_fragments = fragment_databases.len(),
439 "skip ensembles without resolved database while splitting loaded context"
440 );
441 }
442 debug_assert_eq!(
443 unresolved_ensembles, 0,
444 "all ensembles should be mappable to a database"
445 );
446
447 contexts
448 }
449}
450
451pub async fn load_fragment_context<C>(
454 txn: &C,
455 ensembles: Vec<NoShuffleEnsemble>,
456) -> MetaResult<LoadedFragmentContext>
457where
458 C: ConnectionTrait,
459{
460 if ensembles.is_empty() {
461 return Ok(LoadedFragmentContext::default());
462 }
463
464 let required_fragment_ids: HashSet<_> = ensembles
465 .iter()
466 .flat_map(|ensemble| ensemble.components.iter().copied())
467 .collect();
468
469 let fragment_models = Fragment::find()
470 .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
471 .all(txn)
472 .await?;
473
474 let found_fragment_ids: HashSet<_> = fragment_models
475 .iter()
476 .map(|fragment| fragment.fragment_id)
477 .collect();
478
479 if found_fragment_ids.len() != required_fragment_ids.len() {
480 let missing = required_fragment_ids
481 .difference(&found_fragment_ids)
482 .copied()
483 .collect_vec();
484 return Err(anyhow!("fragments {:?} not found", missing).into());
485 }
486
487 let fragment_models: HashMap<_, _> = fragment_models
488 .into_iter()
489 .map(|fragment| (fragment.fragment_id, fragment))
490 .collect();
491
492 let job_ids: HashSet<_> = fragment_models
493 .values()
494 .map(|fragment| fragment.job_id)
495 .collect();
496
497 if job_ids.is_empty() {
498 return Ok(LoadedFragmentContext::default());
499 }
500
501 let jobs: HashMap<_, _> = StreamingJob::find()
502 .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
503 .all(txn)
504 .await?
505 .into_iter()
506 .map(|job| (job.job_id, job))
507 .collect();
508
509 let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
510 if found_job_ids.len() != job_ids.len() {
511 let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
512 return Err(anyhow!("streaming jobs {:?} not found", missing).into());
513 }
514
515 build_loaded_context(txn, ensembles, fragment_models, jobs).await
516}
517
518pub async fn load_fragment_context_for_jobs<C>(
521 txn: &C,
522 job_ids: HashSet<JobId>,
523) -> MetaResult<LoadedFragmentContext>
524where
525 C: ConnectionTrait,
526{
527 if job_ids.is_empty() {
528 return Ok(LoadedFragmentContext::default());
529 }
530
531 let excluded_fragments_query = FragmentRelation::find()
532 .select_only()
533 .column(fragment_relation::Column::TargetFragmentId)
534 .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
535 .into_query();
536
537 let condition = Condition::all()
538 .add(fragment::Column::JobId.is_in(job_ids.clone()))
539 .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));
540
541 let fragments: Vec<FragmentId> = Fragment::find()
542 .select_only()
543 .column(fragment::Column::FragmentId)
544 .filter(condition)
545 .into_tuple()
546 .all(txn)
547 .await?;
548
549 let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;
550
551 let fragments = Fragment::find()
552 .filter(
553 fragment::Column::FragmentId.is_in(
554 ensembles
555 .iter()
556 .flat_map(|graph| graph.components.iter())
557 .cloned()
558 .collect_vec(),
559 ),
560 )
561 .all(txn)
562 .await?;
563
564 let fragment_map: HashMap<_, _> = fragments
565 .into_iter()
566 .map(|fragment| (fragment.fragment_id, fragment))
567 .collect();
568
569 let job_ids = fragment_map
570 .values()
571 .map(|fragment| fragment.job_id)
572 .collect::<BTreeSet<_>>()
573 .into_iter()
574 .collect_vec();
575
576 let jobs: HashMap<_, _> = StreamingJob::find()
577 .filter(streaming_job::Column::JobId.is_in(job_ids))
578 .all(txn)
579 .await?
580 .into_iter()
581 .map(|job| (job.job_id, job))
582 .collect();
583
584 build_loaded_context(txn, ensembles, fragment_map, jobs).await
585}
586
587pub(crate) fn render_actor_assignments(
590 actor_id_counter: &AtomicU32,
591 worker_map: &HashMap<WorkerId, WorkerNode>,
592 loaded: &LoadedFragmentContext,
593) -> MetaResult<RenderedGraph> {
594 let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
595 render_actor_assignments_with_allocator(&mut actor_id_allocator, worker_map, loaded)
596}
597
598pub(crate) fn preview_actor_assignments(
601 worker_map: &HashMap<WorkerId, WorkerNode>,
602 loaded: &LoadedFragmentContext,
603) -> MetaResult<RenderedGraph> {
604 let mut actor_id_allocator = RenderActorIdAllocator::Preview { next_actor_id: 0 };
605 render_actor_assignments_with_allocator(&mut actor_id_allocator, worker_map, loaded)
606}
607
608pub(crate) fn materialize_actor_assignments(
611 actor_id_counter: &AtomicU32,
612 rendered: RenderedGraph,
613) -> RenderedGraph {
614 let RenderedGraph {
615 fragments,
616 ensembles,
617 } = rendered;
618 #[cfg(debug_assertions)]
619 let preview_slots = collect_relative_actor_slots_by_fragment(&fragments);
620
621 let mut ordered_fragments = fragments
622 .into_iter()
623 .flat_map(|(database_id, jobs)| {
624 jobs.into_iter().flat_map(move |(job_id, fragments)| {
625 fragments.into_iter().map(move |(fragment_id, fragment)| {
626 (database_id, job_id, fragment_id, fragment)
627 })
628 })
629 })
630 .collect_vec();
631 ordered_fragments.sort_by_key(|(_, _, fragment_id, fragment)| {
642 (
643 fragment
644 .actors
645 .keys()
646 .copied()
647 .min()
648 .map(|actor_id| actor_id.as_raw_id())
649 .unwrap_or(u32::MAX),
650 *fragment_id,
651 )
652 });
653
654 let mut materialized = FragmentRenderMap::new();
655 for (database_id, job_id, fragment_id, fragment) in ordered_fragments {
656 materialized
657 .entry(database_id)
658 .or_default()
659 .entry(job_id)
660 .or_default()
661 .insert(
662 fragment_id,
663 materialize_fragment(fragment, actor_id_counter),
664 );
665 }
666
667 #[cfg(debug_assertions)]
668 assert_materialization_preserves_preview_slots(&preview_slots, &materialized);
669
670 RenderedGraph {
671 fragments: materialized,
672 ensembles,
673 }
674}
675
676#[cfg(debug_assertions)]
677type RelativeActorSlot = (u32, WorkerId, Option<Vec<usize>>, Vec<SplitId>);
678
679#[cfg(debug_assertions)]
680fn collect_relative_actor_slots(fragment: &InflightFragmentInfo) -> Vec<RelativeActorSlot> {
681 let base = fragment
682 .actors
683 .keys()
684 .copied()
685 .min()
686 .map(|actor_id| actor_id.as_raw_id())
687 .unwrap_or_default();
688
689 let mut entries = fragment
690 .actors
691 .iter()
692 .map(|(&actor_id, info)| {
693 let idx = actor_id.as_raw_id() - base;
694 let vnode_bitmap = info.vnode_bitmap.as_ref().map(|bitmap| {
695 bitmap
696 .iter_vnodes()
697 .map(|vnode| vnode.to_index())
698 .collect_vec()
699 });
700 let mut splits = info.splits.iter().map(SplitMetaData::id).collect_vec();
701 splits.sort_unstable();
702 (idx, info.worker_id, vnode_bitmap, splits)
703 })
704 .collect_vec();
705 entries.sort_unstable_by_key(|(idx, ..)| *idx);
706 entries
707}
708
709#[cfg(debug_assertions)]
710fn collect_relative_actor_slots_by_fragment(
711 fragments: &FragmentRenderMap,
712) -> HashMap<FragmentId, Vec<RelativeActorSlot>> {
713 fragments
714 .values()
715 .flat_map(|jobs| jobs.values())
716 .flatten()
717 .map(|(fragment_id, fragment)| (*fragment_id, collect_relative_actor_slots(fragment)))
718 .collect()
719}
720
721#[cfg(debug_assertions)]
722fn assert_materialization_preserves_preview_slots(
723 preview_slots: &HashMap<FragmentId, Vec<RelativeActorSlot>>,
724 materialized_fragments: &FragmentRenderMap,
725) {
726 for (database_id, jobs) in materialized_fragments {
727 for (job_id, fragments) in jobs {
728 for (fragment_id, fragment) in fragments {
729 let expected_slots = preview_slots.get(fragment_id).unwrap_or_else(|| {
730 panic!(
731 "preview fragment {fragment_id} for database {database_id}, job {job_id} not found"
732 )
733 });
734
735 assert_eq!(
736 collect_relative_actor_slots(fragment),
737 *expected_slots,
738 "materialization changed preview slot ordering for database {database_id}, job {job_id}, fragment {fragment_id}"
739 );
740 }
741 }
742 }
743}
744
745fn render_actor_assignments_with_allocator(
746 actor_id_allocator: &mut RenderActorIdAllocator<'_>,
747 worker_map: &HashMap<WorkerId, WorkerNode>,
748 loaded: &LoadedFragmentContext,
749) -> MetaResult<RenderedGraph> {
750 if loaded.is_empty() {
751 return Ok(RenderedGraph::empty());
752 }
753
754 let render_context = RenderActorsContext {
755 fragment_source_ids: &loaded.fragment_source_ids,
756 fragment_splits: &loaded.fragment_splits,
757 streaming_job_databases: &loaded.streaming_job_databases,
758 database_map: &loaded.database_map,
759 };
760
761 let fragments = render_actors_with_allocator(
762 actor_id_allocator,
763 &loaded.ensembles,
764 &loaded.job_fragments,
765 &loaded.job_map,
766 worker_map,
767 render_context,
768 )?;
769
770 Ok(RenderedGraph {
771 fragments,
772 ensembles: loaded.ensembles.clone(),
773 })
774}
775
776fn materialize_fragment(
777 mut fragment: InflightFragmentInfo,
778 actor_id_counter: &AtomicU32,
779) -> InflightFragmentInfo {
780 let actor_count = u32::try_from(fragment.actors.len()).expect("actor count exceeds u32::MAX");
781 let actor_id_base: ActorId = actor_id_counter
782 .fetch_add(actor_count, Ordering::Relaxed)
783 .into();
784
785 let mut actors = fragment.actors.into_iter().collect_vec();
786 actors.sort_by_key(|(actor_id, _)| *actor_id);
787
788 fragment.actors = actors
789 .into_iter()
790 .enumerate()
791 .map(|(idx, (_, info))| {
792 let actor_offset = u32::try_from(idx).expect("actor index exceeds u32::MAX");
793 (actor_id_base + actor_offset, info)
794 })
795 .collect();
796
797 fragment
798}
799
800async fn build_loaded_context<C>(
801 txn: &C,
802 ensembles: Vec<NoShuffleEnsemble>,
803 fragment_models: HashMap<FragmentId, fragment::Model>,
804 job_map: HashMap<JobId, streaming_job::Model>,
805) -> MetaResult<LoadedFragmentContext>
806where
807 C: ConnectionTrait,
808{
809 if ensembles.is_empty() {
810 return Ok(LoadedFragmentContext::default());
811 }
812
813 let mut job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>> = HashMap::new();
814 for (fragment_id, model) in fragment_models {
815 job_fragments
816 .entry(model.job_id)
817 .or_default()
818 .try_insert(fragment_id, LoadedFragment::from(model))
819 .expect("duplicate fragment id for job");
820 }
821
822 #[cfg(debug_assertions)]
823 {
824 debug_sanity_check(&ensembles, &job_fragments, &job_map);
825 }
826
827 let (fragment_source_ids, fragment_splits) =
828 resolve_source_fragments(txn, &job_fragments).await?;
829
830 let job_ids = job_map.keys().copied().collect_vec();
831
832 let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
833 .select_only()
834 .column(streaming_job::Column::JobId)
835 .column(object::Column::DatabaseId)
836 .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
837 .filter(streaming_job::Column::JobId.is_in(job_ids))
838 .into_tuple()
839 .all(txn)
840 .await?
841 .into_iter()
842 .collect();
843
844 let database_map: HashMap<_, _> = Database::find()
845 .filter(
846 database::Column::DatabaseId
847 .is_in(streaming_job_databases.values().copied().collect_vec()),
848 )
849 .all(txn)
850 .await?
851 .into_iter()
852 .map(|db| (db.database_id, db))
853 .collect();
854
855 Ok(LoadedFragmentContext {
856 ensembles,
857 job_fragments,
858 job_map,
859 streaming_job_databases,
860 database_map,
861 fragment_source_ids,
862 fragment_splits,
863 })
864}
865
866struct RenderActorsContext<'a> {
869 fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
870 fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
871 streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
872 database_map: &'a HashMap<DatabaseId, database::Model>,
873}
874
/// Source of actor ids used while rendering.
enum RenderActorIdAllocator<'a> {
    /// Reserves ids from a shared atomic counter.
    Persistent(&'a AtomicU32),
    /// Hands out ids from a local counter; `next_actor_id` is the next id to give out.
    Preview { next_actor_id: u32 },
}
879
880impl RenderActorIdAllocator<'_> {
881 fn allocate_block(&mut self, actor_count: u32) -> ActorId {
882 match self {
883 Self::Persistent(actor_id_counter) => {
884 let actor_id_base: ActorId = actor_id_counter
885 .fetch_add(actor_count, Ordering::Relaxed)
886 .into();
887 actor_id_base
888 }
889 Self::Preview { next_actor_id } => {
890 let actor_id_base = *next_actor_id;
891 *next_actor_id = next_actor_id
892 .checked_add(actor_count)
893 .expect("preview actor id overflow");
894 let actor_id_base: ActorId = actor_id_base.into();
895 actor_id_base
896 }
897 }
898 }
899}
900
/// Test-only convenience wrapper: renders actors with ids taken from `actor_id_counter`.
#[cfg(test)]
fn render_actors(
    actor_id_counter: &AtomicU32,
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    render_actors_with_allocator(
        &mut RenderActorIdAllocator::Persistent(actor_id_counter),
        ensembles,
        job_fragments,
        job_map,
        worker_map,
        context,
    )
}
920
921fn render_actors_with_allocator(
922 actor_id_allocator: &mut RenderActorIdAllocator<'_>,
923 ensembles: &[NoShuffleEnsemble],
924 job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
925 job_map: &HashMap<JobId, streaming_job::Model>,
926 worker_map: &HashMap<WorkerId, WorkerNode>,
927 context: RenderActorsContext<'_>,
928) -> MetaResult<FragmentRenderMap> {
929 let RenderActorsContext {
930 fragment_source_ids,
931 fragment_splits: fragment_splits_map,
932 streaming_job_databases,
933 database_map,
934 } = context;
935
936 let mut all_fragments: FragmentRenderMap = HashMap::new();
937 let fragment_lookup: HashMap<FragmentId, &LoadedFragment> = job_fragments
938 .values()
939 .flat_map(|fragments| fragments.iter())
940 .map(|(fragment_id, fragment)| (*fragment_id, fragment))
941 .collect();
942
943 for NoShuffleEnsemble {
944 entries,
945 components,
946 } in ensembles
947 {
948 tracing::debug!("rendering ensemble entries {:?}", entries);
949
950 let entry_fragments = entries
951 .iter()
952 .map(|fragment_id| fragment_lookup.get(fragment_id).unwrap())
953 .collect_vec();
954
955 let entry_fragment_parallelism = Itertools::exactly_one(
956 entry_fragments
957 .iter()
958 .map(|fragment| fragment.parallelism.clone())
959 .dedup(),
960 )
961 .map_err(|_| {
962 anyhow!(
963 "entry fragments {:?} have inconsistent parallelism settings",
964 entries.iter().copied().collect_vec()
965 )
966 })?;
967
968 let (job_id, distribution_type, vnode_count) = Itertools::exactly_one(
969 entry_fragments
970 .iter()
971 .map(|f| (f.job_id, f.distribution_type, f.vnode_count))
972 .dedup(),
973 )
974 .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;
975
976 let job = job_map
977 .get(&job_id)
978 .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;
979
980 let database_resource_group = streaming_job_databases
981 .get(&job_id)
982 .and_then(|database_id| database_map.get(database_id))
983 .unwrap()
984 .resource_group
985 .clone();
986
987 let source_entry_fragment = entry_fragments.iter().find(|f| {
988 let mask = f.fragment_type_mask;
989 if mask.contains(FragmentTypeFlag::Source) {
990 assert!(!mask.contains(FragmentTypeFlag::SourceScan))
991 }
992 mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
993 });
994
995 let actor_template = EnsembleActorTemplate::render_new(
996 job,
997 worker_map,
998 entry_fragment_parallelism,
999 database_resource_group,
1000 distribution_type,
1001 vnode_count,
1002 )?;
1003
1004 let source_splits = match source_entry_fragment {
1005 Some(entry_fragment) => {
1006 let source_id = fragment_source_ids
1007 .get(&entry_fragment.fragment_id)
1008 .ok_or_else(|| {
1009 anyhow!(
1010 "missing source id in source fragment {}",
1011 entry_fragment.fragment_id
1012 )
1013 })?;
1014
1015 let entry_fragment_id = entry_fragment.fragment_id;
1016
1017 let splits = fragment_splits_map
1018 .get(&entry_fragment_id)
1019 .cloned()
1020 .unwrap_or_default();
1021
1022 let splits: std::collections::BTreeMap<_, _> =
1023 splits.into_iter().map(|s| (s.id(), s)).collect();
1024 let splits = actor_template.assign_splits(entry_fragment_id, splits);
1025 Some((splits, *source_id))
1026 }
1027 None => None,
1028 };
1029
1030 for component_fragment_id in components {
1031 let fragment = fragment_lookup.get(component_fragment_id).unwrap();
1032 let fragment_id = fragment.fragment_id;
1033 let job_id = fragment.job_id;
1034 let fragment_type_mask = fragment.fragment_type_mask;
1035 let distribution_type = fragment.distribution_type;
1036 let stream_node = &fragment.nodes;
1037 let state_table_ids = &fragment.state_table_ids;
1038 let vnode_count = fragment.vnode_count;
1039 let source_id = fragment_source_ids.get(&fragment_id).cloned();
1040
1041 let aligner = ComponentFragmentAligner::new(&actor_template, actor_id_allocator);
1042 let actors = aligner.align_component_actor(distribution_type);
1043 let mut splits = source_id
1044 .map(|source_id| {
1045 let (fragment_splits, shared_source_id) = source_splits.as_ref().unwrap();
1046 assert_eq!(*shared_source_id, source_id);
1047 aligner.align_component_splits(fragment_splits)
1048 })
1049 .unwrap_or_default();
1050
1051 let actors: HashMap<ActorId, InflightActorInfo> = actors
1052 .into_iter()
1053 .map(|(actor_id, (worker_id, vnode_bitmap))| {
1054 (
1055 actor_id,
1056 InflightActorInfo {
1057 worker_id,
1058 vnode_bitmap,
1059 splits: splits.remove(&actor_id).unwrap_or_default(),
1060 },
1061 )
1062 })
1063 .collect();
1064
1065 let fragment = InflightFragmentInfo {
1066 fragment_id,
1067 distribution_type,
1068 fragment_type_mask,
1069 vnode_count,
1070 nodes: stream_node.clone(),
1071 actors,
1072 state_table_ids: state_table_ids.clone(),
1073 };
1074
1075 let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
1076 anyhow!("streaming job {job_id} not found in streaming_job_databases")
1077 })?;
1078
1079 all_fragments
1080 .entry(database_id)
1081 .or_default()
1082 .entry(job_id)
1083 .or_default()
1084 .insert(fragment_id, fragment);
1085 }
1086 }
1087
1088 Ok(all_fragments)
1089}
1090
1091pub(crate) struct EnsembleActorTemplate {
1092 assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>>,
1093 distribution_type: DistributionType,
1094 actor_count: u32,
1095}
1096
1097impl EnsembleActorTemplate {
1098 pub(crate) fn render_new(
1099 job: &streaming_job::Model,
1100 worker_map: &HashMap<WorkerId, WorkerNode>,
1101 entry_fragment_parallelism: Option<StreamingParallelism>,
1102 database_resource_group: String,
1103 distribution_type: DistributionType,
1104 vnode_count: usize,
1105 ) -> MetaResult<Self> {
1106 let job_id = job.job_id;
1107 let job_strategy = job
1108 .adaptive_parallelism_strategy
1109 .as_deref()
1110 .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
1111 let backfill_job_strategy = job
1112 .backfill_adaptive_parallelism_strategy
1113 .as_deref()
1114 .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
1115
1116 let resource_group = match &job.specific_resource_group {
1117 None => database_resource_group,
1118 Some(resource_group) => resource_group.clone(),
1119 };
1120
1121 let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
1122 .iter()
1123 .filter_map(|(worker_id, worker)| {
1124 if worker
1125 .resource_group()
1126 .as_deref()
1127 .unwrap_or(DEFAULT_RESOURCE_GROUP)
1128 == resource_group.as_str()
1129 {
1130 Some((
1131 *worker_id,
1132 worker
1133 .parallelism()
1134 .expect("should have parallelism for compute node")
1135 .try_into()
1136 .expect("parallelism for compute node"),
1137 ))
1138 } else {
1139 None
1140 }
1141 })
1142 .collect();
1143
1144 let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();
1145
1146 let effective_job_parallelism = if job.job_status != JobStatus::Created {
1147 job.backfill_parallelism
1148 .as_ref()
1149 .unwrap_or(&job.parallelism)
1150 } else {
1151 &job.parallelism
1152 };
1153 let effective_job_strategy = if job.job_status != JobStatus::Created {
1154 backfill_job_strategy.or(job_strategy)
1155 } else {
1156 job_strategy
1157 };
1158
1159 let target_parallelism = match entry_fragment_parallelism
1160 .as_ref()
1161 .unwrap_or(effective_job_parallelism)
1162 {
1163 StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
1164 let effective_job_strategy = effective_job_strategy.unwrap_or_else(|| {
1165 tracing::warn!(
1166 job_id = %job_id,
1167 ?effective_job_parallelism,
1168 "adaptive/custom job is missing adaptive strategy in StreamContext; falling back to default"
1169 );
1170 AdaptiveParallelismStrategy::default()
1171 });
1172 effective_job_strategy.compute_target_parallelism(total_parallelism)
1173 }
1174 StreamingParallelism::Fixed(n) => *n,
1175 };
1176 let actual_parallelism = target_parallelism
1177 .min(vnode_count)
1178 .min(job.max_parallelism as usize);
1179 if actual_parallelism != target_parallelism {
1180 tracing::warn!(
1181 job_id = %job_id,
1182 target_parallelism,
1183 actual_parallelism,
1184 vnode_count,
1185 job_max_parallelism = job.max_parallelism,
1186 ?effective_job_parallelism,
1187 ?entry_fragment_parallelism,
1188 "streaming job parallelism was capped by vnode count or max parallelism"
1189 );
1190 }
1191
1192 tracing::debug!(
1193 "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
1194 job_id,
1195 actual_parallelism,
1196 job.parallelism,
1197 total_parallelism,
1198 job.max_parallelism,
1199 vnode_count,
1200 entry_fragment_parallelism
1201 );
1202
1203 let assigner = AssignerBuilder::new(job_id).build();
1204
1205 let actors = (0..(actual_parallelism as u32)).collect_vec();
1206 let vnodes = (0..vnode_count).collect_vec();
1207
1208 let raw_assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;
1209
1210 let assignment = raw_assignment
1211 .into_iter()
1212 .map(|(worker_id, actors)| {
1213 let actors = actors
1214 .into_iter()
1215 .map(|(actor_idx, vnodes)| {
1216 let bitmap = match distribution_type {
1217 DistributionType::Single => None,
1218 DistributionType::Hash => {
1219 Some(Bitmap::from_indices(vnode_count, &vnodes))
1220 }
1221 };
1222 (actor_idx, bitmap)
1223 })
1224 .collect();
1225 (worker_id, actors)
1226 })
1227 .collect();
1228
1229 let actor_count = u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
1230
1231 Ok(Self {
1232 assignment,
1233 distribution_type,
1234 actor_count,
1235 })
1236 }
1237
1238 pub(crate) fn from_existing_inflight_fragment(fragment: &InflightFragmentInfo) -> Self {
1239 if fragment.actors.is_empty() {
1240 return Self {
1241 assignment: BTreeMap::new(),
1242 distribution_type: fragment.distribution_type,
1243 actor_count: 0,
1244 };
1245 }
1246
1247 let actor_count = fragment.actors.len() as u32;
1248
1249 let mut assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>> = BTreeMap::new();
1250 for (actor_idx, (&_actor_id, actor_info)) in fragment.actors.iter().enumerate() {
1252 let actor_idx = actor_idx as u32;
1253 assignment
1254 .entry(actor_info.worker_id)
1255 .or_default()
1256 .insert(actor_idx, actor_info.vnode_bitmap.clone());
1257 }
1258
1259 Self {
1260 assignment,
1261 distribution_type: fragment.distribution_type,
1262 actor_count,
1263 }
1264 }
1265
1266 pub(crate) fn assert_aligned_with(
1273 &self,
1274 other: &Self,
1275 self_fragment_id: FragmentId,
1276 other_fragment_id: FragmentId,
1277 ) {
1278 let mapping = resolve_no_shuffle_actor_mapping(
1279 self.distribution_type,
1280 self.assignment
1281 .iter()
1282 .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
1283 other.distribution_type,
1284 other
1285 .assignment
1286 .iter()
1287 .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
1288 );
1289
1290 for ((self_worker, _self_idx), (other_worker, _other_idx)) in &mapping {
1291 assert_eq!(
1292 self_worker, other_worker,
1293 "fragments {} and {} disagree on worker placement: {:?}",
1294 self_fragment_id, other_fragment_id, mapping,
1295 );
1296 }
1297 }
1298
1299 fn assign_splits(
1300 &self,
1301 entry_fragment_id: FragmentId,
1302 splits: BTreeMap<SplitId, SplitImpl>,
1303 ) -> HashMap<u32, Vec<SplitImpl>> {
1304 {
1305 {
1306 let empty_actor_splits: HashMap<_, _> = self
1307 .assignment
1308 .values()
1309 .flat_map(|actors| actors.keys())
1310 .map(|actor_id| (*actor_id, vec![]))
1311 .collect();
1312
1313 crate::stream::source_manager::reassign_splits(
1314 entry_fragment_id,
1315 empty_actor_splits,
1316 &splits,
1317 SplitDiffOptions::default(),
1318 )
1319 .unwrap_or_default()
1320 }
1321 }
1322 }
1323}
1324
/// Maps the template-local actor indices of an [`EnsembleActorTemplate`] to concrete actor ids
/// for one fragment.
pub(crate) struct ComponentFragmentAligner<'a> {
    /// Template whose layout is applied to the fragment.
    actor_template: &'a EnsembleActorTemplate,
    /// First id of the allocated block; actor index `i` becomes `actor_id_base + i`.
    actor_id_base: ActorId,
}
1329
1330impl<'a> ComponentFragmentAligner<'a> {
1331 fn new(
1332 actor_template: &'a EnsembleActorTemplate,
1333 actor_id_allocator: &mut RenderActorIdAllocator<'_>,
1334 ) -> Self {
1335 let actor_id_base = actor_id_allocator.allocate_block(actor_template.actor_count);
1336 Self {
1337 actor_template,
1338 actor_id_base,
1339 }
1340 }
1341
1342 pub(crate) fn new_persistent(
1343 actor_template: &'a EnsembleActorTemplate,
1344 actor_id_counter: &AtomicU32,
1345 ) -> Self {
1346 let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
1347 Self::new(actor_template, &mut actor_id_allocator)
1348 }
1349
1350 pub(crate) fn align_component_actor(
1351 &self,
1352 distribution_type: DistributionType,
1353 ) -> HashMap<ActorId, (WorkerId, Option<Bitmap>)> {
1354 let EnsembleActorTemplate {
1355 assignment,
1356 actor_count,
1357 distribution_type: _,
1358 } = &self.actor_template;
1359 let actor_id_base = self.actor_id_base;
1360 {
1361 assignment
1362 .iter()
1363 .flat_map(|(worker_id, actors)| {
1364 actors
1365 .iter()
1366 .map(move |(actor_idx, bitmap)| (worker_id, actor_idx, bitmap))
1367 })
1368 .map(|(&worker_id, &actor_idx, bitmap)| {
1369 if distribution_type == DistributionType::Single {
1370 assert_eq!(*actor_count, 1);
1371 }
1372
1373 let actor_id = actor_id_base + actor_idx;
1374
1375 (actor_id, (worker_id, bitmap.clone()))
1376 })
1377 .collect()
1378 }
1379 }
1380
1381 pub(crate) fn align_component_splits(
1382 &self,
1383 split_assignment: &HashMap<u32, Vec<SplitImpl>>,
1384 ) -> HashMap<ActorId, Vec<SplitImpl>> {
1385 (0..self.actor_template.actor_count)
1386 .filter_map(|actor_idx| {
1387 split_assignment
1388 .get(&actor_idx)
1389 .map(|splits| ((self.actor_id_base + actor_idx), splits.clone()))
1390 })
1391 .collect()
1392 }
1393}
1394
1395#[cfg(debug_assertions)]
1396fn debug_sanity_check(
1397 ensembles: &[NoShuffleEnsemble],
1398 job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1399 jobs: &HashMap<JobId, streaming_job::Model>,
1400) {
1401 let fragment_lookup: HashMap<FragmentId, (&LoadedFragment, JobId)> = job_fragments
1402 .iter()
1403 .flat_map(|(job_id, fragments)| {
1404 fragments
1405 .iter()
1406 .map(move |(fragment_id, fragment)| (*fragment_id, (fragment, *job_id)))
1407 })
1408 .collect();
1409
1410 debug_assert!(
1412 ensembles
1413 .iter()
1414 .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
1415 "entries must be subset of components"
1416 );
1417
1418 let mut missing_fragments = BTreeSet::new();
1419 let mut missing_jobs = BTreeSet::new();
1420
1421 for fragment_id in ensembles
1422 .iter()
1423 .flat_map(|ensemble| ensemble.components.iter())
1424 {
1425 match fragment_lookup.get(fragment_id) {
1426 Some((fragment, job_id)) => {
1427 if !jobs.contains_key(&fragment.job_id) {
1428 missing_jobs.insert(*job_id);
1429 }
1430 }
1431 None => {
1432 missing_fragments.insert(*fragment_id);
1433 }
1434 }
1435 }
1436
1437 debug_assert!(
1438 missing_fragments.is_empty(),
1439 "missing fragments in fragment_map: {:?}",
1440 missing_fragments
1441 );
1442
1443 debug_assert!(
1444 missing_jobs.is_empty(),
1445 "missing jobs for fragments' job_id: {:?}",
1446 missing_jobs
1447 );
1448
1449 for ensemble in ensembles {
1450 let unique_vnode_counts: Vec<_> = ensemble
1451 .components
1452 .iter()
1453 .flat_map(|fragment_id| {
1454 fragment_lookup
1455 .get(fragment_id)
1456 .map(|(fragment, _)| fragment.vnode_count)
1457 })
1458 .unique()
1459 .collect();
1460
1461 debug_assert!(
1462 unique_vnode_counts.len() <= 1,
1463 "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
1464 ensemble.components,
1465 unique_vnode_counts
1466 );
1467 }
1468}
1469
1470async fn resolve_source_fragments<C>(
1471 txn: &C,
1472 job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1473) -> MetaResult<(
1474 HashMap<FragmentId, SourceId>,
1475 HashMap<FragmentId, Vec<SplitImpl>>,
1476)>
1477where
1478 C: ConnectionTrait,
1479{
1480 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1481 for (fragment_id, fragment) in job_fragments.values().flatten() {
1482 let mask = fragment.fragment_type_mask;
1483 if mask.contains(FragmentTypeFlag::Source)
1484 && let Some(source_id) = fragment.nodes.find_stream_source()
1485 {
1486 source_fragment_ids
1487 .entry(source_id)
1488 .or_default()
1489 .insert(*fragment_id);
1490 }
1491
1492 if mask.contains(FragmentTypeFlag::SourceScan)
1493 && let Some((source_id, _)) = fragment.nodes.find_source_backfill()
1494 {
1495 source_fragment_ids
1496 .entry(source_id)
1497 .or_default()
1498 .insert(*fragment_id);
1499 }
1500 }
1501
1502 let fragment_source_ids: HashMap<_, _> = source_fragment_ids
1503 .iter()
1504 .flat_map(|(source_id, fragment_ids)| {
1505 fragment_ids
1506 .iter()
1507 .map(|fragment_id| (*fragment_id, *source_id))
1508 })
1509 .collect();
1510
1511 let fragment_ids = fragment_source_ids.keys().copied().collect_vec();
1512
1513 let fragment_splits: Vec<_> = FragmentSplits::find()
1514 .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
1515 .all(txn)
1516 .await?;
1517
1518 let fragment_splits: HashMap<_, _> = fragment_splits
1519 .into_iter()
1520 .flat_map(|model| {
1521 model.splits.map(|splits| {
1522 (
1523 model.fragment_id,
1524 splits
1525 .to_protobuf()
1526 .splits
1527 .iter()
1528 .flat_map(SplitImpl::try_from)
1529 .collect_vec(),
1530 )
1531 })
1532 })
1533 .collect();
1534
1535 Ok((fragment_source_ids, fragment_splits))
1536}
1537
/// Borrowed view of fragments with their actors, plus the worker each actor is located on.
#[derive(Debug)]
pub struct ActorGraph<'a> {
    /// Fragment id -> (fragment, actors of that fragment).
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    /// Actor id -> worker id.
    pub locations: &'a HashMap<ActorId, WorkerId>,
}
1544
/// A group of fragments connected (directly or transitively) by no-shuffle edges.
#[derive(Debug, Clone)]
pub struct NoShuffleEnsemble {
    /// Fragments of the group that have no no-shuffle upstream inside the group.
    entries: HashSet<FragmentId>,
    /// All fragments of the group, including the entries.
    components: HashSet<FragmentId>,
}
1550
1551impl NoShuffleEnsemble {
1552 pub(crate) fn singleton(fragment_id: FragmentId) -> Self {
1554 Self {
1555 entries: HashSet::from_iter([fragment_id]),
1556 components: HashSet::from_iter([fragment_id]),
1557 }
1558 }
1559
1560 #[cfg(test)]
1561 pub fn for_test(
1562 entries: impl IntoIterator<Item = FragmentId>,
1563 components: impl IntoIterator<Item = FragmentId>,
1564 ) -> Self {
1565 let entries = entries.into_iter().collect();
1566 let components = components.into_iter().collect();
1567 Self {
1568 entries,
1569 components,
1570 }
1571 }
1572
1573 pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1574 self.components.iter().cloned()
1575 }
1576
1577 pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1578 self.entries.iter().copied()
1579 }
1580
1581 pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1582 self.components.iter().copied()
1583 }
1584
1585 pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
1586 self.entries.contains(fragment_id)
1587 }
1588}
1589
1590pub async fn find_fragment_no_shuffle_dags_detailed(
1591 db: &impl ConnectionTrait,
1592 initial_fragment_ids: &[FragmentId],
1593) -> MetaResult<Vec<NoShuffleEnsemble>> {
1594 let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
1595 .columns([
1596 fragment_relation::Column::SourceFragmentId,
1597 fragment_relation::Column::TargetFragmentId,
1598 ])
1599 .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
1600 .into_tuple()
1601 .all(db)
1602 .await?;
1603
1604 let (forward_edges, backward_edges) =
1605 build_no_shuffle_fragment_graph_edges(all_no_shuffle_relations);
1606
1607 find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
1608}
1609
1610pub(crate) fn build_no_shuffle_fragment_graph_edges(
1611 relations: impl IntoIterator<Item = (FragmentId, FragmentId)>,
1612) -> (
1613 HashMap<FragmentId, Vec<FragmentId>>,
1614 HashMap<FragmentId, Vec<FragmentId>>,
1615) {
1616 let mut forward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1617 let mut backward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1618
1619 for (src, dst) in relations {
1620 forward_edges.entry(src).or_default().insert(dst);
1621 backward_edges.entry(dst).or_default().insert(src);
1622 }
1623
1624 let forward_edges = forward_edges
1625 .into_iter()
1626 .map(|(src, dst_set)| (src, dst_set.into_iter().collect()))
1627 .collect();
1628 let backward_edges = backward_edges
1629 .into_iter()
1630 .map(|(dst, src_set)| (dst, src_set.into_iter().collect()))
1631 .collect();
1632
1633 (forward_edges, backward_edges)
1634}
1635
1636pub(crate) fn find_no_shuffle_graphs(
1637 initial_fragment_ids: &[impl Into<FragmentId> + Copy],
1638 forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1639 backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1640) -> MetaResult<Vec<NoShuffleEnsemble>> {
1641 let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
1642 let mut globally_visited: HashSet<FragmentId> = HashSet::new();
1643
1644 for &init_id in initial_fragment_ids {
1645 let init_id = init_id.into();
1646 if globally_visited.contains(&init_id) {
1647 continue;
1648 }
1649
1650 let mut components = HashSet::new();
1652 let mut queue: VecDeque<FragmentId> = VecDeque::new();
1653
1654 queue.push_back(init_id);
1655 globally_visited.insert(init_id);
1656
1657 while let Some(current_id) = queue.pop_front() {
1658 components.insert(current_id);
1659 let neighbors = forward_edges
1660 .get(¤t_id)
1661 .into_iter()
1662 .flatten()
1663 .chain(backward_edges.get(¤t_id).into_iter().flatten());
1664
1665 for &neighbor_id in neighbors {
1666 if globally_visited.insert(neighbor_id) {
1667 queue.push_back(neighbor_id);
1668 }
1669 }
1670 }
1671
1672 let mut entries = HashSet::new();
1674 for &node_id in &components {
1675 let is_root = match backward_edges.get(&node_id) {
1676 Some(parents) => parents.iter().all(|p| !components.contains(p)),
1677 None => true,
1678 };
1679 if is_root {
1680 entries.insert(node_id);
1681 }
1682 }
1683
1684 if !entries.is_empty() {
1686 graphs.push(NoShuffleEnsemble {
1687 entries,
1688 components,
1689 });
1690 }
1691 }
1692
1693 Ok(graphs)
1694}
1695
1696#[cfg(test)]
1697mod tests {
1698 use std::collections::{BTreeSet, HashMap, HashSet};
1699 use std::sync::Arc;
1700
1701 use risingwave_connector::source::SplitImpl;
1702 use risingwave_connector::source::test_source::TestSourceSplit;
1703 use risingwave_meta_model::{CreateType, JobStatus};
1704 use risingwave_pb::common::WorkerType;
1705 use risingwave_pb::common::worker_node::Property as WorkerProperty;
1706 use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1707
1708 use super::*;
1709
    /// Pair of adjacency maps `(forward, backward)` as returned by `build_edges`:
    /// `forward` maps a fragment to its downstream fragments, `backward` to its upstream ones.
    type Edges = (
        HashMap<FragmentId, Vec<FragmentId>>,
        HashMap<FragmentId, Vec<FragmentId>>,
    );
1716
1717 fn build_edges(relations: &[(u32, u32)]) -> Edges {
1720 let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1721 let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1722 for &(src, dst) in relations {
1723 forward_edges
1724 .entry(src.into())
1725 .or_default()
1726 .push(dst.into());
1727 backward_edges
1728 .entry(dst.into())
1729 .or_default()
1730 .push(src.into());
1731 }
1732 (forward_edges, backward_edges)
1733 }
1734
1735 fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1737 ids.iter().map(|id| (*id).into()).collect()
1738 }
1739
1740 fn build_fragment(
1741 fragment_id: FragmentId,
1742 job_id: JobId,
1743 fragment_type_mask: i32,
1744 distribution_type: DistributionType,
1745 vnode_count: i32,
1746 parallelism: StreamingParallelism,
1747 ) -> LoadedFragment {
1748 LoadedFragment {
1749 fragment_id,
1750 job_id,
1751 fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
1752 distribution_type,
1753 vnode_count: vnode_count as usize,
1754 nodes: PbStreamNode::default(),
1755 state_table_ids: HashSet::new(),
1756 parallelism: Some(parallelism),
1757 }
1758 }
1759
    /// Comparable snapshot of one actor as produced by `collect_actor_state`: (actor id
    /// relative to the fragment's smallest actor id, worker id, set vnode indices if the actor
    /// has a bitmap, split ids).
    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1761
1762 fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1763 let base = fragment.actors.keys().copied().min().unwrap_or_default();
1764
1765 let mut entries: Vec<_> = fragment
1766 .actors
1767 .iter()
1768 .map(|(&actor_id, info)| {
1769 let idx = actor_id.as_raw_id() - base.as_raw_id();
1770 let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1771 bitmap
1772 .iter()
1773 .enumerate()
1774 .filter_map(|(pos, is_set)| is_set.then_some(pos))
1775 .collect::<Vec<_>>()
1776 });
1777 let splits = info
1778 .splits
1779 .iter()
1780 .map(|split| split.id().to_string())
1781 .collect::<Vec<_>>();
1782 (idx.into(), info.worker_id, vnode_indices, splits)
1783 })
1784 .collect();
1785
1786 entries.sort_by_key(|(idx, _, _, _)| *idx);
1787 entries
1788 }
1789
1790 fn build_worker_node(
1791 id: impl Into<WorkerId>,
1792 parallelism: usize,
1793 resource_group: &str,
1794 ) -> WorkerNode {
1795 WorkerNode {
1796 id: id.into(),
1797 r#type: WorkerType::ComputeNode as i32,
1798 property: Some(WorkerProperty {
1799 is_streaming: true,
1800 parallelism: u32::try_from(parallelism).expect("parallelism fits into u32"),
1801 resource_group: Some(resource_group.to_owned()),
1802 ..Default::default()
1803 }),
1804 ..Default::default()
1805 }
1806 }
1807
1808 #[test]
1809 fn test_single_linear_chain() {
1810 let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1813 let initial_ids = &[2];
1814
1815 let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1817
1818 assert!(result.is_ok());
1820 let graphs = result.unwrap();
1821
1822 assert_eq!(graphs.len(), 1);
1823 let graph = &graphs[0];
1824 assert_eq!(graph.entries, to_hashset(&[1]));
1825 assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1826 }
1827
1828 #[test]
1829 fn test_two_disconnected_graphs() {
1830 let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1833 let initial_ids = &[2, 10];
1834
1835 let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1837
1838 assert_eq!(graphs.len(), 2);
1840
1841 graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1843
1844 assert_eq!(graphs[0].entries, to_hashset(&[1]));
1846 assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1847
1848 assert_eq!(graphs[1].entries, to_hashset(&[10]));
1850 assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1851 }
1852
1853 #[test]
1854 fn test_multiple_entries_in_one_graph() {
1855 let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1857 let initial_ids = &[3];
1858
1859 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1861
1862 assert_eq!(graphs.len(), 1);
1864 let graph = &graphs[0];
1865 assert_eq!(graph.entries, to_hashset(&[1, 2]));
1866 assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1867 }
1868
1869 #[test]
1870 fn test_diamond_shape_graph() {
1871 let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1873 let initial_ids = &[4];
1874
1875 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1877
1878 assert_eq!(graphs.len(), 1);
1880 let graph = &graphs[0];
1881 assert_eq!(graph.entries, to_hashset(&[1]));
1882 assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1883 }
1884
1885 #[test]
1886 fn test_starting_with_multiple_nodes_in_same_graph() {
1887 let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1890 let initial_ids = &[2, 4];
1891
1892 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1894
1895 assert_eq!(graphs.len(), 1);
1897 let graph = &graphs[0];
1898 assert_eq!(graph.entries, to_hashset(&[1]));
1899 assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1900 }
1901
1902 #[test]
1903 fn test_empty_initial_ids() {
1904 let (forward, backward) = build_edges(&[(1, 2)]);
1906 let initial_ids: &[u32] = &[];
1907
1908 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1910
1911 assert!(graphs.is_empty());
1913 }
1914
1915 #[test]
1916 fn test_isolated_node_as_input() {
1917 let (forward, backward) = build_edges(&[(1, 2)]);
1919 let initial_ids = &[100];
1920
1921 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1923
1924 assert_eq!(graphs.len(), 1);
1926 let graph = &graphs[0];
1927 assert_eq!(graph.entries, to_hashset(&[100]));
1928 assert_eq!(graph.components, to_hashset(&[100]));
1929 }
1930
1931 #[test]
1932 fn test_graph_with_a_cycle() {
1933 let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1938 let initial_ids = &[2];
1939
1940 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1942
1943 assert!(
1945 graphs.is_empty(),
1946 "A graph with no entries should not be returned"
1947 );
1948 }
1949 #[test]
1950 fn test_custom_complex() {
1951 let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1952 let initial_ids = &[1, 2, 4, 6];
1953
1954 let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1956
1957 assert_eq!(graphs.len(), 2);
1959 graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1961
1962 assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1964 assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1965
1966 assert_eq!(graphs[1].entries, to_hashset(&[6]));
1968 assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1969 }
1970
1971 #[test]
1972 fn render_actors_increments_actor_counter() {
1973 let actor_id_counter = AtomicU32::new(100);
1974 let fragment_id: FragmentId = 1.into();
1975 let job_id: JobId = 10.into();
1976 let database_id: DatabaseId = DatabaseId::new(3);
1977
1978 let fragment_model = build_fragment(
1979 fragment_id,
1980 job_id,
1981 0,
1982 DistributionType::Single,
1983 1,
1984 StreamingParallelism::Fixed(1),
1985 );
1986
1987 let job_model = streaming_job::Model {
1988 job_id,
1989 job_status: JobStatus::Created,
1990 create_type: CreateType::Foreground,
1991 timezone: None,
1992 config_override: None,
1993 adaptive_parallelism_strategy: None,
1994 parallelism: StreamingParallelism::Fixed(1),
1995 backfill_parallelism: None,
1996 backfill_adaptive_parallelism_strategy: None,
1997 backfill_orders: None,
1998 max_parallelism: 1,
1999 specific_resource_group: None,
2000 is_serverless_backfill: false,
2001 refresh_interval_sec: None,
2002 };
2003
2004 let database_model = database::Model {
2005 database_id,
2006 name: "test_db".into(),
2007 resource_group: "rg-a".into(),
2008 barrier_interval_ms: None,
2009 checkpoint_frequency: None,
2010 };
2011
2012 let ensembles = vec![NoShuffleEnsemble {
2013 entries: HashSet::from([fragment_id]),
2014 components: HashSet::from([fragment_id]),
2015 }];
2016
2017 let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
2018 let job_fragments = HashMap::from([(job_id, fragment_map)]);
2019 let job_map = HashMap::from([(job_id, job_model)]);
2020
2021 let worker_map: HashMap<WorkerId, WorkerNode> =
2022 HashMap::from([(1.into(), build_worker_node(1, 1, "rg-a"))]);
2023
2024 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2025 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2026 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2027 let database_map = HashMap::from([(database_id, database_model)]);
2028
2029 let context = RenderActorsContext {
2030 fragment_source_ids: &fragment_source_ids,
2031 fragment_splits: &fragment_splits,
2032 streaming_job_databases: &streaming_job_databases,
2033 database_map: &database_map,
2034 };
2035
2036 let result = render_actors(
2037 &actor_id_counter,
2038 &ensembles,
2039 &job_fragments,
2040 &job_map,
2041 &worker_map,
2042 context,
2043 )
2044 .expect("actor rendering succeeds");
2045
2046 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2047 assert_eq!(state.len(), 1);
2048 assert!(
2049 state[0].2.is_none(),
2050 "single distribution should not assign vnode bitmaps"
2051 );
2052 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
2053 }
2054
2055 #[test]
2056 fn render_actors_aligns_hash_vnode_bitmaps() {
2057 let actor_id_counter = AtomicU32::new(0);
2058 let entry_fragment_id: FragmentId = 1.into();
2059 let downstream_fragment_id: FragmentId = 2.into();
2060 let job_id: JobId = 20.into();
2061 let database_id: DatabaseId = DatabaseId::new(5);
2062
2063 let entry_fragment = build_fragment(
2064 entry_fragment_id,
2065 job_id,
2066 0,
2067 DistributionType::Hash,
2068 4,
2069 StreamingParallelism::Fixed(2),
2070 );
2071
2072 let downstream_fragment = build_fragment(
2073 downstream_fragment_id,
2074 job_id,
2075 0,
2076 DistributionType::Hash,
2077 4,
2078 StreamingParallelism::Fixed(2),
2079 );
2080
2081 let job_model = streaming_job::Model {
2082 job_id,
2083 job_status: JobStatus::Created,
2084 create_type: CreateType::Background,
2085 timezone: None,
2086 config_override: None,
2087 adaptive_parallelism_strategy: None,
2088 parallelism: StreamingParallelism::Fixed(2),
2089 backfill_parallelism: None,
2090 backfill_adaptive_parallelism_strategy: None,
2091 backfill_orders: None,
2092 max_parallelism: 2,
2093 specific_resource_group: None,
2094 is_serverless_backfill: false,
2095 refresh_interval_sec: None,
2096 };
2097
2098 let database_model = database::Model {
2099 database_id,
2100 name: "test_db_hash".into(),
2101 resource_group: "rg-hash".into(),
2102 barrier_interval_ms: None,
2103 checkpoint_frequency: None,
2104 };
2105
2106 let ensembles = vec![NoShuffleEnsemble {
2107 entries: HashSet::from([entry_fragment_id]),
2108 components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2109 }];
2110
2111 let fragment_map = HashMap::from([
2112 (entry_fragment_id, entry_fragment),
2113 (downstream_fragment_id, downstream_fragment),
2114 ]);
2115 let job_fragments = HashMap::from([(job_id, fragment_map)]);
2116 let job_map = HashMap::from([(job_id, job_model)]);
2117
2118 let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2119 (1.into(), build_worker_node(1, 1, "rg-hash")),
2120 (2.into(), build_worker_node(2, 1, "rg-hash")),
2121 ]);
2122
2123 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2124 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2125 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2126 let database_map = HashMap::from([(database_id, database_model)]);
2127
2128 let context = RenderActorsContext {
2129 fragment_source_ids: &fragment_source_ids,
2130 fragment_splits: &fragment_splits,
2131 streaming_job_databases: &streaming_job_databases,
2132 database_map: &database_map,
2133 };
2134
2135 let result = render_actors(
2136 &actor_id_counter,
2137 &ensembles,
2138 &job_fragments,
2139 &job_map,
2140 &worker_map,
2141 context,
2142 )
2143 .expect("actor rendering succeeds");
2144
2145 let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
2146 let downstream_state =
2147 collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
2148
2149 assert_eq!(entry_state.len(), 2);
2150 assert_eq!(entry_state, downstream_state);
2151
2152 let assigned_vnodes: BTreeSet<_> = entry_state
2153 .iter()
2154 .flat_map(|(_, _, vnodes, _)| {
2155 vnodes
2156 .as_ref()
2157 .expect("hash distribution should populate vnode bitmap")
2158 .iter()
2159 .copied()
2160 })
2161 .collect();
2162 assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
2163 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
2164 }
2165
2166 #[test]
2167 fn render_actors_propagates_source_splits() {
2168 let actor_id_counter = AtomicU32::new(0);
2169 let entry_fragment_id: FragmentId = 11.into();
2170 let downstream_fragment_id: FragmentId = 12.into();
2171 let job_id: JobId = 30.into();
2172 let database_id: DatabaseId = DatabaseId::new(7);
2173 let source_id: SourceId = 99.into();
2174
2175 let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
2176 let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;
2177
2178 let entry_fragment = build_fragment(
2179 entry_fragment_id,
2180 job_id,
2181 source_mask,
2182 DistributionType::Hash,
2183 4,
2184 StreamingParallelism::Fixed(2),
2185 );
2186
2187 let downstream_fragment = build_fragment(
2188 downstream_fragment_id,
2189 job_id,
2190 source_scan_mask,
2191 DistributionType::Hash,
2192 4,
2193 StreamingParallelism::Fixed(2),
2194 );
2195
2196 let job_model = streaming_job::Model {
2197 job_id,
2198 job_status: JobStatus::Created,
2199 create_type: CreateType::Background,
2200 timezone: None,
2201 config_override: None,
2202 adaptive_parallelism_strategy: None,
2203 parallelism: StreamingParallelism::Fixed(2),
2204 backfill_parallelism: None,
2205 backfill_adaptive_parallelism_strategy: None,
2206 backfill_orders: None,
2207 max_parallelism: 2,
2208 specific_resource_group: None,
2209 is_serverless_backfill: false,
2210 refresh_interval_sec: None,
2211 };
2212
2213 let database_model = database::Model {
2214 database_id,
2215 name: "split_db".into(),
2216 resource_group: "rg-source".into(),
2217 barrier_interval_ms: None,
2218 checkpoint_frequency: None,
2219 };
2220
2221 let ensembles = vec![NoShuffleEnsemble {
2222 entries: HashSet::from([entry_fragment_id]),
2223 components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2224 }];
2225
2226 let fragment_map = HashMap::from([
2227 (entry_fragment_id, entry_fragment),
2228 (downstream_fragment_id, downstream_fragment),
2229 ]);
2230 let job_fragments = HashMap::from([(job_id, fragment_map)]);
2231 let job_map = HashMap::from([(job_id, job_model)]);
2232
2233 let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2234 (1.into(), build_worker_node(1, 1, "rg-source")),
2235 (2.into(), build_worker_node(2, 1, "rg-source")),
2236 ]);
2237
2238 let split_a = SplitImpl::Test(TestSourceSplit {
2239 id: Arc::<str>::from("split-a"),
2240 properties: HashMap::new(),
2241 offset: "0".into(),
2242 });
2243 let split_b = SplitImpl::Test(TestSourceSplit {
2244 id: Arc::<str>::from("split-b"),
2245 properties: HashMap::new(),
2246 offset: "0".into(),
2247 });
2248
2249 let fragment_source_ids = HashMap::from([
2250 (entry_fragment_id, source_id),
2251 (downstream_fragment_id, source_id),
2252 ]);
2253 let fragment_splits =
2254 HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
2255 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2256 let database_map = HashMap::from([(database_id, database_model)]);
2257
2258 let context = RenderActorsContext {
2259 fragment_source_ids: &fragment_source_ids,
2260 fragment_splits: &fragment_splits,
2261 streaming_job_databases: &streaming_job_databases,
2262 database_map: &database_map,
2263 };
2264
2265 let result = render_actors(
2266 &actor_id_counter,
2267 &ensembles,
2268 &job_fragments,
2269 &job_map,
2270 &worker_map,
2271 context,
2272 )
2273 .expect("actor rendering succeeds");
2274
2275 let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
2276 let downstream_state =
2277 collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
2278
2279 assert_eq!(entry_state, downstream_state);
2280
2281 let split_ids: BTreeSet<_> = entry_state
2282 .iter()
2283 .flat_map(|(_, _, _, splits)| splits.iter().cloned())
2284 .collect();
2285 assert_eq!(
2286 split_ids,
2287 BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
2288 );
2289 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
2290 }
2291
/// Checks that `preview_actor_assignments` leaves the actor id counter untouched and that
/// `materialize_actor_assignments` is the step that advances it, while the state reported by
/// `collect_actor_state` stays the same across both steps.
#[test]
fn preview_actor_assignments_defers_actor_id_allocation() {
    let database_id: DatabaseId = DatabaseId::new(7);
    let job_id: JobId = 30.into();
    let source_id: SourceId = 99.into();
    let entry_fragment_id: FragmentId = 11.into();
    let downstream_fragment_id: FragmentId = 12.into();
    // The counter starts at 100; the assertions below expect it to still be 100 after the
    // preview and 104 after materialization.
    let actor_id_counter = AtomicU32::new(100);

    // Both fragments are built with identical arguments except for their id and type mask.
    let fragment_with_mask = |fragment_id: FragmentId, type_mask: i32| {
        build_fragment(
            fragment_id,
            job_id,
            type_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        )
    };
    let entry_fragment = fragment_with_mask(
        entry_fragment_id,
        FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32,
    );
    let downstream_fragment = fragment_with_mask(
        downstream_fragment_id,
        FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32,
    );

    let job_model = streaming_job::Model {
        job_id,
        job_status: JobStatus::Created,
        create_type: CreateType::Background,
        // Fixed parallelism of two, with no adaptive strategy configured.
        parallelism: StreamingParallelism::Fixed(2),
        adaptive_parallelism_strategy: None,
        max_parallelism: 2,
        // No backfill-specific settings.
        backfill_parallelism: None,
        backfill_adaptive_parallelism_strategy: None,
        backfill_orders: None,
        // Everything else stays at `None` / `false`.
        timezone: None,
        config_override: None,
        specific_resource_group: None,
        is_serverless_backfill: false,
        refresh_interval_sec: None,
    };

    let database_model = database::Model {
        database_id,
        name: "preview_db".into(),
        resource_group: "rg-source".into(),
        barrier_interval_ms: None,
        checkpoint_frequency: None,
    };

    // Two workers, built with the same "rg-source" label the database uses as resource group.
    let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
        (1.into(), build_worker_node(1, 1, "rg-source")),
        (2.into(), build_worker_node(2, 1, "rg-source")),
    ]);

    // Test splits differ only by id.
    let make_split = |split_id: &str| {
        SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from(split_id),
            properties: HashMap::new(),
            offset: "0".into(),
        })
    };

    let loaded = LoadedFragmentContext {
        // One ensemble: the entry fragment is its only entry, both fragments are components.
        ensembles: vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }],
        job_fragments: HashMap::from([(
            job_id,
            HashMap::from([
                (entry_fragment_id, entry_fragment),
                (downstream_fragment_id, downstream_fragment),
            ]),
        )]),
        job_map: HashMap::from([(job_id, job_model)]),
        streaming_job_databases: HashMap::from([(job_id, database_id)]),
        database_map: HashMap::from([(database_id, database_model)]),
        // Both fragments map to the same source; splits are registered on the entry only.
        fragment_source_ids: HashMap::from([
            (entry_fragment_id, source_id),
            (downstream_fragment_id, source_id),
        ]),
        fragment_splits: HashMap::from([(
            entry_fragment_id,
            vec![make_split("split-a"), make_split("split-b")],
        )]),
    };

    // Step 1: preview. The counter must not move.
    let preview =
        preview_actor_assignments(&worker_map, &loaded).expect("preview rendering succeeds");
    assert_eq!(actor_id_counter.load(Ordering::Relaxed), 100);

    let preview_fragments = &preview.fragments[&database_id][&job_id];
    let preview_entry_state = collect_actor_state(&preview_fragments[&entry_fragment_id]);
    let preview_downstream_state =
        collect_actor_state(&preview_fragments[&downstream_fragment_id]);
    assert_eq!(preview_entry_state, preview_downstream_state);

    // Step 2: materialize. This consumes the preview and takes the actor id counter.
    let materialized = materialize_actor_assignments(&actor_id_counter, preview);

    let materialized_fragments = &materialized.fragments[&database_id][&job_id];
    let materialized_entry_state =
        collect_actor_state(&materialized_fragments[&entry_fragment_id]);
    let materialized_downstream_state =
        collect_actor_state(&materialized_fragments[&downstream_fragment_id]);

    assert_eq!(materialized_entry_state, materialized_downstream_state);
    assert_eq!(materialized_entry_state, preview_entry_state);
    assert_eq!(materialized_downstream_state, preview_downstream_state);
    assert_eq!(actor_id_counter.load(Ordering::Relaxed), 104);
}
2411
/// Renders a single adaptive fragment for a job whose own `adaptive_parallelism_strategy` is
/// `BOUNDED(2)` across four workers, and expects exactly two actors.
#[test]
fn render_actors_job_strategy_overrides_global() {
    let database_id: DatabaseId = DatabaseId::new(10);
    let job_id: JobId = 100.into();
    let fragment_id: FragmentId = 1.into();
    let actor_id_counter = AtomicU32::new(0);

    // The only fragment is both the entry and the sole component of its ensemble.
    let ensembles = vec![NoShuffleEnsemble {
        entries: HashSet::from([fragment_id]),
        components: HashSet::from([fragment_id]),
    }];

    let job_fragments = HashMap::from([(
        job_id,
        HashMap::from([(
            fragment_id,
            build_fragment(
                fragment_id,
                job_id,
                0,
                DistributionType::Hash,
                8,
                StreamingParallelism::Adaptive,
            ),
        )]),
    )]);

    let job_map = HashMap::from([(
        job_id,
        streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            // Adaptive job carrying its own strategy.
            parallelism: StreamingParallelism::Adaptive,
            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
            max_parallelism: 8,
            // No backfill-specific settings.
            backfill_parallelism: None,
            backfill_adaptive_parallelism_strategy: None,
            backfill_orders: None,
            // Everything else stays at `None` / `false`.
            timezone: None,
            config_override: None,
            specific_resource_group: None,
            is_serverless_backfill: false,
            refresh_interval_sec: None,
        },
    )]);

    // Four workers, built with the same "default" label the database uses as resource group.
    let workers = HashMap::from([
        (1.into(), build_worker_node(1, 1, "default")),
        (2.into(), build_worker_node(2, 1, "default")),
        (3.into(), build_worker_node(3, 1, "default")),
        (4.into(), build_worker_node(4, 1, "default")),
    ]);

    // No sources or splits take part in this scenario.
    let fragment_source_ids = HashMap::<FragmentId, SourceId>::new();
    let fragment_splits = HashMap::<FragmentId, Vec<SplitImpl>>::new();
    let streaming_job_databases = HashMap::from([(job_id, database_id)]);
    let database_map = HashMap::from([(
        database_id,
        database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        },
    )]);

    let rendered = render_actors(
        &actor_id_counter,
        &ensembles,
        &job_fragments,
        &job_map,
        &workers,
        RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        },
    )
    .expect("actor rendering succeeds");

    let rendered_fragment = &rendered[&database_id][&job_id][&fragment_id];
    let actor_count = collect_actor_state(rendered_fragment).len();
    assert_eq!(
        actor_count, 2,
        "Job strategy BOUNDED(2) should override global FULL"
    );
}
2504
/// Renders a single adaptive fragment for a job with no `adaptive_parallelism_strategy` across
/// four workers, and expects four actors.
#[test]
fn render_actors_falls_back_to_default_when_adaptive_job_has_no_strategy() {
    let database_id: DatabaseId = DatabaseId::new(11);
    let job_id: JobId = 101.into();
    let fragment_id: FragmentId = 1.into();
    let actor_id_counter = AtomicU32::new(0);

    // The only fragment is both the entry and the sole component of its ensemble.
    let ensembles = vec![NoShuffleEnsemble {
        entries: HashSet::from([fragment_id]),
        components: HashSet::from([fragment_id]),
    }];

    let job_fragments = HashMap::from([(
        job_id,
        HashMap::from([(
            fragment_id,
            build_fragment(
                fragment_id,
                job_id,
                0,
                DistributionType::Hash,
                8,
                StreamingParallelism::Adaptive,
            ),
        )]),
    )]);

    let job_map = HashMap::from([(
        job_id,
        streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            // Adaptive job that deliberately carries no strategy of its own.
            parallelism: StreamingParallelism::Adaptive,
            adaptive_parallelism_strategy: None,
            max_parallelism: 8,
            // No backfill-specific settings.
            backfill_parallelism: None,
            backfill_adaptive_parallelism_strategy: None,
            backfill_orders: None,
            // Everything else stays at `None` / `false`.
            timezone: None,
            config_override: None,
            specific_resource_group: None,
            is_serverless_backfill: false,
            refresh_interval_sec: None,
        },
    )]);

    // Four workers, built with the same "default" label the database uses as resource group.
    let workers = HashMap::from([
        (1.into(), build_worker_node(1, 1, "default")),
        (2.into(), build_worker_node(2, 1, "default")),
        (3.into(), build_worker_node(3, 1, "default")),
        (4.into(), build_worker_node(4, 1, "default")),
    ]);

    // No sources or splits take part in this scenario.
    let fragment_source_ids = HashMap::<FragmentId, SourceId>::new();
    let fragment_splits = HashMap::<FragmentId, Vec<SplitImpl>>::new();
    let streaming_job_databases = HashMap::from([(job_id, database_id)]);
    let database_map = HashMap::from([(
        database_id,
        database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        },
    )]);

    let rendered = render_actors(
        &actor_id_counter,
        &ensembles,
        &job_fragments,
        &job_map,
        &workers,
        RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        },
    )
    .expect("actor rendering succeeds");

    let rendered_fragment = &rendered[&database_id][&job_id][&fragment_id];
    let actor_count = collect_actor_state(rendered_fragment).len();
    assert_eq!(
        actor_count, 4,
        "default adaptive strategy should use the full available parallelism"
    );
}
2595
/// Renders a single `Fixed(5)` fragment for a job that also carries a `BOUNDED(2)` strategy,
/// across six workers, and expects five actors.
#[test]
fn render_actors_fixed_parallelism_ignores_strategy() {
    let database_id: DatabaseId = DatabaseId::new(12);
    let job_id: JobId = 102.into();
    let fragment_id: FragmentId = 1.into();
    let actor_id_counter = AtomicU32::new(0);

    // The only fragment is both the entry and the sole component of its ensemble.
    let ensembles = vec![NoShuffleEnsemble {
        entries: HashSet::from([fragment_id]),
        components: HashSet::from([fragment_id]),
    }];

    let job_fragments = HashMap::from([(
        job_id,
        HashMap::from([(
            fragment_id,
            build_fragment(
                fragment_id,
                job_id,
                0,
                DistributionType::Hash,
                8,
                StreamingParallelism::Fixed(5),
            ),
        )]),
    )]);

    let job_map = HashMap::from([(
        job_id,
        streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            // Fixed parallelism combined with a strategy the test expects to have no effect.
            parallelism: StreamingParallelism::Fixed(5),
            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
            max_parallelism: 8,
            // No backfill-specific settings.
            backfill_parallelism: None,
            backfill_adaptive_parallelism_strategy: None,
            backfill_orders: None,
            // Everything else stays at `None` / `false`.
            timezone: None,
            config_override: None,
            specific_resource_group: None,
            is_serverless_backfill: false,
            refresh_interval_sec: None,
        },
    )]);

    // Six workers, i.e. more than the five actors the test expects.
    let workers = HashMap::from([
        (1.into(), build_worker_node(1, 1, "default")),
        (2.into(), build_worker_node(2, 1, "default")),
        (3.into(), build_worker_node(3, 1, "default")),
        (4.into(), build_worker_node(4, 1, "default")),
        (5.into(), build_worker_node(5, 1, "default")),
        (6.into(), build_worker_node(6, 1, "default")),
    ]);

    // No sources or splits take part in this scenario.
    let fragment_source_ids = HashMap::<FragmentId, SourceId>::new();
    let fragment_splits = HashMap::<FragmentId, Vec<SplitImpl>>::new();
    let streaming_job_databases = HashMap::from([(job_id, database_id)]);
    let database_map = HashMap::from([(
        database_id,
        database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        },
    )]);

    let rendered = render_actors(
        &actor_id_counter,
        &ensembles,
        &job_fragments,
        &job_map,
        &workers,
        RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        },
    )
    .expect("actor rendering succeeds");

    let rendered_fragment = &rendered[&database_id][&job_id][&fragment_id];
    let actor_count = collect_actor_state(rendered_fragment).len();
    assert_eq!(
        actor_count, 5,
        "Fixed parallelism should ignore all strategies"
    );
}
2689
/// Renders a single adaptive fragment for a job whose strategy is `RATIO(0.5)` across eight
/// workers, and expects four actors.
#[test]
fn render_actors_ratio_strategy() {
    let database_id: DatabaseId = DatabaseId::new(13);
    let job_id: JobId = 103.into();
    let fragment_id: FragmentId = 1.into();
    let actor_id_counter = AtomicU32::new(0);

    // The only fragment is both the entry and the sole component of its ensemble.
    let ensembles = vec![NoShuffleEnsemble {
        entries: HashSet::from([fragment_id]),
        components: HashSet::from([fragment_id]),
    }];

    let job_fragments = HashMap::from([(
        job_id,
        HashMap::from([(
            fragment_id,
            build_fragment(
                fragment_id,
                job_id,
                0,
                DistributionType::Hash,
                16,
                StreamingParallelism::Adaptive,
            ),
        )]),
    )]);

    let job_map = HashMap::from([(
        job_id,
        streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            // Adaptive job using the ratio strategy under test.
            parallelism: StreamingParallelism::Adaptive,
            adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
            max_parallelism: 16,
            // No backfill-specific settings.
            backfill_parallelism: None,
            backfill_adaptive_parallelism_strategy: None,
            backfill_orders: None,
            // Everything else stays at `None` / `false`.
            timezone: None,
            config_override: None,
            specific_resource_group: None,
            is_serverless_backfill: false,
            refresh_interval_sec: None,
        },
    )]);

    // Eight workers; the assertion below expects half as many actors.
    let workers = HashMap::from([
        (1.into(), build_worker_node(1, 1, "default")),
        (2.into(), build_worker_node(2, 1, "default")),
        (3.into(), build_worker_node(3, 1, "default")),
        (4.into(), build_worker_node(4, 1, "default")),
        (5.into(), build_worker_node(5, 1, "default")),
        (6.into(), build_worker_node(6, 1, "default")),
        (7.into(), build_worker_node(7, 1, "default")),
        (8.into(), build_worker_node(8, 1, "default")),
    ]);

    // No sources or splits take part in this scenario.
    let fragment_source_ids = HashMap::<FragmentId, SourceId>::new();
    let fragment_splits = HashMap::<FragmentId, Vec<SplitImpl>>::new();
    let streaming_job_databases = HashMap::from([(job_id, database_id)]);
    let database_map = HashMap::from([(
        database_id,
        database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        },
    )]);

    let rendered = render_actors(
        &actor_id_counter,
        &ensembles,
        &job_fragments,
        &job_map,
        &workers,
        RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        },
    )
    .expect("actor rendering succeeds");

    let rendered_fragment = &rendered[&database_id][&job_id][&fragment_id];
    let actor_count = collect_actor_state(rendered_fragment).len();
    assert_eq!(
        actor_count, 4,
        "RATIO(0.5) of 8 workers should give 4 actors"
    );
}
2784
/// Renders a single adaptive fragment for a job in `Creating` status that has both a job
/// strategy (`BOUNDED(4)`) and a backfill strategy (`BOUNDED(2)`), across four workers, and
/// expects two actors.
#[test]
fn render_actors_backfill_strategy_overrides_job_strategy() {
    let database_id: DatabaseId = DatabaseId::new(14);
    let job_id: JobId = 104.into();
    let fragment_id: FragmentId = 1.into();
    let actor_id_counter = AtomicU32::new(0);

    // The only fragment is both the entry and the sole component of its ensemble.
    let ensembles = vec![NoShuffleEnsemble {
        entries: HashSet::from([fragment_id]),
        components: HashSet::from([fragment_id]),
    }];

    let job_fragments = HashMap::from([(
        job_id,
        HashMap::from([(
            fragment_id,
            build_fragment(
                fragment_id,
                job_id,
                0,
                DistributionType::Hash,
                16,
                StreamingParallelism::Adaptive,
            ),
        )]),
    )]);

    let job_map = HashMap::from([(
        job_id,
        streaming_job::Model {
            job_id,
            // The job is still being created in the background.
            job_status: JobStatus::Creating,
            create_type: CreateType::Background,
            // Main job strategy.
            parallelism: StreamingParallelism::Adaptive,
            adaptive_parallelism_strategy: Some("BOUNDED(4)".to_owned()),
            max_parallelism: 16,
            // Backfill strategy, which the test expects to take effect.
            backfill_parallelism: Some(StreamingParallelism::Adaptive),
            backfill_adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
            backfill_orders: None,
            // Everything else stays at `None` / `false`.
            timezone: None,
            config_override: None,
            specific_resource_group: None,
            is_serverless_backfill: false,
            refresh_interval_sec: None,
        },
    )]);

    // Four workers, built with the same "default" label the database uses as resource group.
    let workers = HashMap::from([
        (1.into(), build_worker_node(1, 1, "default")),
        (2.into(), build_worker_node(2, 1, "default")),
        (3.into(), build_worker_node(3, 1, "default")),
        (4.into(), build_worker_node(4, 1, "default")),
    ]);

    // No sources or splits take part in this scenario.
    let fragment_source_ids = HashMap::<FragmentId, SourceId>::new();
    let fragment_splits = HashMap::<FragmentId, Vec<SplitImpl>>::new();
    let streaming_job_databases = HashMap::from([(job_id, database_id)]);
    let database_map = HashMap::from([(
        database_id,
        database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        },
    )]);

    let rendered = render_actors(
        &actor_id_counter,
        &ensembles,
        &job_fragments,
        &job_map,
        &workers,
        RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        },
    )
    .expect("actor rendering succeeds");

    let rendered_fragment = &rendered[&database_id][&job_id][&fragment_id];
    let actor_count = collect_actor_state(rendered_fragment).len();
    assert_eq!(
        actor_count, 2,
        "Backfill strategy BOUNDED(2) should override the job strategy during backfill"
    );
}
2870
/// Renders a single adaptive fragment for a job already in `Created` status that has both a
/// job strategy (`BOUNDED(4)`) and a backfill strategy (`BOUNDED(2)`), across four workers,
/// and expects four actors.
#[test]
fn render_actors_created_job_ignores_backfill_strategy() {
    let database_id: DatabaseId = DatabaseId::new(15);
    let job_id: JobId = 105.into();
    let fragment_id: FragmentId = 1.into();
    let actor_id_counter = AtomicU32::new(0);

    // The only fragment is both the entry and the sole component of its ensemble.
    let ensembles = vec![NoShuffleEnsemble {
        entries: HashSet::from([fragment_id]),
        components: HashSet::from([fragment_id]),
    }];

    let job_fragments = HashMap::from([(
        job_id,
        HashMap::from([(
            fragment_id,
            build_fragment(
                fragment_id,
                job_id,
                0,
                DistributionType::Hash,
                16,
                StreamingParallelism::Adaptive,
            ),
        )]),
    )]);

    let job_map = HashMap::from([(
        job_id,
        streaming_job::Model {
            job_id,
            // The job has finished creation.
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            // Main job strategy, which the test expects to take effect.
            parallelism: StreamingParallelism::Adaptive,
            adaptive_parallelism_strategy: Some("BOUNDED(4)".to_owned()),
            max_parallelism: 16,
            // Backfill strategy is still present on the model.
            backfill_parallelism: Some(StreamingParallelism::Adaptive),
            backfill_adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
            backfill_orders: None,
            // Everything else stays at `None` / `false`.
            timezone: None,
            config_override: None,
            specific_resource_group: None,
            is_serverless_backfill: false,
            refresh_interval_sec: None,
        },
    )]);

    // Four workers, built with the same "default" label the database uses as resource group.
    let workers = HashMap::from([
        (1.into(), build_worker_node(1, 1, "default")),
        (2.into(), build_worker_node(2, 1, "default")),
        (3.into(), build_worker_node(3, 1, "default")),
        (4.into(), build_worker_node(4, 1, "default")),
    ]);

    // No sources or splits take part in this scenario.
    let fragment_source_ids = HashMap::<FragmentId, SourceId>::new();
    let fragment_splits = HashMap::<FragmentId, Vec<SplitImpl>>::new();
    let streaming_job_databases = HashMap::from([(job_id, database_id)]);
    let database_map = HashMap::from([(
        database_id,
        database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        },
    )]);

    let rendered = render_actors(
        &actor_id_counter,
        &ensembles,
        &job_fragments,
        &job_map,
        &workers,
        RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        },
    )
    .expect("actor rendering succeeds");

    let rendered_fragment = &rendered[&database_id][&job_id][&fragment_id];
    let actor_count = collect_actor_state(rendered_fragment).len();
    assert_eq!(
        actor_count, 4,
        "created jobs should use the main job strategy before any backfill override applies"
    );
}
2956}