use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU32, Ordering};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::prelude::{
    Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
};
use risingwave_meta_model::{
    CreateType, DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism,
    WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink, source,
    streaming_job, table,
};
use risingwave_meta_model_migration::Condition;
use sea_orm::{
    ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
    RelationTrait,
};

use crate::MetaResult;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor};
use crate::stream::{AssignerBuilder, SplitDiffOptions};

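/// Resolves a human-readable definition string for each streaming job in `job_ids` by querying
/// the `table`, `sink`, and `source` catalogs. Debug builds select the full SQL definition;
/// release builds select only the object name.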
pub(crate) async fn resolve_streaming_job_definition<C>(
    txn: &C,
    job_ids: &HashSet<JobId>,
) -> MetaResult<HashMap<JobId, String>>
where
    C: ConnectionTrait,
{
    let job_ids = job_ids.iter().cloned().collect_vec();

    let common_job_definitions: Vec<(JobId, String)> = Table::find()
        .select_only()
        .columns([
            table::Column::TableId,
            #[cfg(not(debug_assertions))]
            table::Column::Name,
            #[cfg(debug_assertions)]
            table::Column::Definition,
        ])
        .filter(table::Column::TableId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let sink_definitions: Vec<(JobId, String)> = Sink::find()
        .select_only()
        .columns([
            sink::Column::SinkId,
            #[cfg(not(debug_assertions))]
            sink::Column::Name,
            #[cfg(debug_assertions)]
            sink::Column::Definition,
        ])
        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let source_definitions: Vec<(JobId, String)> = Source::find()
        .select_only()
        .columns([
            source::Column::SourceId,
            #[cfg(not(debug_assertions))]
            source::Column::Name,
            #[cfg(debug_assertions)]
            source::Column::Definition,
        ])
        .filter(source::Column::SourceId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let definitions: HashMap<JobId, String> = common_job_definitions
        .into_iter()
        .chain(sink_definitions)
        .chain(source_definitions)
        .collect();

    Ok(definitions)
}

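/// Loads the fragment context of every streaming job (optionally restricted to `database_id`)
/// and renders actor assignments onto the currently schedulable streaming workers.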
pub async fn load_fragment_info<C>(
    txn: &C,
    actor_id_counter: &AtomicU32,
    database_id: Option<DatabaseId>,
    worker_nodes: &ActiveStreamingWorkerNodes,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
) -> MetaResult<FragmentRenderMap>
where
    C: ConnectionTrait,
{
    let mut query = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId);

    if let Some(database_id) = database_id {
        query = query
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(object::Column::DatabaseId.eq(database_id));
    }

    let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;

    if jobs.is_empty() {
        return Ok(HashMap::new());
    }

    let jobs: HashSet<JobId> = jobs.into_iter().collect();

    let loaded = load_fragment_context_for_jobs(txn, jobs).await?;

    if loaded.is_empty() {
        return Ok(HashMap::new());
    }

    let available_workers: BTreeMap<_, _> = worker_nodes
        .current()
        .values()
        .filter(|worker| worker.is_streaming_schedulable())
        .map(|worker| {
            (
                worker.id,
                WorkerInfo {
                    parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                    resource_group: worker.resource_group(),
                },
            )
        })
        .collect();

    let RenderedGraph { fragments, .. } = render_actor_assignments(
        actor_id_counter,
        &available_workers,
        adaptive_parallelism_strategy,
        &loaded,
    )?;

    Ok(fragments)
}

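/// Desired placement for a streaming job: an optional resource group plus the target
/// streaming parallelism.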
#[derive(Debug)]
pub struct TargetResourcePolicy {
    pub resource_group: Option<String>,
    pub parallelism: StreamingParallelism,
}

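/// Scheduling view of a compute worker: its available parallelism and the resource group it
/// belongs to (`None` falls back to the default resource group).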
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    pub parallelism: NonZeroUsize,
    pub resource_group: Option<String>,
}

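/// Rendered fragment infos keyed by database id, then job id, then fragment id.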
pub type FragmentRenderMap =
    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;

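/// Result of actor rendering: the rendered fragments together with the no-shuffle ensembles
/// they were derived from.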
#[derive(Default)]
pub struct RenderedGraph {
    pub fragments: FragmentRenderMap,
    pub ensembles: Vec<NoShuffleEnsemble>,
}

impl RenderedGraph {
    pub fn empty() -> Self {
        Self::default()
    }
}

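/// All catalog state required to render actors: the no-shuffle ensembles plus lookups for the
/// fragments, jobs, owning databases, and source splits involved.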
#[derive(Default)]
pub struct LoadedFragmentContext {
    pub ensembles: Vec<NoShuffleEnsemble>,
    pub fragment_map: HashMap<FragmentId, fragment::Model>,
    pub job_map: HashMap<JobId, streaming_job::Model>,
    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
    pub database_map: HashMap<DatabaseId, database::Model>,
    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
}

impl LoadedFragmentContext {
    pub fn is_empty(&self) -> bool {
        self.ensembles.is_empty()
    }
}

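/// Loads the catalog context for the given ensembles and renders their actor assignments onto
/// `workers`.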
pub async fn render_fragments<C>(
    txn: &C,
    actor_id_counter: &AtomicU32,
    ensembles: Vec<NoShuffleEnsemble>,
    workers: BTreeMap<WorkerId, WorkerInfo>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
) -> MetaResult<RenderedGraph>
where
    C: ConnectionTrait,
{
    let loaded = load_fragment_context(txn, ensembles).await?;

    if loaded.is_empty() {
        return Ok(RenderedGraph::empty());
    }

    render_actor_assignments(
        actor_id_counter,
        &workers,
        adaptive_parallelism_strategy,
        &loaded,
    )
}

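/// Loads the fragment and job models referenced by `ensembles`, erroring if any referenced
/// fragment or streaming job is missing from the catalog.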
pub async fn load_fragment_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    let required_fragment_ids: HashSet<_> = ensembles
        .iter()
        .flat_map(|ensemble| ensemble.components.iter().copied())
        .collect();

    let fragment_models = Fragment::find()
        .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
        .all(txn)
        .await?;

    let found_fragment_ids: HashSet<_> = fragment_models
        .iter()
        .map(|fragment| fragment.fragment_id)
        .collect();

    if found_fragment_ids.len() != required_fragment_ids.len() {
        let missing = required_fragment_ids
            .difference(&found_fragment_ids)
            .copied()
            .collect_vec();
        return Err(anyhow!("fragments {:?} not found", missing).into());
    }

    let fragment_map: HashMap<_, _> = fragment_models
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    let job_ids: HashSet<_> = fragment_map
        .values()
        .map(|fragment| fragment.job_id)
        .collect();

    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
    if found_job_ids.len() != job_ids.len() {
        let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
        return Err(anyhow!("streaming jobs {:?} not found", missing).into());
    }

    build_loaded_context(txn, ensembles, fragment_map, jobs).await
}

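/// Loads the fragment context for the given jobs and renders their actor assignments onto
/// `workers`.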
pub async fn render_jobs<C>(
    txn: &C,
    actor_id_counter: &AtomicU32,
    job_ids: HashSet<JobId>,
    workers: BTreeMap<WorkerId, WorkerInfo>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
) -> MetaResult<RenderedGraph>
where
    C: ConnectionTrait,
{
    let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;

    if loaded.is_empty() {
        return Ok(RenderedGraph::empty());
    }

    render_actor_assignments(
        actor_id_counter,
        &workers,
        adaptive_parallelism_strategy,
        &loaded,
    )
}

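/// Loads the fragment context for `job_ids`: starts from the jobs' fragments that are not
/// no-shuffle targets (the ensemble entry candidates), expands them into complete no-shuffle
/// ensembles, and loads every fragment and job those ensembles touch.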
pub async fn load_fragment_context_for_jobs<C>(
    txn: &C,
    job_ids: HashSet<JobId>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    let excluded_fragments_query = FragmentRelation::find()
        .select_only()
        .column(fragment_relation::Column::TargetFragmentId)
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_query();

    let condition = Condition::all()
        .add(fragment::Column::JobId.is_in(job_ids.clone()))
        .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));

    let fragments: Vec<FragmentId> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .filter(condition)
        .into_tuple()
        .all(txn)
        .await?;

    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;

    let fragments = Fragment::find()
        .filter(
            fragment::Column::FragmentId.is_in(
                ensembles
                    .iter()
                    .flat_map(|graph| graph.components.iter())
                    .cloned()
                    .collect_vec(),
            ),
        )
        .all(txn)
        .await?;

    let fragment_map: HashMap<_, _> = fragments
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    let job_ids = fragment_map
        .values()
        .map(|fragment| fragment.job_id)
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect_vec();

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    build_loaded_context(txn, ensembles, fragment_map, jobs).await
}

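/// Renders actor assignments for an already-loaded context. Background jobs that are still
/// creating are treated as backfilling, letting `backfill_parallelism` override the job's
/// configured parallelism.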
pub(crate) fn render_actor_assignments(
    actor_id_counter: &AtomicU32,
    worker_map: &BTreeMap<WorkerId, WorkerInfo>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    loaded: &LoadedFragmentContext,
) -> MetaResult<RenderedGraph> {
    if loaded.is_empty() {
        return Ok(RenderedGraph::empty());
    }

    let backfill_jobs: HashSet<JobId> = loaded
        .job_map
        .iter()
        .filter(|(_, job)| {
            job.create_type == CreateType::Background && job.job_status == JobStatus::Creating
        })
        .map(|(id, _)| *id)
        .collect();

    let render_context = RenderActorsContext {
        fragment_source_ids: &loaded.fragment_source_ids,
        fragment_splits: &loaded.fragment_splits,
        streaming_job_databases: &loaded.streaming_job_databases,
        database_map: &loaded.database_map,
        backfill_jobs: &backfill_jobs,
    };

    let fragments = render_actors(
        actor_id_counter,
        &loaded.ensembles,
        &loaded.fragment_map,
        &loaded.job_map,
        worker_map,
        adaptive_parallelism_strategy,
        render_context,
    )?;

    Ok(RenderedGraph {
        fragments,
        ensembles: loaded.ensembles.clone(),
    })
}

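/// Assembles a `LoadedFragmentContext` from pre-fetched fragment and job models by resolving
/// source fragments with their splits and the owning database of each job.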
async fn build_loaded_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
    fragment_map: HashMap<FragmentId, fragment::Model>,
    job_map: HashMap<JobId, streaming_job::Model>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    #[cfg(debug_assertions)]
    {
        debug_sanity_check(&ensembles, &fragment_map, &job_map);
    }

    let (fragment_source_ids, fragment_splits) =
        resolve_source_fragments(txn, &fragment_map).await?;

    let job_ids = job_map.keys().copied().collect_vec();

    let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .collect();

    let database_map: HashMap<_, _> = Database::find()
        .filter(
            database::Column::DatabaseId
                .is_in(streaming_job_databases.values().copied().collect_vec()),
        )
        .all(txn)
        .await?
        .into_iter()
        .map(|db| (db.database_id, db))
        .collect();

    Ok(LoadedFragmentContext {
        ensembles,
        fragment_map,
        job_map,
        streaming_job_databases,
        database_map,
        fragment_source_ids,
        fragment_splits,
    })
}

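/// Borrowed lookup tables threaded through `render_actors`, grouped to keep its signature
/// manageable.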
struct RenderActorsContext<'a> {
    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
    database_map: &'a HashMap<DatabaseId, database::Model>,
    backfill_jobs: &'a HashSet<JobId>,
}

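/// Renders actors for every no-shuffle ensemble. Per ensemble, this derives the effective
/// parallelism (fragment override, else job/backfill parallelism, bounded by the vnode count
/// and the job's max parallelism), computes a single hierarchical worker/vnode assignment
/// shared by all component fragments, and distributes source splits from the entry source
/// fragment to every component fragment reading the same source.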
fn render_actors(
    actor_id_counter: &AtomicU32,
    ensembles: &[NoShuffleEnsemble],
    fragment_map: &HashMap<FragmentId, fragment::Model>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &BTreeMap<WorkerId, WorkerInfo>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    let RenderActorsContext {
        fragment_source_ids,
        fragment_splits: fragment_splits_map,
        streaming_job_databases,
        database_map,
        backfill_jobs,
    } = context;

    let mut all_fragments: FragmentRenderMap = HashMap::new();

    for NoShuffleEnsemble {
        entries,
        components,
    } in ensembles
    {
        tracing::debug!("rendering ensemble entries {:?}", entries);

        let entry_fragments = entries
            .iter()
            .map(|fragment_id| fragment_map.get(fragment_id).unwrap())
            .collect_vec();

        // All entry fragments of an ensemble must agree on the parallelism override.
        let entry_fragment_parallelism = entry_fragments
            .iter()
            .map(|fragment| fragment.parallelism.clone())
            .dedup()
            .exactly_one()
            .map_err(|_| {
                anyhow!(
                    "entry fragments {:?} have inconsistent parallelism settings",
                    entries.iter().copied().collect_vec()
                )
            })?;

        let (job_id, vnode_count) = entry_fragments
            .iter()
            .map(|f| (f.job_id, f.vnode_count as usize))
            .dedup()
            .exactly_one()
            .map_err(|_| anyhow!("multiple jobs found in no-shuffle ensemble"))?;

        let job = job_map
            .get(&job_id)
            .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;

        // A job-specific resource group wins; otherwise fall back to the database's group.
        let resource_group = match &job.specific_resource_group {
            None => {
                let database = streaming_job_databases
                    .get(&job_id)
                    .and_then(|database_id| database_map.get(database_id))
                    .unwrap();
                database.resource_group.clone()
            }
            Some(resource_group) => resource_group.clone(),
        };

        let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
            .iter()
            .filter_map(|(worker_id, worker)| {
                if worker
                    .resource_group
                    .as_deref()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP)
                    == resource_group.as_str()
                {
                    Some((*worker_id, worker.parallelism))
                } else {
                    None
                }
            })
            .collect();

        let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();

        let effective_job_parallelism = if backfill_jobs.contains(&job_id) {
            job.backfill_parallelism
                .as_ref()
                .unwrap_or(&job.parallelism)
        } else {
            &job.parallelism
        };

        let actual_parallelism = match entry_fragment_parallelism
            .as_ref()
            .unwrap_or(effective_job_parallelism)
        {
            StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
                adaptive_parallelism_strategy.compute_target_parallelism(total_parallelism)
            }
            StreamingParallelism::Fixed(n) => *n,
        }
        .min(vnode_count)
        .min(job.max_parallelism as usize);

        tracing::debug!(
            "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
            job_id,
            actual_parallelism,
            job.parallelism,
            total_parallelism,
            job.max_parallelism,
            vnode_count,
            entry_fragment_parallelism
        );

        let assigner = AssignerBuilder::new(job_id).build();

        let actors = (0..(actual_parallelism as u32))
            .map_into::<ActorId>()
            .collect_vec();
        let vnodes = (0..vnode_count).collect_vec();

        // One shared assignment keeps every fragment in the ensemble aligned on
        // workers and vnode ownership, as no-shuffle exchange requires.
        let assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;

        // Locate the shared-source entry fragment (if any); its split assignment is
        // mirrored by the source(-backfill) fragments in the ensemble.
        let source_entry_fragment = entry_fragments.iter().find(|f| {
            let mask = FragmentTypeMask::from(f.fragment_type_mask);
            if mask.contains(FragmentTypeFlag::Source) {
                assert!(!mask.contains(FragmentTypeFlag::SourceScan))
            }
            mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
        });

        let (fragment_splits, shared_source_id) = match source_entry_fragment {
            Some(entry_fragment) => {
                let source_id = fragment_source_ids
                    .get(&entry_fragment.fragment_id)
                    .ok_or_else(|| {
                        anyhow!(
                            "missing source id in source fragment {}",
                            entry_fragment.fragment_id
                        )
                    })?;

                let entry_fragment_id = entry_fragment.fragment_id;

                let empty_actor_splits: HashMap<_, _> =
                    actors.iter().map(|actor_id| (*actor_id, vec![])).collect();

                let splits = fragment_splits_map
                    .get(&entry_fragment_id)
                    .cloned()
                    .unwrap_or_default();

                let splits: BTreeMap<_, _> = splits.into_iter().map(|s| (s.id(), s)).collect();

                let fragment_splits = crate::stream::source_manager::reassign_splits(
                    entry_fragment_id,
                    empty_actor_splits,
                    &splits,
                    SplitDiffOptions::default(),
                )
                .unwrap_or_default();
                (fragment_splits, Some(*source_id))
            }
            None => (HashMap::new(), None),
        };

        // Materialize actors for every component fragment using the shared assignment.
        for component_fragment_id in components {
            let &fragment::Model {
                fragment_id,
                job_id,
                fragment_type_mask,
                distribution_type,
                ref stream_node,
                ref state_table_ids,
                ..
            } = fragment_map.get(component_fragment_id).unwrap();

            let actor_count =
                u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
            let actor_id_base = actor_id_counter.fetch_add(actor_count, Ordering::Relaxed);

            let actors: HashMap<ActorId, InflightActorInfo> = assignment
                .iter()
                .flat_map(|(worker_id, actors)| {
                    actors
                        .iter()
                        .map(move |(actor_id, vnodes)| (worker_id, actor_id, vnodes))
                })
                .map(|(&worker_id, &actor_idx, vnodes)| {
                    let vnode_bitmap = match distribution_type {
                        DistributionType::Single => None,
                        DistributionType::Hash => Some(Bitmap::from_indices(vnode_count, vnodes)),
                    };

                    let actor_id = actor_idx + actor_id_base;

                    let splits = if let Some(source_id) = fragment_source_ids.get(&fragment_id) {
                        assert_eq!(shared_source_id, Some(*source_id));

                        fragment_splits
                            .get(&actor_idx)
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        vec![]
                    };

                    (
                        actor_id,
                        InflightActorInfo {
                            worker_id,
                            vnode_bitmap,
                            splits,
                        },
                    )
                })
                .collect();

            let fragment = InflightFragmentInfo {
                fragment_id,
                distribution_type,
                fragment_type_mask: fragment_type_mask.into(),
                vnode_count,
                nodes: stream_node.to_protobuf(),
                actors,
                state_table_ids: state_table_ids.inner_ref().iter().copied().collect(),
            };

            let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
                anyhow!("streaming job {job_id} not found in streaming_job_databases")
            })?;

            all_fragments
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default()
                .insert(fragment_id, fragment);
        }
    }

    Ok(all_fragments)
}

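/// Debug-only invariant checks: entries must be a subset of components, every component must
/// have a fragment and job model, and all components of an ensemble must share one vnode count.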
#[cfg(debug_assertions)]
fn debug_sanity_check(
    ensembles: &[NoShuffleEnsemble],
    fragment_map: &HashMap<FragmentId, fragment::Model>,
    jobs: &HashMap<JobId, streaming_job::Model>,
) {
    debug_assert!(
        ensembles
            .iter()
            .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
        "entries must be subset of components"
    );

    let mut missing_fragments = BTreeSet::new();
    let mut missing_jobs = BTreeSet::new();

    for fragment_id in ensembles
        .iter()
        .flat_map(|ensemble| ensemble.components.iter())
    {
        match fragment_map.get(fragment_id) {
            Some(fragment) => {
                if !jobs.contains_key(&fragment.job_id) {
                    missing_jobs.insert(fragment.job_id);
                }
            }
            None => {
                missing_fragments.insert(*fragment_id);
            }
        }
    }

    debug_assert!(
        missing_fragments.is_empty(),
        "missing fragments in fragment_map: {:?}",
        missing_fragments
    );

    debug_assert!(
        missing_jobs.is_empty(),
        "missing jobs for fragments' job_id: {:?}",
        missing_jobs
    );

    for ensemble in ensembles {
        let unique_vnode_counts: Vec<_> = ensemble
            .components
            .iter()
            .flat_map(|fragment_id| {
                fragment_map
                    .get(fragment_id)
                    .map(|fragment| fragment.vnode_count)
            })
            .unique()
            .collect();

        debug_assert!(
            unique_vnode_counts.len() <= 1,
            "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
            ensemble.components,
            unique_vnode_counts
        );
    }
}

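/// Finds `Source` and `SourceScan` fragments among the given models, maps each to the source
/// it reads from, and loads the persisted splits of those fragments.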
async fn resolve_source_fragments<C>(
    txn: &C,
    fragment_map: &HashMap<FragmentId, fragment::Model>,
) -> MetaResult<(
    HashMap<FragmentId, SourceId>,
    HashMap<FragmentId, Vec<SplitImpl>>,
)>
where
    C: ConnectionTrait,
{
    let mut source_fragment_ids: HashMap<SourceId, _> = HashMap::new();
    for (fragment_id, fragment) in fragment_map {
        let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
        if mask.contains(FragmentTypeFlag::Source)
            && let Some(source_id) = fragment.stream_node.to_protobuf().find_stream_source()
        {
            source_fragment_ids
                .entry(source_id)
                .or_insert_with(BTreeSet::new)
                .insert(fragment_id);
        }

        if mask.contains(FragmentTypeFlag::SourceScan)
            && let Some((source_id, _)) = fragment.stream_node.to_protobuf().find_source_backfill()
        {
            source_fragment_ids
                .entry(source_id)
                .or_insert_with(BTreeSet::new)
                .insert(fragment_id);
        }
    }

    let fragment_source_ids: HashMap<_, _> = source_fragment_ids
        .iter()
        .flat_map(|(source_id, fragment_ids)| {
            fragment_ids
                .iter()
                .map(|fragment_id| (**fragment_id, *source_id as SourceId))
        })
        .collect();

    let fragment_ids = fragment_source_ids.keys().copied().collect_vec();

    let fragment_splits: Vec<_> = FragmentSplits::find()
        .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
        .all(txn)
        .await?;

    let fragment_splits: HashMap<_, _> = fragment_splits
        .into_iter()
        .flat_map(|model| {
            model.splits.map(|splits| {
                (
                    model.fragment_id,
                    splits
                        .to_protobuf()
                        .splits
                        .iter()
                        .flat_map(SplitImpl::try_from)
                        .collect_vec(),
                )
            })
        })
        .collect();

    Ok((fragment_source_ids, fragment_splits))
}

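/// A borrowed actor graph: fragments paired with their actors, plus each actor's worker
/// location.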
#[derive(Debug)]
pub struct ActorGraph<'a> {
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    pub locations: &'a HashMap<ActorId, WorkerId>,
}

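/// A connected component of the no-shuffle fragment graph. `components` contains every
/// fragment in the component; `entries` are its roots, i.e. fragments without an upstream
/// inside the same component.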
#[derive(Debug, Clone)]
pub struct NoShuffleEnsemble {
    entries: HashSet<FragmentId>,
    components: HashSet<FragmentId>,
}

impl NoShuffleEnsemble {
    pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.components.iter().cloned()
    }

    pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.entries.iter().copied()
    }

    pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.components.iter().copied()
    }

    pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
        self.entries.contains(fragment_id)
    }
}

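/// Loads all no-shuffle relations from the catalog and groups `initial_fragment_ids` into the
/// connected no-shuffle components they belong to.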
pub async fn find_fragment_no_shuffle_dags_detailed(
    db: &impl ConnectionTrait,
    initial_fragment_ids: &[FragmentId],
) -> MetaResult<Vec<NoShuffleEnsemble>> {
    let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
        .columns([
            fragment_relation::Column::SourceFragmentId,
            fragment_relation::Column::TargetFragmentId,
        ])
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_tuple()
        .all(db)
        .await?;

    let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
    let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();

    for (src, dst) in all_no_shuffle_relations {
        forward_edges.entry(src).or_default().push(dst);
        backward_edges.entry(dst).or_default().push(src);
    }

    find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
}

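/// Groups fragment ids into connected components of the no-shuffle graph using BFS over both
/// edge directions. Components whose every node lies on a cycle yield no entries and are
/// dropped.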
fn find_no_shuffle_graphs(
    initial_fragment_ids: &[impl Into<FragmentId> + Copy],
    forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
    backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
) -> MetaResult<Vec<NoShuffleEnsemble>> {
    let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
    let mut globally_visited: HashSet<FragmentId> = HashSet::new();

    for &init_id in initial_fragment_ids {
        let init_id = init_id.into();
        if globally_visited.contains(&init_id) {
            continue;
        }

        // BFS over both edge directions to collect the whole connected component
        // containing `init_id`.
        let mut components = HashSet::new();
        let mut queue: VecDeque<FragmentId> = VecDeque::new();

        queue.push_back(init_id);
        globally_visited.insert(init_id);

        while let Some(current_id) = queue.pop_front() {
            components.insert(current_id);
            let neighbors = forward_edges
                .get(&current_id)
                .into_iter()
                .flatten()
                .chain(backward_edges.get(&current_id).into_iter().flatten());

            for &neighbor_id in neighbors {
                if globally_visited.insert(neighbor_id) {
                    queue.push_back(neighbor_id);
                }
            }
        }

        // Entries are the roots of the component: nodes with no upstream inside it.
        let mut entries = HashSet::new();
        for &node_id in &components {
            let is_root = match backward_edges.get(&node_id) {
                Some(parents) => parents.iter().all(|p| !components.contains(p)),
                None => true,
            };
            if is_root {
                entries.insert(node_id);
            }
        }

        // A component made up entirely of cycles has no root and cannot be rendered;
        // skip it rather than emit an ensemble without entries.
        if !entries.is_empty() {
            graphs.push(NoShuffleEnsemble {
                entries,
                components,
            });
        }
    }

    Ok(graphs)
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap, HashSet};
    use std::sync::Arc;

    use risingwave_connector::source::SplitImpl;
    use risingwave_connector::source::test_source::TestSourceSplit;
    use risingwave_meta_model::{CreateType, I32Array, JobStatus, StreamNode, TableIdArray};
    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;

    use super::*;

    type Edges = (
        HashMap<FragmentId, Vec<FragmentId>>,
        HashMap<FragmentId, Vec<FragmentId>>,
    );

    fn build_edges(relations: &[(u32, u32)]) -> Edges {
        let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for &(src, dst) in relations {
            forward_edges
                .entry(src.into())
                .or_default()
                .push(dst.into());
            backward_edges
                .entry(dst.into())
                .or_default()
                .push(src.into());
        }
        (forward_edges, backward_edges)
    }

    fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
        ids.iter().map(|id| (*id).into()).collect()
    }

    #[allow(deprecated)]
    fn build_fragment(
        fragment_id: FragmentId,
        job_id: JobId,
        fragment_type_mask: i32,
        distribution_type: DistributionType,
        vnode_count: i32,
        parallelism: StreamingParallelism,
    ) -> fragment::Model {
        fragment::Model {
            fragment_id,
            job_id,
            fragment_type_mask,
            distribution_type,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: I32Array::default(),
            vnode_count,
            parallelism: Some(parallelism),
        }
    }

    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);

    /// Normalizes a fragment's actors into `(relative idx, worker, vnodes, split ids)` tuples
    /// sorted by index, so tests can compare fragments regardless of the absolute actor ids
    /// handed out by the counter.
    fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
        let base = fragment.actors.keys().copied().min().unwrap_or_default();

        let mut entries: Vec<_> = fragment
            .actors
            .iter()
            .map(|(&actor_id, info)| {
                let idx = actor_id.as_raw_id() - base.as_raw_id();
                let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
                    bitmap
                        .iter()
                        .enumerate()
                        .filter_map(|(pos, is_set)| is_set.then_some(pos))
                        .collect::<Vec<_>>()
                });
                let splits = info
                    .splits
                    .iter()
                    .map(|split| split.id().to_string())
                    .collect::<Vec<_>>();
                (idx.into(), info.worker_id, vnode_indices, splits)
            })
            .collect();

        entries.sort_by_key(|(idx, _, _, _)| *idx);
        entries
    }

    #[test]
    fn test_single_linear_chain() {
        let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
        let initial_ids = &[2];

        let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);

        assert!(result.is_ok());
        let graphs = result.unwrap();

        assert_eq!(graphs.len(), 1);
        let graph = &graphs[0];
        assert_eq!(graph.entries, to_hashset(&[1]));
        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
    }

    #[test]
    fn test_two_disconnected_graphs() {
        let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
        let initial_ids = &[2, 10];

        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 2);

        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));

        assert_eq!(graphs[0].entries, to_hashset(&[1]));
        assert_eq!(graphs[0].components, to_hashset(&[1, 2]));

        assert_eq!(graphs[1].entries, to_hashset(&[10]));
        assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
    }

    #[test]
    fn test_multiple_entries_in_one_graph() {
        let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
        let initial_ids = &[3];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 1);
        let graph = &graphs[0];
        assert_eq!(graph.entries, to_hashset(&[1, 2]));
        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
    }

    #[test]
    fn test_diamond_shape_graph() {
        let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
        let initial_ids = &[4];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 1);
        let graph = &graphs[0];
        assert_eq!(graph.entries, to_hashset(&[1]));
        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
    }

    #[test]
    fn test_starting_with_multiple_nodes_in_same_graph() {
        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
        let initial_ids = &[2, 4];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 1);
        let graph = &graphs[0];
        assert_eq!(graph.entries, to_hashset(&[1]));
        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
    }

    #[test]
    fn test_empty_initial_ids() {
        let (forward, backward) = build_edges(&[(1, 2)]);
        let initial_ids: &[u32] = &[];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert!(graphs.is_empty());
    }

    #[test]
    fn test_isolated_node_as_input() {
        let (forward, backward) = build_edges(&[(1, 2)]);
        let initial_ids = &[100];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 1);
        let graph = &graphs[0];
        assert_eq!(graph.entries, to_hashset(&[100]));
        assert_eq!(graph.components, to_hashset(&[100]));
    }

    #[test]
    fn test_graph_with_a_cycle() {
        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
        let initial_ids = &[2];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert!(
            graphs.is_empty(),
            "A graph with no entries should not be returned"
        );
    }

    #[test]
    fn test_custom_complex() {
        let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
        let initial_ids = &[1, 2, 4, 6];

        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 2);
        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));

        assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
        assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));

        assert_eq!(graphs[1].entries, to_hashset(&[6]));
        assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
    }

    #[test]
    fn render_actors_increments_actor_counter() {
        let actor_id_counter = AtomicU32::new(100);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 10.into();
        let database_id: DatabaseId = DatabaseId::new(3);

        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Single,
            1,
            StreamingParallelism::Fixed(1),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            parallelism: StreamingParallelism::Fixed(1),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 1,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "rg-a".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map = BTreeMap::from([(
            1.into(),
            WorkerInfo {
                parallelism: NonZeroUsize::new(1).unwrap(),
                resource_group: Some("rg-a".into()),
            },
        )]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(state.len(), 1);
        assert!(
            state[0].2.is_none(),
            "single distribution should not assign vnode bitmaps"
        );
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
    }

    #[test]
    fn render_actors_aligns_hash_vnode_bitmaps() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 1.into();
        let downstream_fragment_id: FragmentId = 2.into();
        let job_id: JobId = 20.into();
        let database_id: DatabaseId = DatabaseId::new(5);

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db_hash".into(),
            resource_group: "rg-hash".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map = BTreeMap::from([
            (
                1.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("rg-hash".into()),
                },
            ),
            (
                2.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("rg-hash".into()),
                },
            ),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        assert_eq!(entry_state.len(), 2);
        assert_eq!(entry_state, downstream_state);

        let assigned_vnodes: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, vnodes, _)| {
                vnodes
                    .as_ref()
                    .expect("hash distribution should populate vnode bitmap")
                    .iter()
                    .copied()
            })
            .collect();
        assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }

    #[test]
    fn render_actors_propagates_source_splits() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 11.into();
        let downstream_fragment_id: FragmentId = 12.into();
        let job_id: JobId = 30.into();
        let database_id: DatabaseId = DatabaseId::new(7);
        let source_id: SourceId = 99.into();

        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            source_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            source_scan_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "split_db".into(),
            resource_group: "rg-source".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map = BTreeMap::from([
            (
                1.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("rg-source".into()),
                },
            ),
            (
                2.into(),
                WorkerInfo {
                    parallelism: NonZeroUsize::new(1).unwrap(),
                    resource_group: Some("rg-source".into()),
                },
            ),
        ]);

        let split_a = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-a"),
            properties: HashMap::new(),
            offset: "0".into(),
        });
        let split_b = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-b"),
            properties: HashMap::new(),
            offset: "0".into(),
        });

        let fragment_source_ids = HashMap::from([
            (entry_fragment_id, source_id),
            (downstream_fragment_id, source_id),
        ]);
        let fragment_splits =
            HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        assert_eq!(entry_state, downstream_state);

        let split_ids: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, _, splits)| splits.iter().cloned())
            .collect();
        assert_eq!(
            split_ids,
            BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
        );
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
}