1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
23use risingwave_common::id::JobId;
24use risingwave_common::system_param::AdaptiveParallelismStrategy;
25use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
26use risingwave_connector::source::{SplitImpl, SplitMetaData};
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::prelude::{
29 Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
30};
31use risingwave_meta_model::{
32 CreateType, DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism,
33 WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink, source,
34 streaming_job, table,
35};
36use risingwave_meta_model_migration::Condition;
37use sea_orm::{
38 ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
39 RelationTrait,
40};
41
42use crate::MetaResult;
43use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
44use crate::manager::ActiveStreamingWorkerNodes;
45use crate::model::{ActorId, StreamActor, StreamingJobModelContextExt};
46use crate::stream::{AssignerBuilder, SplitDiffOptions};
47
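/// Resolves a human-readable definition string for each given streaming job id by
/// unioning the `Table`, `Sink`, and `Source` catalogs. Depending on the build profile
/// (`debug_assertions`), the returned string is either the object's name or its full
/// SQL definition.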
48pub(crate) async fn resolve_streaming_job_definition<C>(
49 txn: &C,
50 job_ids: &HashSet<JobId>,
51) -> MetaResult<HashMap<JobId, String>>
52where
53 C: ConnectionTrait,
54{
55 let job_ids = job_ids.iter().cloned().collect_vec();
56
57 let common_job_definitions: Vec<(JobId, String)> = Table::find()
59 .select_only()
60 .columns([
61 table::Column::TableId,
62 #[cfg(not(debug_assertions))]
63 table::Column::Name,
64 #[cfg(debug_assertions)]
65 table::Column::Definition,
66 ])
67 .filter(table::Column::TableId.is_in(job_ids.clone()))
68 .into_tuple()
69 .all(txn)
70 .await?;
71
72 let sink_definitions: Vec<(JobId, String)> = Sink::find()
73 .select_only()
74 .columns([
75 sink::Column::SinkId,
76 #[cfg(not(debug_assertions))]
77 sink::Column::Name,
78 #[cfg(debug_assertions)]
79 sink::Column::Definition,
80 ])
81 .filter(sink::Column::SinkId.is_in(job_ids.clone()))
82 .into_tuple()
83 .all(txn)
84 .await?;
85
86 let source_definitions: Vec<(JobId, String)> = Source::find()
87 .select_only()
88 .columns([
89 source::Column::SourceId,
90 #[cfg(not(debug_assertions))]
91 source::Column::Name,
92 #[cfg(debug_assertions)]
93 source::Column::Definition,
94 ])
95 .filter(source::Column::SourceId.is_in(job_ids.clone()))
96 .into_tuple()
97 .all(txn)
98 .await?;
99
100 let definitions: HashMap<JobId, String> = common_job_definitions
101 .into_iter()
102 .chain(sink_definitions.into_iter())
103 .chain(source_definitions.into_iter())
104 .collect();
105
106 Ok(definitions)
107}
108
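/// Loads all streaming jobs (optionally restricted to one database), builds their
/// no-shuffle fragment context, and renders actor assignments against the currently
/// schedulable workers. Fresh actor ids are drawn from `actor_id_counter`.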
109pub async fn load_fragment_info<C>(
110 txn: &C,
111 actor_id_counter: &AtomicU32,
112 database_id: Option<DatabaseId>,
113 worker_nodes: &ActiveStreamingWorkerNodes,
114 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
115) -> MetaResult<FragmentRenderMap>
116where
117 C: ConnectionTrait,
118{
119 let mut query = StreamingJob::find()
120 .select_only()
121 .column(streaming_job::Column::JobId);
122
123 if let Some(database_id) = database_id {
124 query = query
125 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
126 .filter(object::Column::DatabaseId.eq(database_id));
127 }
128
129 let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
130
131 if jobs.is_empty() {
132 return Ok(HashMap::new());
133 }
134
135 let jobs: HashSet<JobId> = jobs.into_iter().collect();
136
137 let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
138
139 if loaded.is_empty() {
140 return Ok(HashMap::new());
141 }
142
143 let available_workers: BTreeMap<_, _> = worker_nodes
144 .current()
145 .values()
146 .filter(|worker| worker.is_streaming_schedulable())
147 .map(|worker| {
148 (
149 worker.id,
150 WorkerInfo {
151 parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
152 resource_group: worker.resource_group(),
153 },
154 )
155 })
156 .collect();
157
158 let RenderedGraph { fragments, .. } = render_actor_assignments(
159 actor_id_counter,
160 &available_workers,
161 adaptive_parallelism_strategy,
162 &loaded,
163 )?;
164
165 Ok(fragments)
166}
167
168#[derive(Debug)]
169pub struct TargetResourcePolicy {
170 pub resource_group: Option<String>,
171 pub parallelism: StreamingParallelism,
172}
173
174#[derive(Debug, Clone)]
175pub struct WorkerInfo {
176 pub parallelism: NonZeroUsize,
177 pub resource_group: Option<String>,
178}
179
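/// Rendered fragments, nested as database id -> job id -> fragment id -> `InflightFragmentInfo`.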
180pub type FragmentRenderMap =
181 HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;
182
183#[derive(Default)]
184pub struct RenderedGraph {
185 pub fragments: FragmentRenderMap,
186 pub ensembles: Vec<NoShuffleEnsemble>,
187}
188
189impl RenderedGraph {
190 pub fn empty() -> Self {
191 Self::default()
192 }
193}
194
195#[derive(Default)]
199pub struct LoadedFragmentContext {
200 pub ensembles: Vec<NoShuffleEnsemble>,
201 pub fragment_map: HashMap<FragmentId, fragment::Model>,
202 pub job_map: HashMap<JobId, streaming_job::Model>,
203 pub streaming_job_databases: HashMap<JobId, DatabaseId>,
204 pub database_map: HashMap<DatabaseId, database::Model>,
205 pub fragment_source_ids: HashMap<FragmentId, SourceId>,
206 pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
207}
208
209impl LoadedFragmentContext {
210 pub fn is_empty(&self) -> bool {
211 self.ensembles.is_empty()
212 }
213}
214
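/// Loads the catalog context for the given no-shuffle ensembles and renders actor
/// assignments for them onto the provided workers.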
215pub async fn render_fragments<C>(
220 txn: &C,
221 actor_id_counter: &AtomicU32,
222 ensembles: Vec<NoShuffleEnsemble>,
223 workers: BTreeMap<WorkerId, WorkerInfo>,
224 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
225) -> MetaResult<RenderedGraph>
226where
227 C: ConnectionTrait,
228{
229 let loaded = load_fragment_context(txn, ensembles).await?;
230
231 if loaded.is_empty() {
232 return Ok(RenderedGraph::empty());
233 }
234
235 render_actor_assignments(
236 actor_id_counter,
237 &workers,
238 adaptive_parallelism_strategy,
239 &loaded,
240 )
241}
242
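/// Loads the fragments, owning jobs, databases, and source-split metadata referenced by
/// the given ensembles, returning an error if any referenced fragment or job is missing.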
243pub async fn load_fragment_context<C>(
246 txn: &C,
247 ensembles: Vec<NoShuffleEnsemble>,
248) -> MetaResult<LoadedFragmentContext>
249where
250 C: ConnectionTrait,
251{
252 if ensembles.is_empty() {
253 return Ok(LoadedFragmentContext::default());
254 }
255
256 let required_fragment_ids: HashSet<_> = ensembles
257 .iter()
258 .flat_map(|ensemble| ensemble.components.iter().copied())
259 .collect();
260
261 let fragment_models = Fragment::find()
262 .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
263 .all(txn)
264 .await?;
265
266 let found_fragment_ids: HashSet<_> = fragment_models
267 .iter()
268 .map(|fragment| fragment.fragment_id)
269 .collect();
270
271 if found_fragment_ids.len() != required_fragment_ids.len() {
272 let missing = required_fragment_ids
273 .difference(&found_fragment_ids)
274 .copied()
275 .collect_vec();
276 return Err(anyhow!("fragments {:?} not found", missing).into());
277 }
278
279 let fragment_map: HashMap<_, _> = fragment_models
280 .into_iter()
281 .map(|fragment| (fragment.fragment_id, fragment))
282 .collect();
283
284 let job_ids: HashSet<_> = fragment_map
285 .values()
286 .map(|fragment| fragment.job_id)
287 .collect();
288
289 if job_ids.is_empty() {
290 return Ok(LoadedFragmentContext::default());
291 }
292
293 let jobs: HashMap<_, _> = StreamingJob::find()
294 .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
295 .all(txn)
296 .await?
297 .into_iter()
298 .map(|job| (job.job_id, job))
299 .collect();
300
301 let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
302 if found_job_ids.len() != job_ids.len() {
303 let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
304 return Err(anyhow!("streaming jobs {:?} not found", missing).into());
305 }
306
307 build_loaded_context(txn, ensembles, fragment_map, jobs).await
308}
309
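/// Convenience wrapper that loads the fragment context for the given jobs and then
/// renders actor assignments onto the provided workers.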
310pub async fn render_jobs<C>(
313 txn: &C,
314 actor_id_counter: &AtomicU32,
315 job_ids: HashSet<JobId>,
316 workers: BTreeMap<WorkerId, WorkerInfo>,
317 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
318) -> MetaResult<RenderedGraph>
319where
320 C: ConnectionTrait,
321{
322 let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
323
324 if loaded.is_empty() {
325 return Ok(RenderedGraph::empty());
326 }
327
328 render_actor_assignments(
329 actor_id_counter,
330 &workers,
331 adaptive_parallelism_strategy,
332 &loaded,
333 )
334}
335
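/// Loads the fragment context for the given jobs: fragments that are not the target of a
/// `NoShuffle` edge act as seeds, which are expanded into complete no-shuffle ensembles
/// before all referenced fragments, jobs, and databases are loaded.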
336pub async fn load_fragment_context_for_jobs<C>(
339 txn: &C,
340 job_ids: HashSet<JobId>,
341) -> MetaResult<LoadedFragmentContext>
342where
343 C: ConnectionTrait,
344{
345 if job_ids.is_empty() {
346 return Ok(LoadedFragmentContext::default());
347 }
348
349 let excluded_fragments_query = FragmentRelation::find()
350 .select_only()
351 .column(fragment_relation::Column::TargetFragmentId)
352 .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
353 .into_query();
354
355 let condition = Condition::all()
356 .add(fragment::Column::JobId.is_in(job_ids.clone()))
357 .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));
358
359 let fragments: Vec<FragmentId> = Fragment::find()
360 .select_only()
361 .column(fragment::Column::FragmentId)
362 .filter(condition)
363 .into_tuple()
364 .all(txn)
365 .await?;
366
367 let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;
368
369 let fragments = Fragment::find()
370 .filter(
371 fragment::Column::FragmentId.is_in(
372 ensembles
373 .iter()
374 .flat_map(|graph| graph.components.iter())
375 .cloned()
376 .collect_vec(),
377 ),
378 )
379 .all(txn)
380 .await?;
381
382 let fragment_map: HashMap<_, _> = fragments
383 .into_iter()
384 .map(|fragment| (fragment.fragment_id, fragment))
385 .collect();
386
387 let job_ids = fragment_map
388 .values()
389 .map(|fragment| fragment.job_id)
390 .collect::<BTreeSet<_>>()
391 .into_iter()
392 .collect_vec();
393
394 let jobs: HashMap<_, _> = StreamingJob::find()
395 .filter(streaming_job::Column::JobId.is_in(job_ids))
396 .all(txn)
397 .await?
398 .into_iter()
399 .map(|job| (job.job_id, job))
400 .collect();
401
402 build_loaded_context(txn, ensembles, fragment_map, jobs).await
403}
404
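/// Renders actor assignments for a previously loaded context. Background jobs that are
/// still in `Creating` status are treated as backfilling and use `backfill_parallelism`
/// when it is set.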
405pub(crate) fn render_actor_assignments(
408 actor_id_counter: &AtomicU32,
409 worker_map: &BTreeMap<WorkerId, WorkerInfo>,
410 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
411 loaded: &LoadedFragmentContext,
412) -> MetaResult<RenderedGraph> {
413 if loaded.is_empty() {
414 return Ok(RenderedGraph::empty());
415 }
416
417 let backfill_jobs: HashSet<JobId> = loaded
418 .job_map
419 .iter()
420 .filter(|(_, job)| {
421 job.create_type == CreateType::Background && job.job_status == JobStatus::Creating
422 })
423 .map(|(id, _)| *id)
424 .collect();
425
426 let render_context = RenderActorsContext {
427 fragment_source_ids: &loaded.fragment_source_ids,
428 fragment_splits: &loaded.fragment_splits,
429 streaming_job_databases: &loaded.streaming_job_databases,
430 database_map: &loaded.database_map,
431 backfill_jobs: &backfill_jobs,
432 };
433
434 let fragments = render_actors(
435 actor_id_counter,
436 &loaded.ensembles,
437 &loaded.fragment_map,
438 &loaded.job_map,
439 worker_map,
440 adaptive_parallelism_strategy,
441 render_context,
442 )?;
443
444 Ok(RenderedGraph {
445 fragments,
446 ensembles: loaded.ensembles.clone(),
447 })
448}
449
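/// Assembles a `LoadedFragmentContext` from the already-fetched fragments and jobs by
/// resolving source fragments and their splits, the job-to-database mapping, and the
/// database models.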
450async fn build_loaded_context<C>(
451 txn: &C,
452 ensembles: Vec<NoShuffleEnsemble>,
453 fragment_map: HashMap<FragmentId, fragment::Model>,
454 job_map: HashMap<JobId, streaming_job::Model>,
455) -> MetaResult<LoadedFragmentContext>
456where
457 C: ConnectionTrait,
458{
459 if ensembles.is_empty() {
460 return Ok(LoadedFragmentContext::default());
461 }
462
463 #[cfg(debug_assertions)]
464 {
465 debug_sanity_check(&ensembles, &fragment_map, &job_map);
466 }
467
468 let (fragment_source_ids, fragment_splits) =
469 resolve_source_fragments(txn, &fragment_map).await?;
470
471 let job_ids = job_map.keys().copied().collect_vec();
472
473 let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
474 .select_only()
475 .column(streaming_job::Column::JobId)
476 .column(object::Column::DatabaseId)
477 .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
478 .filter(streaming_job::Column::JobId.is_in(job_ids))
479 .into_tuple()
480 .all(txn)
481 .await?
482 .into_iter()
483 .collect();
484
485 let database_map: HashMap<_, _> = Database::find()
486 .filter(
487 database::Column::DatabaseId
488 .is_in(streaming_job_databases.values().copied().collect_vec()),
489 )
490 .all(txn)
491 .await?
492 .into_iter()
493 .map(|db| (db.database_id, db))
494 .collect();
495
496 Ok(LoadedFragmentContext {
497 ensembles,
498 fragment_map,
499 job_map,
500 streaming_job_databases,
501 database_map,
502 fragment_source_ids,
503 fragment_splits,
504 })
505}
506
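/// Borrowed lookup tables passed to `render_actors`: per-fragment source ids and splits,
/// the job-to-database mapping, database models, and the set of jobs currently backfilling.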
507struct RenderActorsContext<'a> {
510 fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
511 fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
512 streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
513 database_map: &'a HashMap<DatabaseId, database::Model>,
514 backfill_jobs: &'a HashSet<JobId>,
515}
516
517fn render_actors(
518 actor_id_counter: &AtomicU32,
519 ensembles: &[NoShuffleEnsemble],
520 fragment_map: &HashMap<FragmentId, fragment::Model>,
521 job_map: &HashMap<JobId, streaming_job::Model>,
522 worker_map: &BTreeMap<WorkerId, WorkerInfo>,
523 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
524 context: RenderActorsContext<'_>,
525) -> MetaResult<FragmentRenderMap> {
526 let RenderActorsContext {
527 fragment_source_ids,
528 fragment_splits: fragment_splits_map,
529 streaming_job_databases,
530 database_map,
531 backfill_jobs,
532 } = context;
533
534 let mut all_fragments: FragmentRenderMap = HashMap::new();
535
536 for NoShuffleEnsemble {
537 entries,
538 components,
539 } in ensembles
540 {
541 tracing::debug!("rendering ensemble entries {:?}", entries);
542
543 let entry_fragments = entries
544 .iter()
545 .map(|fragment_id| fragment_map.get(fragment_id).unwrap())
546 .collect_vec();
547
548 let entry_fragment_parallelism = entry_fragments
549 .iter()
550 .map(|fragment| fragment.parallelism.clone())
551 .dedup()
552 .exactly_one()
553 .map_err(|_| {
554 anyhow!(
555 "entry fragments {:?} have inconsistent parallelism settings",
556 entries.iter().copied().collect_vec()
557 )
558 })?;
559
        let (job_id, vnode_count) = entry_fragments
            .iter()
            .map(|f| (f.job_id, f.vnode_count as usize))
            .dedup()
            .exactly_one()
            .map_err(|_| {
                anyhow!(
                    "entry fragments {:?} span multiple jobs or vnode counts in no-shuffle ensemble",
                    entries.iter().copied().collect_vec()
                )
            })?;
566
567 let job = job_map
568 .get(&job_id)
569 .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;
570
571 let job_strategy = job
572 .stream_context()
573 .adaptive_parallelism_strategy
574 .unwrap_or(adaptive_parallelism_strategy);
575
576 let resource_group = match &job.specific_resource_group {
577 None => {
578 let database = streaming_job_databases
579 .get(&job_id)
580 .and_then(|database_id| database_map.get(database_id))
581 .unwrap();
582 database.resource_group.clone()
583 }
584 Some(resource_group) => resource_group.clone(),
585 };
586
587 let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
588 .iter()
589 .filter_map(|(worker_id, worker)| {
590 if worker
591 .resource_group
592 .as_deref()
593 .unwrap_or(DEFAULT_RESOURCE_GROUP)
594 == resource_group.as_str()
595 {
596 Some((*worker_id, worker.parallelism))
597 } else {
598 None
599 }
600 })
601 .collect();
602
603 let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();
604
605 let effective_job_parallelism = if backfill_jobs.contains(&job_id) {
606 job.backfill_parallelism
607 .as_ref()
608 .unwrap_or(&job.parallelism)
609 } else {
610 &job.parallelism
611 };
612
613 let actual_parallelism = match entry_fragment_parallelism
614 .as_ref()
615 .unwrap_or(effective_job_parallelism)
616 {
617 StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
618 job_strategy.compute_target_parallelism(total_parallelism)
619 }
620 StreamingParallelism::Fixed(n) => *n,
621 }
622 .min(vnode_count)
623 .min(job.max_parallelism as usize);
624
        tracing::debug!(
            "job {}: final parallelism {}, configured {:?}, total worker parallelism {}, max_parallelism {}, vnode_count {}, fragment parallelism override {:?}",
            job_id,
            actual_parallelism,
            job.parallelism,
            total_parallelism,
            job.max_parallelism,
            vnode_count,
            entry_fragment_parallelism
        );
635
636 let assigner = AssignerBuilder::new(job_id).build();
637
638 let actors = (0..(actual_parallelism as u32))
639 .map_into::<ActorId>()
640 .collect_vec();
641 let vnodes = (0..vnode_count).collect_vec();
642
643 let assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;
644
645 let source_entry_fragment = entry_fragments.iter().find(|f| {
646 let mask = FragmentTypeMask::from(f.fragment_type_mask);
647 if mask.contains(FragmentTypeFlag::Source) {
648 assert!(!mask.contains(FragmentTypeFlag::SourceScan))
649 }
650 mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
651 });
652
653 let (fragment_splits, shared_source_id) = match source_entry_fragment {
654 Some(entry_fragment) => {
655 let source_id = fragment_source_ids
656 .get(&entry_fragment.fragment_id)
657 .ok_or_else(|| {
658 anyhow!(
659 "missing source id in source fragment {}",
660 entry_fragment.fragment_id
661 )
662 })?;
663
664 let entry_fragment_id = entry_fragment.fragment_id;
665
666 let empty_actor_splits: HashMap<_, _> =
667 actors.iter().map(|actor_id| (*actor_id, vec![])).collect();
668
669 let splits = fragment_splits_map
670 .get(&entry_fragment_id)
671 .cloned()
672 .unwrap_or_default();
673
674 let splits: BTreeMap<_, _> = splits.into_iter().map(|s| (s.id(), s)).collect();
675
676 let fragment_splits = crate::stream::source_manager::reassign_splits(
677 entry_fragment_id,
678 empty_actor_splits,
679 &splits,
680 SplitDiffOptions::default(),
681 )
682 .unwrap_or_default();
683 (fragment_splits, Some(*source_id))
684 }
685 None => (HashMap::new(), None),
686 };
687
688 for component_fragment_id in components {
689 let &fragment::Model {
690 fragment_id,
691 job_id,
692 fragment_type_mask,
693 distribution_type,
694 ref stream_node,
695 ref state_table_ids,
696 ..
697 } = fragment_map.get(component_fragment_id).unwrap();
698
699 let actor_count =
700 u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
701 let actor_id_base = actor_id_counter.fetch_add(actor_count, Ordering::Relaxed);
702
703 let actors: HashMap<ActorId, InflightActorInfo> = assignment
704 .iter()
705 .flat_map(|(worker_id, actors)| {
706 actors
707 .iter()
708 .map(move |(actor_id, vnodes)| (worker_id, actor_id, vnodes))
709 })
710 .map(|(&worker_id, &actor_idx, vnodes)| {
711 let vnode_bitmap = match distribution_type {
712 DistributionType::Single => None,
713 DistributionType::Hash => Some(Bitmap::from_indices(vnode_count, vnodes)),
714 };
715
716 let actor_id = actor_idx + actor_id_base;
717
718 let splits = if let Some(source_id) = fragment_source_ids.get(&fragment_id) {
719 assert_eq!(shared_source_id, Some(*source_id));
720
721 fragment_splits
722 .get(&(actor_idx))
723 .cloned()
724 .unwrap_or_default()
725 } else {
726 vec![]
727 };
728
729 (
730 actor_id,
731 InflightActorInfo {
732 worker_id,
733 vnode_bitmap,
734 splits,
735 },
736 )
737 })
738 .collect();
739
740 let fragment = InflightFragmentInfo {
741 fragment_id,
742 distribution_type,
743 fragment_type_mask: fragment_type_mask.into(),
744 vnode_count,
745 nodes: stream_node.to_protobuf(),
746 actors,
747 state_table_ids: state_table_ids.inner_ref().iter().copied().collect(),
748 };
749
750 let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
751 anyhow!("streaming job {job_id} not found in streaming_job_databases")
752 })?;
753
754 all_fragments
755 .entry(database_id)
756 .or_default()
757 .entry(job_id)
758 .or_default()
759 .insert(fragment_id, fragment);
760 }
761 }
762
763 Ok(all_fragments)
764}
765
766#[cfg(debug_assertions)]
767fn debug_sanity_check(
768 ensembles: &[NoShuffleEnsemble],
769 fragment_map: &HashMap<FragmentId, fragment::Model>,
770 jobs: &HashMap<JobId, streaming_job::Model>,
771) {
772 debug_assert!(
774 ensembles
775 .iter()
776 .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
777 "entries must be subset of components"
778 );
779
780 let mut missing_fragments = BTreeSet::new();
781 let mut missing_jobs = BTreeSet::new();
782
783 for fragment_id in ensembles
784 .iter()
785 .flat_map(|ensemble| ensemble.components.iter())
786 {
787 match fragment_map.get(fragment_id) {
788 Some(fragment) => {
789 if !jobs.contains_key(&fragment.job_id) {
790 missing_jobs.insert(fragment.job_id);
791 }
792 }
793 None => {
794 missing_fragments.insert(*fragment_id);
795 }
796 }
797 }
798
799 debug_assert!(
800 missing_fragments.is_empty(),
801 "missing fragments in fragment_map: {:?}",
802 missing_fragments
803 );
804
805 debug_assert!(
806 missing_jobs.is_empty(),
807 "missing jobs for fragments' job_id: {:?}",
808 missing_jobs
809 );
810
811 for ensemble in ensembles {
812 let unique_vnode_counts: Vec<_> = ensemble
813 .components
814 .iter()
815 .flat_map(|fragment_id| {
816 fragment_map
817 .get(fragment_id)
818 .map(|fragment| fragment.vnode_count)
819 })
820 .unique()
821 .collect();
822
823 debug_assert!(
824 unique_vnode_counts.len() <= 1,
825 "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
826 ensemble.components,
827 unique_vnode_counts
828 );
829 }
830}
831
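/// Scans the fragment map for `Source` and `SourceScan` fragments, maps each of them to
/// its source id, and loads the persisted splits of those fragments from `FragmentSplits`.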
832async fn resolve_source_fragments<C>(
833 txn: &C,
834 fragment_map: &HashMap<FragmentId, fragment::Model>,
835) -> MetaResult<(
836 HashMap<FragmentId, SourceId>,
837 HashMap<FragmentId, Vec<SplitImpl>>,
838)>
839where
840 C: ConnectionTrait,
841{
842 let mut source_fragment_ids: HashMap<SourceId, _> = HashMap::new();
843 for (fragment_id, fragment) in fragment_map {
844 let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
845 if mask.contains(FragmentTypeFlag::Source)
846 && let Some(source_id) = fragment.stream_node.to_protobuf().find_stream_source()
847 {
848 source_fragment_ids
849 .entry(source_id)
850 .or_insert_with(BTreeSet::new)
851 .insert(fragment_id);
852 }
853
854 if mask.contains(FragmentTypeFlag::SourceScan)
855 && let Some((source_id, _)) = fragment.stream_node.to_protobuf().find_source_backfill()
856 {
857 source_fragment_ids
858 .entry(source_id)
859 .or_insert_with(BTreeSet::new)
860 .insert(fragment_id);
861 }
862 }
863
864 let fragment_source_ids: HashMap<_, _> = source_fragment_ids
865 .iter()
866 .flat_map(|(source_id, fragment_ids)| {
867 fragment_ids
868 .iter()
869 .map(|fragment_id| (**fragment_id, *source_id as SourceId))
870 })
871 .collect();
872
873 let fragment_ids = fragment_source_ids.keys().copied().collect_vec();
874
875 let fragment_splits: Vec<_> = FragmentSplits::find()
876 .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
877 .all(txn)
878 .await?;
879
880 let fragment_splits: HashMap<_, _> = fragment_splits
881 .into_iter()
882 .flat_map(|model| {
883 model.splits.map(|splits| {
884 (
885 model.fragment_id,
886 splits
887 .to_protobuf()
888 .splits
889 .iter()
890 .flat_map(SplitImpl::try_from)
891 .collect_vec(),
892 )
893 })
894 })
895 .collect();
896
897 Ok((fragment_source_ids, fragment_splits))
898}
899
900#[derive(Debug)]
902pub struct ActorGraph<'a> {
903 pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
904 pub locations: &'a HashMap<ActorId, WorkerId>,
905}
906
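/// A group of fragments connected by `NoShuffle` dispatchers. `entries` are the root
/// fragments of the group (those without an in-group `NoShuffle` upstream) and are always
/// a subset of `components`.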
907#[derive(Debug, Clone)]
908pub struct NoShuffleEnsemble {
909 entries: HashSet<FragmentId>,
910 components: HashSet<FragmentId>,
911}
912
913impl NoShuffleEnsemble {
914 pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
915 self.components.iter().cloned()
916 }
917
918 pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
919 self.entries.iter().copied()
920 }
921
922 pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
923 self.components.iter().copied()
924 }
925
926 pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
927 self.entries.contains(fragment_id)
928 }
929}
930
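/// Loads every `NoShuffle` fragment relation from the catalog and groups the given
/// fragments into connected ensembles via `find_no_shuffle_graphs`.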
931pub async fn find_fragment_no_shuffle_dags_detailed(
932 db: &impl ConnectionTrait,
933 initial_fragment_ids: &[FragmentId],
934) -> MetaResult<Vec<NoShuffleEnsemble>> {
935 let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
936 .columns([
937 fragment_relation::Column::SourceFragmentId,
938 fragment_relation::Column::TargetFragmentId,
939 ])
940 .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
941 .into_tuple()
942 .all(db)
943 .await?;
944
945 let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
946 let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
947
948 for (src, dst) in all_no_shuffle_relations {
949 forward_edges.entry(src).or_default().push(dst);
950 backward_edges.entry(dst).or_default().push(src);
951 }
952
953 find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
954}
955
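/// Groups the initial fragments into connected components over the undirected union of
/// the `NoShuffle` edges. Within each component, fragments with no in-component parent
/// become the `entries`; components without any entry (pure cycles) are dropped.
///
/// Illustrative example (mirroring `test_single_linear_chain` below): with edges
/// `1 -> 2 -> 3`, starting from fragment `2` yields one ensemble with entries `{1}` and
/// components `{1, 2, 3}`.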
956fn find_no_shuffle_graphs(
957 initial_fragment_ids: &[impl Into<FragmentId> + Copy],
958 forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
959 backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
960) -> MetaResult<Vec<NoShuffleEnsemble>> {
961 let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
962 let mut globally_visited: HashSet<FragmentId> = HashSet::new();
963
964 for &init_id in initial_fragment_ids {
965 let init_id = init_id.into();
966 if globally_visited.contains(&init_id) {
967 continue;
968 }
969
970 let mut components = HashSet::new();
972 let mut queue: VecDeque<FragmentId> = VecDeque::new();
973
974 queue.push_back(init_id);
975 globally_visited.insert(init_id);
976
977 while let Some(current_id) = queue.pop_front() {
978 components.insert(current_id);
            let neighbors = forward_edges
                .get(&current_id)
                .into_iter()
                .flatten()
                .chain(backward_edges.get(&current_id).into_iter().flatten());
984
985 for &neighbor_id in neighbors {
986 if globally_visited.insert(neighbor_id) {
987 queue.push_back(neighbor_id);
988 }
989 }
990 }
991
992 let mut entries = HashSet::new();
994 for &node_id in &components {
995 let is_root = match backward_edges.get(&node_id) {
996 Some(parents) => parents.iter().all(|p| !components.contains(p)),
997 None => true,
998 };
999 if is_root {
1000 entries.insert(node_id);
1001 }
1002 }
1003
1004 if !entries.is_empty() {
1006 graphs.push(NoShuffleEnsemble {
1007 entries,
1008 components,
1009 });
1010 }
1011 }
1012
1013 Ok(graphs)
1014}
1015
1016#[cfg(test)]
1017mod tests {
1018 use std::collections::{BTreeSet, HashMap, HashSet};
1019 use std::sync::Arc;
1020
1021 use risingwave_connector::source::SplitImpl;
1022 use risingwave_connector::source::test_source::TestSourceSplit;
1023 use risingwave_meta_model::{CreateType, I32Array, JobStatus, StreamNode, TableIdArray};
1024 use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1025
1026 use super::*;
1027
1028 type Edges = (
1031 HashMap<FragmentId, Vec<FragmentId>>,
1032 HashMap<FragmentId, Vec<FragmentId>>,
1033 );
1034
1035 fn build_edges(relations: &[(u32, u32)]) -> Edges {
1038 let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1039 let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1040 for &(src, dst) in relations {
1041 forward_edges
1042 .entry(src.into())
1043 .or_default()
1044 .push(dst.into());
1045 backward_edges
1046 .entry(dst.into())
1047 .or_default()
1048 .push(src.into());
1049 }
1050 (forward_edges, backward_edges)
1051 }
1052
1053 fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1055 ids.iter().map(|id| (*id).into()).collect()
1056 }
1057
1058 #[allow(deprecated)]
1059 fn build_fragment(
1060 fragment_id: FragmentId,
1061 job_id: JobId,
1062 fragment_type_mask: i32,
1063 distribution_type: DistributionType,
1064 vnode_count: i32,
1065 parallelism: StreamingParallelism,
1066 ) -> fragment::Model {
1067 fragment::Model {
1068 fragment_id,
1069 job_id,
1070 fragment_type_mask,
1071 distribution_type,
1072 stream_node: StreamNode::from(&PbStreamNode::default()),
1073 state_table_ids: TableIdArray::default(),
1074 upstream_fragment_id: I32Array::default(),
1075 vnode_count,
1076 parallelism: Some(parallelism),
1077 }
1078 }
1079
1080 type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1081
1082 fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1083 let base = fragment.actors.keys().copied().min().unwrap_or_default();
1084
1085 let mut entries: Vec<_> = fragment
1086 .actors
1087 .iter()
1088 .map(|(&actor_id, info)| {
1089 let idx = actor_id.as_raw_id() - base.as_raw_id();
1090 let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1091 bitmap
1092 .iter()
1093 .enumerate()
1094 .filter_map(|(pos, is_set)| is_set.then_some(pos))
1095 .collect::<Vec<_>>()
1096 });
1097 let splits = info
1098 .splits
1099 .iter()
1100 .map(|split| split.id().to_string())
1101 .collect::<Vec<_>>();
1102 (idx.into(), info.worker_id, vnode_indices, splits)
1103 })
1104 .collect();
1105
1106 entries.sort_by_key(|(idx, _, _, _)| *idx);
1107 entries
1108 }
1109
1110 #[test]
1111 fn test_single_linear_chain() {
1112 let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1115 let initial_ids = &[2];
1116
1117 let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1119
1120 assert!(result.is_ok());
1122 let graphs = result.unwrap();
1123
1124 assert_eq!(graphs.len(), 1);
1125 let graph = &graphs[0];
1126 assert_eq!(graph.entries, to_hashset(&[1]));
1127 assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1128 }
1129
1130 #[test]
1131 fn test_two_disconnected_graphs() {
1132 let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1135 let initial_ids = &[2, 10];
1136
1137 let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1139
1140 assert_eq!(graphs.len(), 2);
1142
1143 graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1145
1146 assert_eq!(graphs[0].entries, to_hashset(&[1]));
1148 assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1149
1150 assert_eq!(graphs[1].entries, to_hashset(&[10]));
1152 assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1153 }
1154
1155 #[test]
1156 fn test_multiple_entries_in_one_graph() {
1157 let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1159 let initial_ids = &[3];
1160
1161 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1163
1164 assert_eq!(graphs.len(), 1);
1166 let graph = &graphs[0];
1167 assert_eq!(graph.entries, to_hashset(&[1, 2]));
1168 assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1169 }
1170
1171 #[test]
1172 fn test_diamond_shape_graph() {
1173 let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1175 let initial_ids = &[4];
1176
1177 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1179
1180 assert_eq!(graphs.len(), 1);
1182 let graph = &graphs[0];
1183 assert_eq!(graph.entries, to_hashset(&[1]));
1184 assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1185 }
1186
1187 #[test]
1188 fn test_starting_with_multiple_nodes_in_same_graph() {
1189 let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1192 let initial_ids = &[2, 4];
1193
1194 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1196
1197 assert_eq!(graphs.len(), 1);
1199 let graph = &graphs[0];
1200 assert_eq!(graph.entries, to_hashset(&[1]));
1201 assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1202 }
1203
1204 #[test]
1205 fn test_empty_initial_ids() {
1206 let (forward, backward) = build_edges(&[(1, 2)]);
1208 let initial_ids: &[u32] = &[];
1209
1210 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1212
1213 assert!(graphs.is_empty());
1215 }
1216
1217 #[test]
1218 fn test_isolated_node_as_input() {
1219 let (forward, backward) = build_edges(&[(1, 2)]);
1221 let initial_ids = &[100];
1222
1223 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1225
1226 assert_eq!(graphs.len(), 1);
1228 let graph = &graphs[0];
1229 assert_eq!(graph.entries, to_hashset(&[100]));
1230 assert_eq!(graph.components, to_hashset(&[100]));
1231 }
1232
1233 #[test]
1234 fn test_graph_with_a_cycle() {
1235 let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1240 let initial_ids = &[2];
1241
1242 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1244
1245 assert!(
1247 graphs.is_empty(),
1248 "A graph with no entries should not be returned"
1249 );
1250 }
1251 #[test]
1252 fn test_custom_complex() {
1253 let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1254 let initial_ids = &[1, 2, 4, 6];
1255
1256 let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1258
1259 assert_eq!(graphs.len(), 2);
1261 graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1263
1264 assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1266 assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1267
1268 assert_eq!(graphs[1].entries, to_hashset(&[6]));
1270 assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1271 }
1272
1273 #[test]
1274 fn render_actors_increments_actor_counter() {
1275 let actor_id_counter = AtomicU32::new(100);
1276 let fragment_id: FragmentId = 1.into();
1277 let job_id: JobId = 10.into();
1278 let database_id: DatabaseId = DatabaseId::new(3);
1279
1280 let fragment_model = build_fragment(
1281 fragment_id,
1282 job_id,
1283 0,
1284 DistributionType::Single,
1285 1,
1286 StreamingParallelism::Fixed(1),
1287 );
1288
1289 let job_model = streaming_job::Model {
1290 job_id,
1291 job_status: JobStatus::Created,
1292 create_type: CreateType::Foreground,
1293 timezone: None,
1294 config_override: None,
1295 adaptive_parallelism_strategy: None,
1296 parallelism: StreamingParallelism::Fixed(1),
1297 backfill_parallelism: None,
1298 backfill_orders: None,
1299 max_parallelism: 1,
1300 specific_resource_group: None,
1301 is_serverless_backfill: false,
1302 };
1303
1304 let database_model = database::Model {
1305 database_id,
1306 name: "test_db".into(),
1307 resource_group: "rg-a".into(),
1308 barrier_interval_ms: None,
1309 checkpoint_frequency: None,
1310 };
1311
1312 let ensembles = vec![NoShuffleEnsemble {
1313 entries: HashSet::from([fragment_id]),
1314 components: HashSet::from([fragment_id]),
1315 }];
1316
1317 let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
1318 let job_map = HashMap::from([(job_id, job_model)]);
1319
1320 let worker_map = BTreeMap::from([(
1321 1.into(),
1322 WorkerInfo {
1323 parallelism: NonZeroUsize::new(1).unwrap(),
1324 resource_group: Some("rg-a".into()),
1325 },
1326 )]);
1327
1328 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1329 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1330 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1331 let database_map = HashMap::from([(database_id, database_model)]);
1332 let backfill_jobs = HashSet::new();
1333
1334 let context = RenderActorsContext {
1335 fragment_source_ids: &fragment_source_ids,
1336 fragment_splits: &fragment_splits,
1337 streaming_job_databases: &streaming_job_databases,
1338 database_map: &database_map,
1339 backfill_jobs: &backfill_jobs,
1340 };
1341
1342 let result = render_actors(
1343 &actor_id_counter,
1344 &ensembles,
1345 &fragment_map,
1346 &job_map,
1347 &worker_map,
1348 AdaptiveParallelismStrategy::Auto,
1349 context,
1350 )
1351 .expect("actor rendering succeeds");
1352
1353 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
1354 assert_eq!(state.len(), 1);
1355 assert!(
1356 state[0].2.is_none(),
1357 "single distribution should not assign vnode bitmaps"
1358 );
1359 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
1360 }
1361
1362 #[test]
1363 fn render_actors_aligns_hash_vnode_bitmaps() {
1364 let actor_id_counter = AtomicU32::new(0);
1365 let entry_fragment_id: FragmentId = 1.into();
1366 let downstream_fragment_id: FragmentId = 2.into();
1367 let job_id: JobId = 20.into();
1368 let database_id: DatabaseId = DatabaseId::new(5);
1369
1370 let entry_fragment = build_fragment(
1371 entry_fragment_id,
1372 job_id,
1373 0,
1374 DistributionType::Hash,
1375 4,
1376 StreamingParallelism::Fixed(2),
1377 );
1378
1379 let downstream_fragment = build_fragment(
1380 downstream_fragment_id,
1381 job_id,
1382 0,
1383 DistributionType::Hash,
1384 4,
1385 StreamingParallelism::Fixed(2),
1386 );
1387
1388 let job_model = streaming_job::Model {
1389 job_id,
1390 job_status: JobStatus::Created,
1391 create_type: CreateType::Background,
1392 timezone: None,
1393 config_override: None,
1394 adaptive_parallelism_strategy: None,
1395 parallelism: StreamingParallelism::Fixed(2),
1396 backfill_parallelism: None,
1397 backfill_orders: None,
1398 max_parallelism: 2,
1399 specific_resource_group: None,
1400 is_serverless_backfill: false,
1401 };
1402
1403 let database_model = database::Model {
1404 database_id,
1405 name: "test_db_hash".into(),
1406 resource_group: "rg-hash".into(),
1407 barrier_interval_ms: None,
1408 checkpoint_frequency: None,
1409 };
1410
1411 let ensembles = vec![NoShuffleEnsemble {
1412 entries: HashSet::from([entry_fragment_id]),
1413 components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
1414 }];
1415
1416 let fragment_map = HashMap::from([
1417 (entry_fragment_id, entry_fragment),
1418 (downstream_fragment_id, downstream_fragment),
1419 ]);
1420 let job_map = HashMap::from([(job_id, job_model)]);
1421
1422 let worker_map = BTreeMap::from([
1423 (
1424 1.into(),
1425 WorkerInfo {
1426 parallelism: NonZeroUsize::new(1).unwrap(),
1427 resource_group: Some("rg-hash".into()),
1428 },
1429 ),
1430 (
1431 2.into(),
1432 WorkerInfo {
1433 parallelism: NonZeroUsize::new(1).unwrap(),
1434 resource_group: Some("rg-hash".into()),
1435 },
1436 ),
1437 ]);
1438
1439 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1440 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1441 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1442 let database_map = HashMap::from([(database_id, database_model)]);
1443 let backfill_jobs = HashSet::new();
1444
1445 let context = RenderActorsContext {
1446 fragment_source_ids: &fragment_source_ids,
1447 fragment_splits: &fragment_splits,
1448 streaming_job_databases: &streaming_job_databases,
1449 database_map: &database_map,
1450 backfill_jobs: &backfill_jobs,
1451 };
1452
1453 let result = render_actors(
1454 &actor_id_counter,
1455 &ensembles,
1456 &fragment_map,
1457 &job_map,
1458 &worker_map,
1459 AdaptiveParallelismStrategy::Auto,
1460 context,
1461 )
1462 .expect("actor rendering succeeds");
1463
1464 let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
1465 let downstream_state =
1466 collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
1467
1468 assert_eq!(entry_state.len(), 2);
1469 assert_eq!(entry_state, downstream_state);
1470
1471 let assigned_vnodes: BTreeSet<_> = entry_state
1472 .iter()
1473 .flat_map(|(_, _, vnodes, _)| {
1474 vnodes
1475 .as_ref()
1476 .expect("hash distribution should populate vnode bitmap")
1477 .iter()
1478 .copied()
1479 })
1480 .collect();
1481 assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
1482 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
1483 }
1484
1485 #[test]
1486 fn render_actors_propagates_source_splits() {
1487 let actor_id_counter = AtomicU32::new(0);
1488 let entry_fragment_id: FragmentId = 11.into();
1489 let downstream_fragment_id: FragmentId = 12.into();
1490 let job_id: JobId = 30.into();
1491 let database_id: DatabaseId = DatabaseId::new(7);
1492 let source_id: SourceId = 99.into();
1493
1494 let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
1495 let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;
1496
1497 let entry_fragment = build_fragment(
1498 entry_fragment_id,
1499 job_id,
1500 source_mask,
1501 DistributionType::Hash,
1502 4,
1503 StreamingParallelism::Fixed(2),
1504 );
1505
1506 let downstream_fragment = build_fragment(
1507 downstream_fragment_id,
1508 job_id,
1509 source_scan_mask,
1510 DistributionType::Hash,
1511 4,
1512 StreamingParallelism::Fixed(2),
1513 );
1514
1515 let job_model = streaming_job::Model {
1516 job_id,
1517 job_status: JobStatus::Created,
1518 create_type: CreateType::Background,
1519 timezone: None,
1520 config_override: None,
1521 adaptive_parallelism_strategy: None,
1522 parallelism: StreamingParallelism::Fixed(2),
1523 backfill_parallelism: None,
1524 backfill_orders: None,
1525 max_parallelism: 2,
1526 specific_resource_group: None,
1527 is_serverless_backfill: false,
1528 };
1529
1530 let database_model = database::Model {
1531 database_id,
1532 name: "split_db".into(),
1533 resource_group: "rg-source".into(),
1534 barrier_interval_ms: None,
1535 checkpoint_frequency: None,
1536 };
1537
1538 let ensembles = vec![NoShuffleEnsemble {
1539 entries: HashSet::from([entry_fragment_id]),
1540 components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
1541 }];
1542
1543 let fragment_map = HashMap::from([
1544 (entry_fragment_id, entry_fragment),
1545 (downstream_fragment_id, downstream_fragment),
1546 ]);
1547 let job_map = HashMap::from([(job_id, job_model)]);
1548
1549 let worker_map = BTreeMap::from([
1550 (
1551 1.into(),
1552 WorkerInfo {
1553 parallelism: NonZeroUsize::new(1).unwrap(),
1554 resource_group: Some("rg-source".into()),
1555 },
1556 ),
1557 (
1558 2.into(),
1559 WorkerInfo {
1560 parallelism: NonZeroUsize::new(1).unwrap(),
1561 resource_group: Some("rg-source".into()),
1562 },
1563 ),
1564 ]);
1565
1566 let split_a = SplitImpl::Test(TestSourceSplit {
1567 id: Arc::<str>::from("split-a"),
1568 properties: HashMap::new(),
1569 offset: "0".into(),
1570 });
1571 let split_b = SplitImpl::Test(TestSourceSplit {
1572 id: Arc::<str>::from("split-b"),
1573 properties: HashMap::new(),
1574 offset: "0".into(),
1575 });
1576
1577 let fragment_source_ids = HashMap::from([
1578 (entry_fragment_id, source_id),
1579 (downstream_fragment_id, source_id),
1580 ]);
1581 let fragment_splits =
1582 HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
1583 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1584 let database_map = HashMap::from([(database_id, database_model)]);
1585 let backfill_jobs = HashSet::new();
1586
1587 let context = RenderActorsContext {
1588 fragment_source_ids: &fragment_source_ids,
1589 fragment_splits: &fragment_splits,
1590 streaming_job_databases: &streaming_job_databases,
1591 database_map: &database_map,
1592 backfill_jobs: &backfill_jobs,
1593 };
1594
1595 let result = render_actors(
1596 &actor_id_counter,
1597 &ensembles,
1598 &fragment_map,
1599 &job_map,
1600 &worker_map,
1601 AdaptiveParallelismStrategy::Auto,
1602 context,
1603 )
1604 .expect("actor rendering succeeds");
1605
1606 let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
1607 let downstream_state =
1608 collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);
1609
1610 assert_eq!(entry_state, downstream_state);
1611
1612 let split_ids: BTreeSet<_> = entry_state
1613 .iter()
1614 .flat_map(|(_, _, _, splits)| splits.iter().cloned())
1615 .collect();
1616 assert_eq!(
1617 split_ids,
1618 BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
1619 );
1620 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
1621 }
1622
1623 #[test]
1625 fn render_actors_job_strategy_overrides_global() {
1626 let actor_id_counter = AtomicU32::new(0);
1627 let fragment_id: FragmentId = 1.into();
1628 let job_id: JobId = 100.into();
1629 let database_id: DatabaseId = DatabaseId::new(10);
1630
1631 let fragment_model = build_fragment(
1633 fragment_id,
1634 job_id,
1635 0,
1636 DistributionType::Hash,
1637 8,
1638 StreamingParallelism::Adaptive,
1639 );
1640
1641 let job_model = streaming_job::Model {
1643 job_id,
1644 job_status: JobStatus::Created,
1645 create_type: CreateType::Foreground,
1646 timezone: None,
1647 config_override: None,
1648 adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
1649 parallelism: StreamingParallelism::Adaptive,
1650 backfill_parallelism: None,
1651 backfill_orders: None,
1652 max_parallelism: 8,
1653 specific_resource_group: None,
1654 is_serverless_backfill: false,
1655 };
1656
1657 let database_model = database::Model {
1658 database_id,
1659 name: "test_db".into(),
1660 resource_group: "default".into(),
1661 barrier_interval_ms: None,
1662 checkpoint_frequency: None,
1663 };
1664
1665 let ensembles = vec![NoShuffleEnsemble {
1666 entries: HashSet::from([fragment_id]),
1667 components: HashSet::from([fragment_id]),
1668 }];
1669
1670 let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
1671 let job_map = HashMap::from([(job_id, job_model)]);
1672
1673 let worker_map = BTreeMap::from([
1675 (
1676 1.into(),
1677 WorkerInfo {
1678 parallelism: NonZeroUsize::new(1).unwrap(),
1679 resource_group: Some("default".into()),
1680 },
1681 ),
1682 (
1683 2.into(),
1684 WorkerInfo {
1685 parallelism: NonZeroUsize::new(1).unwrap(),
1686 resource_group: Some("default".into()),
1687 },
1688 ),
1689 (
1690 3.into(),
1691 WorkerInfo {
1692 parallelism: NonZeroUsize::new(1).unwrap(),
1693 resource_group: Some("default".into()),
1694 },
1695 ),
1696 (
1697 4.into(),
1698 WorkerInfo {
1699 parallelism: NonZeroUsize::new(1).unwrap(),
1700 resource_group: Some("default".into()),
1701 },
1702 ),
1703 ]);
1704
1705 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1706 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1707 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1708 let database_map = HashMap::from([(database_id, database_model)]);
1709 let backfill_jobs = HashSet::new();
1710
1711 let context = RenderActorsContext {
1712 fragment_source_ids: &fragment_source_ids,
1713 fragment_splits: &fragment_splits,
1714 streaming_job_databases: &streaming_job_databases,
1715 database_map: &database_map,
1716 backfill_jobs: &backfill_jobs,
1717 };
1718
1719 let result = render_actors(
1721 &actor_id_counter,
1722 &ensembles,
1723 &fragment_map,
1724 &job_map,
1725 &worker_map,
1726 AdaptiveParallelismStrategy::Full,
1727 context,
1728 )
1729 .expect("actor rendering succeeds");
1730
1731 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
1732 assert_eq!(
1734 state.len(),
1735 2,
1736 "Job strategy BOUNDED(2) should override global FULL"
1737 );
1738 }
1739
1740 #[test]
1742 fn render_actors_uses_global_strategy_when_job_has_none() {
1743 let actor_id_counter = AtomicU32::new(0);
1744 let fragment_id: FragmentId = 1.into();
1745 let job_id: JobId = 101.into();
1746 let database_id: DatabaseId = DatabaseId::new(11);
1747
1748 let fragment_model = build_fragment(
1749 fragment_id,
1750 job_id,
1751 0,
1752 DistributionType::Hash,
1753 8,
1754 StreamingParallelism::Adaptive,
1755 );
1756
1757 let job_model = streaming_job::Model {
1759 job_id,
1760 job_status: JobStatus::Created,
1761 create_type: CreateType::Foreground,
1762 timezone: None,
1763 config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Adaptive,
1766 backfill_parallelism: None,
1767 backfill_orders: None,
1768 max_parallelism: 8,
1769 specific_resource_group: None,
1770 is_serverless_backfill: false,
1771 };
1772
1773 let database_model = database::Model {
1774 database_id,
1775 name: "test_db".into(),
1776 resource_group: "default".into(),
1777 barrier_interval_ms: None,
1778 checkpoint_frequency: None,
1779 };
1780
1781 let ensembles = vec![NoShuffleEnsemble {
1782 entries: HashSet::from([fragment_id]),
1783 components: HashSet::from([fragment_id]),
1784 }];
1785
1786 let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
1787 let job_map = HashMap::from([(job_id, job_model)]);
1788
1789 let worker_map = BTreeMap::from([
1791 (
1792 1.into(),
1793 WorkerInfo {
1794 parallelism: NonZeroUsize::new(1).unwrap(),
1795 resource_group: Some("default".into()),
1796 },
1797 ),
1798 (
1799 2.into(),
1800 WorkerInfo {
1801 parallelism: NonZeroUsize::new(1).unwrap(),
1802 resource_group: Some("default".into()),
1803 },
1804 ),
1805 (
1806 3.into(),
1807 WorkerInfo {
1808 parallelism: NonZeroUsize::new(1).unwrap(),
1809 resource_group: Some("default".into()),
1810 },
1811 ),
1812 (
1813 4.into(),
1814 WorkerInfo {
1815 parallelism: NonZeroUsize::new(1).unwrap(),
1816 resource_group: Some("default".into()),
1817 },
1818 ),
1819 ]);
1820
1821 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1822 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1823 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1824 let database_map = HashMap::from([(database_id, database_model)]);
1825 let backfill_jobs = HashSet::new();
1826
1827 let context = RenderActorsContext {
1828 fragment_source_ids: &fragment_source_ids,
1829 fragment_splits: &fragment_splits,
1830 streaming_job_databases: &streaming_job_databases,
1831 database_map: &database_map,
1832 backfill_jobs: &backfill_jobs,
1833 };
1834
1835 let result = render_actors(
1837 &actor_id_counter,
1838 &ensembles,
1839 &fragment_map,
1840 &job_map,
1841 &worker_map,
1842 AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(3).unwrap()),
1843 context,
1844 )
1845 .expect("actor rendering succeeds");
1846
1847 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
1848 assert_eq!(
1850 state.len(),
1851 3,
1852 "Should use global strategy BOUNDED(3) when job has no custom strategy"
1853 );
1854 }
1855
1856 #[test]
1858 fn render_actors_fixed_parallelism_ignores_strategy() {
1859 let actor_id_counter = AtomicU32::new(0);
1860 let fragment_id: FragmentId = 1.into();
1861 let job_id: JobId = 102.into();
1862 let database_id: DatabaseId = DatabaseId::new(12);
1863
1864 let fragment_model = build_fragment(
1866 fragment_id,
1867 job_id,
1868 0,
1869 DistributionType::Hash,
1870 8,
1871 StreamingParallelism::Fixed(5),
1872 );
1873
1874 let job_model = streaming_job::Model {
1876 job_id,
1877 job_status: JobStatus::Created,
1878 create_type: CreateType::Foreground,
1879 timezone: None,
1880 config_override: None,
1881 adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
1882 parallelism: StreamingParallelism::Fixed(5),
1883 backfill_parallelism: None,
1884 backfill_orders: None,
1885 max_parallelism: 8,
1886 specific_resource_group: None,
1887 is_serverless_backfill: false,
1888 };
1889
1890 let database_model = database::Model {
1891 database_id,
1892 name: "test_db".into(),
1893 resource_group: "default".into(),
1894 barrier_interval_ms: None,
1895 checkpoint_frequency: None,
1896 };
1897
1898 let ensembles = vec![NoShuffleEnsemble {
1899 entries: HashSet::from([fragment_id]),
1900 components: HashSet::from([fragment_id]),
1901 }];
1902
1903 let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
1904 let job_map = HashMap::from([(job_id, job_model)]);
1905
1906 let worker_map = BTreeMap::from([
1908 (
1909 1.into(),
1910 WorkerInfo {
1911 parallelism: NonZeroUsize::new(1).unwrap(),
1912 resource_group: Some("default".into()),
1913 },
1914 ),
1915 (
1916 2.into(),
1917 WorkerInfo {
1918 parallelism: NonZeroUsize::new(1).unwrap(),
1919 resource_group: Some("default".into()),
1920 },
1921 ),
1922 (
1923 3.into(),
1924 WorkerInfo {
1925 parallelism: NonZeroUsize::new(1).unwrap(),
1926 resource_group: Some("default".into()),
1927 },
1928 ),
1929 (
1930 4.into(),
1931 WorkerInfo {
1932 parallelism: NonZeroUsize::new(1).unwrap(),
1933 resource_group: Some("default".into()),
1934 },
1935 ),
1936 (
1937 5.into(),
1938 WorkerInfo {
1939 parallelism: NonZeroUsize::new(1).unwrap(),
1940 resource_group: Some("default".into()),
1941 },
1942 ),
1943 (
1944 6.into(),
1945 WorkerInfo {
1946 parallelism: NonZeroUsize::new(1).unwrap(),
1947 resource_group: Some("default".into()),
1948 },
1949 ),
1950 ]);
1951
1952 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1953 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1954 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1955 let database_map = HashMap::from([(database_id, database_model)]);
1956 let backfill_jobs = HashSet::new();
1957
1958 let context = RenderActorsContext {
1959 fragment_source_ids: &fragment_source_ids,
1960 fragment_splits: &fragment_splits,
1961 streaming_job_databases: &streaming_job_databases,
1962 database_map: &database_map,
1963 backfill_jobs: &backfill_jobs,
1964 };
1965
1966 let result = render_actors(
1967 &actor_id_counter,
1968 &ensembles,
1969 &fragment_map,
1970 &job_map,
1971 &worker_map,
1972 AdaptiveParallelismStrategy::Full,
1973 context,
1974 )
1975 .expect("actor rendering succeeds");
1976
1977 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
1978 assert_eq!(
1980 state.len(),
1981 5,
1982 "Fixed parallelism should ignore all strategies"
1983 );
1984 }
1985
1986 #[test]
1988 fn render_actors_ratio_strategy() {
1989 let actor_id_counter = AtomicU32::new(0);
1990 let fragment_id: FragmentId = 1.into();
1991 let job_id: JobId = 103.into();
1992 let database_id: DatabaseId = DatabaseId::new(13);
1993
1994 let fragment_model = build_fragment(
1995 fragment_id,
1996 job_id,
1997 0,
1998 DistributionType::Hash,
1999 16,
2000 StreamingParallelism::Adaptive,
2001 );
2002
2003 let job_model = streaming_job::Model {
2005 job_id,
2006 job_status: JobStatus::Created,
2007 create_type: CreateType::Foreground,
2008 timezone: None,
2009 config_override: None,
2010 adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
2011 parallelism: StreamingParallelism::Adaptive,
2012 backfill_parallelism: None,
2013 backfill_orders: None,
2014 max_parallelism: 16,
2015 specific_resource_group: None,
2016 is_serverless_backfill: false,
2017 };
2018
2019 let database_model = database::Model {
2020 database_id,
2021 name: "test_db".into(),
2022 resource_group: "default".into(),
2023 barrier_interval_ms: None,
2024 checkpoint_frequency: None,
2025 };
2026
2027 let ensembles = vec![NoShuffleEnsemble {
2028 entries: HashSet::from([fragment_id]),
2029 components: HashSet::from([fragment_id]),
2030 }];
2031
2032 let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
2033 let job_map = HashMap::from([(job_id, job_model)]);
2034
2035 let worker_map = BTreeMap::from([
2037 (
2038 1.into(),
2039 WorkerInfo {
2040 parallelism: NonZeroUsize::new(1).unwrap(),
2041 resource_group: Some("default".into()),
2042 },
2043 ),
2044 (
2045 2.into(),
2046 WorkerInfo {
2047 parallelism: NonZeroUsize::new(1).unwrap(),
2048 resource_group: Some("default".into()),
2049 },
2050 ),
2051 (
2052 3.into(),
2053 WorkerInfo {
2054 parallelism: NonZeroUsize::new(1).unwrap(),
2055 resource_group: Some("default".into()),
2056 },
2057 ),
2058 (
2059 4.into(),
2060 WorkerInfo {
2061 parallelism: NonZeroUsize::new(1).unwrap(),
2062 resource_group: Some("default".into()),
2063 },
2064 ),
2065 (
2066 5.into(),
2067 WorkerInfo {
2068 parallelism: NonZeroUsize::new(1).unwrap(),
2069 resource_group: Some("default".into()),
2070 },
2071 ),
2072 (
2073 6.into(),
2074 WorkerInfo {
2075 parallelism: NonZeroUsize::new(1).unwrap(),
2076 resource_group: Some("default".into()),
2077 },
2078 ),
2079 (
2080 7.into(),
2081 WorkerInfo {
2082 parallelism: NonZeroUsize::new(1).unwrap(),
2083 resource_group: Some("default".into()),
2084 },
2085 ),
2086 (
2087 8.into(),
2088 WorkerInfo {
2089 parallelism: NonZeroUsize::new(1).unwrap(),
2090 resource_group: Some("default".into()),
2091 },
2092 ),
2093 ]);
2094
2095 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2096 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2097 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2098 let database_map = HashMap::from([(database_id, database_model)]);
2099 let backfill_jobs = HashSet::new();
2100
2101 let context = RenderActorsContext {
2102 fragment_source_ids: &fragment_source_ids,
2103 fragment_splits: &fragment_splits,
2104 streaming_job_databases: &streaming_job_databases,
2105 database_map: &database_map,
2106 backfill_jobs: &backfill_jobs,
2107 };
2108
2109 let result = render_actors(
2110 &actor_id_counter,
2111 &ensembles,
2112 &fragment_map,
2113 &job_map,
2114 &worker_map,
2115 AdaptiveParallelismStrategy::Full,
2116 context,
2117 )
2118 .expect("actor rendering succeeds");
2119
2120 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2121 assert_eq!(
2123 state.len(),
2124 4,
2125 "RATIO(0.5) of 8 workers should give 4 actors"
2126 );
2127 }
2128}