use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU32, Ordering};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::prelude::{
    Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
};
use risingwave_meta_model::{
    CreateType, DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism,
    WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink, source,
    streaming_job, table,
};
use risingwave_meta_model_migration::Condition;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::stream_plan::PbStreamNode;
use sea_orm::{
    ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
    RelationTrait,
};

use crate::MetaResult;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamingJobModelContextExt};
use crate::stream::{AssignerBuilder, SplitDiffOptions};

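/// Resolves a human-readable definition string for each of the given streaming job ids by
/// looking the id up in the `table`, `sink`, and `source` catalogs.
///
/// Note that, per the `cfg(debug_assertions)` switches below, release builds select the
/// object name column in place of the full SQL definition.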
pub(crate) async fn resolve_streaming_job_definition<C>(
    txn: &C,
    job_ids: &HashSet<JobId>,
) -> MetaResult<HashMap<JobId, String>>
where
    C: ConnectionTrait,
{
    let job_ids = job_ids.iter().cloned().collect_vec();

    let common_job_definitions: Vec<(JobId, String)> = Table::find()
        .select_only()
        .columns([
            table::Column::TableId,
            #[cfg(not(debug_assertions))]
            table::Column::Name,
            #[cfg(debug_assertions)]
            table::Column::Definition,
        ])
        .filter(table::Column::TableId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let sink_definitions: Vec<(JobId, String)> = Sink::find()
        .select_only()
        .columns([
            sink::Column::SinkId,
            #[cfg(not(debug_assertions))]
            sink::Column::Name,
            #[cfg(debug_assertions)]
            sink::Column::Definition,
        ])
        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let source_definitions: Vec<(JobId, String)> = Source::find()
        .select_only()
        .columns([
            source::Column::SourceId,
            #[cfg(not(debug_assertions))]
            source::Column::Name,
            #[cfg(debug_assertions)]
            source::Column::Definition,
        ])
        .filter(source::Column::SourceId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let definitions: HashMap<JobId, String> = common_job_definitions
        .into_iter()
        .chain(sink_definitions.into_iter())
        .chain(source_definitions.into_iter())
        .collect();

    Ok(definitions)
}

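/// Loads all streaming jobs (optionally restricted to a single database), renders actor
/// assignments for their fragments against the currently active worker nodes, and returns
/// the resulting [`FragmentRenderMap`] keyed by database, job, and fragment.
///
/// Illustrative sketch (not a compiled doctest); `txn`, `actor_id_counter`, and
/// `worker_nodes` are assumed to be available from the caller's context:
///
/// ```ignore
/// let all_fragments = load_fragment_info(
///     &txn,
///     &actor_id_counter,
///     None, // all databases
///     &worker_nodes,
///     AdaptiveParallelismStrategy::Auto,
/// )
/// .await?;
/// for (database_id, jobs) in &all_fragments {
///     tracing::debug!(?database_id, job_count = jobs.len(), "rendered fragments");
/// }
/// ```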
pub async fn load_fragment_info<C>(
    txn: &C,
    actor_id_counter: &AtomicU32,
    database_id: Option<DatabaseId>,
    worker_nodes: &ActiveStreamingWorkerNodes,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
) -> MetaResult<FragmentRenderMap>
where
    C: ConnectionTrait,
{
    let mut query = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId);

    if let Some(database_id) = database_id {
        query = query
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(object::Column::DatabaseId.eq(database_id));
    }

    let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;

    if jobs.is_empty() {
        return Ok(HashMap::new());
    }

    let jobs: HashSet<JobId> = jobs.into_iter().collect();

    let loaded = load_fragment_context_for_jobs(txn, jobs).await?;

    if loaded.is_empty() {
        return Ok(HashMap::new());
    }

    let RenderedGraph { fragments, .. } = render_actor_assignments(
        actor_id_counter,
        worker_nodes.current(),
        adaptive_parallelism_strategy,
        &loaded,
    )?;

    Ok(fragments)
}

#[derive(Debug)]
pub struct TargetResourcePolicy {
    pub resource_group: Option<String>,
    pub parallelism: StreamingParallelism,
}

#[derive(Debug, Clone)]
pub struct WorkerInfo {
    pub parallelism: NonZeroUsize,
    pub resource_group: Option<String>,
}

pub type FragmentRenderMap =
    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;

#[derive(Default)]
pub struct RenderedGraph {
    pub fragments: FragmentRenderMap,
    pub ensembles: Vec<NoShuffleEnsemble>,
}

impl RenderedGraph {
    pub fn empty() -> Self {
        Self::default()
    }
}

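/// A fragment row loaded from the catalog, carrying just the fields needed for rendering
/// actor assignments (type mask, distribution, vnode count, stream node, state tables, and
/// an optional per-fragment parallelism override).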
#[derive(Clone, Debug)]
pub struct LoadedFragment {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: DistributionType,
    pub vnode_count: usize,
    pub nodes: PbStreamNode,
    pub state_table_ids: HashSet<TableId>,
    pub parallelism: Option<StreamingParallelism>,
}

impl From<fragment::Model> for LoadedFragment {
    fn from(model: fragment::Model) -> Self {
        Self {
            fragment_id: model.fragment_id,
            job_id: model.job_id,
            fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
            distribution_type: model.distribution_type,
            vnode_count: model.vnode_count as usize,
            nodes: model.stream_node.to_protobuf(),
            state_table_ids: model.state_table_ids.into_inner().into_iter().collect(),
            parallelism: model.parallelism,
        }
    }
}

#[derive(Default, Debug, Clone)]
pub struct LoadedFragmentContext {
    pub ensembles: Vec<NoShuffleEnsemble>,
    pub job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    pub job_map: HashMap<JobId, streaming_job::Model>,
    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
    pub database_map: HashMap<DatabaseId, database::Model>,
    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
}

impl LoadedFragmentContext {
    pub fn is_empty(&self) -> bool {
        if self.ensembles.is_empty() {
            assert!(
                self.job_fragments.is_empty(),
                "non-empty job fragments for empty ensembles: {:?}",
                self.job_fragments
            );
            true
        } else {
            false
        }
    }

    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
        let job_ids: HashSet<JobId> = self
            .streaming_job_databases
            .iter()
            .filter_map(|(job_id, db_id)| (*db_id == database_id).then_some(*job_id))
            .collect();

        if job_ids.is_empty() {
            return None;
        }

        let job_fragments: HashMap<_, _> = job_ids
            .iter()
            .map(|job_id| (*job_id, self.job_fragments[job_id].clone()))
            .collect();

        let fragment_ids: HashSet<_> = job_fragments
            .values()
            .flat_map(|fragments| fragments.keys().copied())
            .collect();

        assert!(
            !fragment_ids.is_empty(),
            "empty fragments for non-empty database {database_id} with jobs {job_ids:?}"
        );

        let ensembles: Vec<NoShuffleEnsemble> = self
            .ensembles
            .iter()
            .filter(|ensemble| {
                if ensemble
                    .components
                    .iter()
                    .any(|fragment_id| fragment_ids.contains(fragment_id))
                {
                    assert!(
                        ensemble
                            .components
                            .iter()
                            .all(|fragment_id| fragment_ids.contains(fragment_id)),
                        "ensemble {ensemble:?} partially exists in database {database_id} with fragments {job_fragments:?}"
                    );
                    true
                } else {
                    false
                }
            })
            .cloned()
            .collect();

        assert!(
            !ensembles.is_empty(),
            "empty ensembles for non-empty database {database_id} with jobs {job_fragments:?}"
        );

        let job_map = job_ids
            .iter()
            .filter_map(|job_id| self.job_map.get(job_id).map(|job| (*job_id, job.clone())))
            .collect();

        let streaming_job_databases = job_ids
            .iter()
            .filter_map(|job_id| {
                self.streaming_job_databases
                    .get(job_id)
                    .map(|db_id| (*job_id, *db_id))
            })
            .collect();

        let database_model = self.database_map[&database_id].clone();
        let database_map = HashMap::from([(database_id, database_model)]);

        let fragment_source_ids = self
            .fragment_source_ids
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, source_id)| (*fragment_id, *source_id))
            .collect();

        let fragment_splits = self
            .fragment_splits
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, splits)| (*fragment_id, splits.clone()))
            .collect();

        Some(Self {
            ensembles,
            job_fragments,
            job_map,
            streaming_job_databases,
            database_map,
            fragment_source_ids,
            fragment_splits,
        })
    }

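    /// Splits this context into one independent context per database, moving each job's
    /// fragments, splits, and source mappings into the context of the database that owns
    /// the job. Ensembles are attached to the database resolved from their member fragments.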
    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
        let Self {
            ensembles,
            mut job_fragments,
            mut job_map,
            streaming_job_databases,
            mut database_map,
            mut fragment_source_ids,
            mut fragment_splits,
        } = self;

        let mut contexts = HashMap::<DatabaseId, Self>::new();
        let mut fragment_databases = HashMap::<FragmentId, DatabaseId>::new();
        let mut unresolved_ensembles = 0usize;
        let mut unresolved_ensemble_sample: Option<Vec<FragmentId>> = None;

        for (job_id, database_id) in streaming_job_databases {
            let context = contexts.entry(database_id).or_insert_with(|| {
                let database_model = database_map
                    .remove(&database_id)
                    .expect("database should exist for streaming job");
                Self {
                    ensembles: Vec::new(),
                    job_fragments: HashMap::new(),
                    job_map: HashMap::new(),
                    streaming_job_databases: HashMap::new(),
                    database_map: HashMap::from([(database_id, database_model)]),
                    fragment_source_ids: HashMap::new(),
                    fragment_splits: HashMap::new(),
                }
            });

            let fragments = job_fragments
                .remove(&job_id)
                .expect("job fragments should exist for streaming job");
            for fragment_id in fragments.keys().copied() {
                fragment_databases.insert(fragment_id, database_id);
                if let Some(source_id) = fragment_source_ids.remove(&fragment_id) {
                    context.fragment_source_ids.insert(fragment_id, source_id);
                }
                if let Some(splits) = fragment_splits.remove(&fragment_id) {
                    context.fragment_splits.insert(fragment_id, splits);
                }
            }

            assert!(
                context
                    .job_map
                    .insert(
                        job_id,
                        job_map
                            .remove(&job_id)
                            .expect("streaming job should exist for loaded context"),
                    )
                    .is_none(),
                "duplicated streaming job"
            );
            assert!(
                context.job_fragments.insert(job_id, fragments).is_none(),
                "duplicated job fragments"
            );
            assert!(
                context
                    .streaming_job_databases
                    .insert(job_id, database_id)
                    .is_none(),
                "duplicated job database mapping"
            );
        }

        for ensemble in ensembles {
            let Some(database_id) = ensemble
                .components
                .iter()
                .find_map(|fragment_id| fragment_databases.get(fragment_id).copied())
            else {
                unresolved_ensembles += 1;
                if unresolved_ensemble_sample.is_none() {
                    unresolved_ensemble_sample =
                        Some(ensemble.components.iter().copied().collect());
                }
                continue;
            };

            debug_assert!(
                ensemble
                    .components
                    .iter()
                    .all(|fragment_id| fragment_databases.get(fragment_id) == Some(&database_id)),
                "ensemble {ensemble:?} should belong to a single database"
            );

            contexts
                .get_mut(&database_id)
                .expect("database context should exist for ensemble")
                .ensembles
                .push(ensemble);
        }

        if unresolved_ensembles > 0 {
            tracing::warn!(
                unresolved_ensembles,
                ?unresolved_ensemble_sample,
                known_fragments = fragment_databases.len(),
                "skip ensembles without resolved database while splitting loaded context"
            );
        }
        debug_assert_eq!(
            unresolved_ensembles, 0,
            "all ensembles should be mappable to a database"
        );

        contexts
    }
}

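/// Loads the catalog state for the given no-shuffle ensembles and renders actor
/// assignments for them against the provided worker map.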
pub async fn render_fragments<C>(
    txn: &C,
    actor_id_counter: &AtomicU32,
    ensembles: Vec<NoShuffleEnsemble>,
    workers: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
) -> MetaResult<RenderedGraph>
where
    C: ConnectionTrait,
{
    let loaded = load_fragment_context(txn, ensembles).await?;

    if loaded.is_empty() {
        return Ok(RenderedGraph::empty());
    }

    render_actor_assignments(
        actor_id_counter,
        workers,
        adaptive_parallelism_strategy,
        &loaded,
    )
}

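/// Loads fragments, owning jobs, databases, and source split metadata for the given
/// ensembles into a [`LoadedFragmentContext`]. Returns an error if any referenced fragment
/// or streaming job no longer exists in the catalog.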
pub async fn load_fragment_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    let required_fragment_ids: HashSet<_> = ensembles
        .iter()
        .flat_map(|ensemble| ensemble.components.iter().copied())
        .collect();

    let fragment_models = Fragment::find()
        .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
        .all(txn)
        .await?;

    let found_fragment_ids: HashSet<_> = fragment_models
        .iter()
        .map(|fragment| fragment.fragment_id)
        .collect();

    if found_fragment_ids.len() != required_fragment_ids.len() {
        let missing = required_fragment_ids
            .difference(&found_fragment_ids)
            .copied()
            .collect_vec();
        return Err(anyhow!("fragments {:?} not found", missing).into());
    }

    let fragment_models: HashMap<_, _> = fragment_models
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    let job_ids: HashSet<_> = fragment_models
        .values()
        .map(|fragment| fragment.job_id)
        .collect();

    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
    if found_job_ids.len() != job_ids.len() {
        let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
        return Err(anyhow!("streaming jobs {:?} not found", missing).into());
    }

    build_loaded_context(txn, ensembles, fragment_models, jobs).await
}

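/// Renders actor assignments for all fragments belonging to the given streaming jobs.
///
/// Illustrative sketch (not a compiled doctest); `txn`, `actor_id_counter`, `job_id`, and
/// `workers` are assumed to come from the caller:
///
/// ```ignore
/// let RenderedGraph { fragments, ensembles } = render_jobs(
///     &txn,
///     &actor_id_counter,
///     HashSet::from([job_id]),
///     &workers,
///     AdaptiveParallelismStrategy::Auto,
/// )
/// .await?;
/// ```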
pub async fn render_jobs<C>(
    txn: &C,
    actor_id_counter: &AtomicU32,
    job_ids: HashSet<JobId>,
    workers: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
) -> MetaResult<RenderedGraph>
where
    C: ConnectionTrait,
{
    let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;

    if loaded.is_empty() {
        return Ok(RenderedGraph::empty());
    }

    render_actor_assignments(
        actor_id_counter,
        workers,
        adaptive_parallelism_strategy,
        &loaded,
    )
}

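/// Loads a [`LoadedFragmentContext`] for the given jobs. Fragments of each job that are not
/// the target of a no-shuffle dispatch are used as seeds to discover the complete
/// no-shuffle ensembles, which may pull in fragments (and jobs) beyond the given set.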
pub async fn load_fragment_context_for_jobs<C>(
    txn: &C,
    job_ids: HashSet<JobId>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    let excluded_fragments_query = FragmentRelation::find()
        .select_only()
        .column(fragment_relation::Column::TargetFragmentId)
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_query();

    let condition = Condition::all()
        .add(fragment::Column::JobId.is_in(job_ids.clone()))
        .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));

    let fragments: Vec<FragmentId> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .filter(condition)
        .into_tuple()
        .all(txn)
        .await?;

    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;

    let fragments = Fragment::find()
        .filter(
            fragment::Column::FragmentId.is_in(
                ensembles
                    .iter()
                    .flat_map(|graph| graph.components.iter())
                    .cloned()
                    .collect_vec(),
            ),
        )
        .all(txn)
        .await?;

    let fragment_map: HashMap<_, _> = fragments
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    let job_ids = fragment_map
        .values()
        .map(|fragment| fragment.job_id)
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect_vec();

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    build_loaded_context(txn, ensembles, fragment_map, jobs).await
}

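/// Renders actor assignments for an already loaded context. Jobs that are still creating in
/// the background are treated as backfilling and may use their dedicated backfill
/// parallelism.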
pub(crate) fn render_actor_assignments(
    actor_id_counter: &AtomicU32,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    loaded: &LoadedFragmentContext,
) -> MetaResult<RenderedGraph> {
    if loaded.is_empty() {
        return Ok(RenderedGraph::empty());
    }

    let backfill_jobs: HashSet<JobId> = loaded
        .job_map
        .iter()
        .filter(|(_, job)| {
            job.create_type == CreateType::Background && job.job_status == JobStatus::Creating
        })
        .map(|(id, _)| *id)
        .collect();

    let render_context = RenderActorsContext {
        fragment_source_ids: &loaded.fragment_source_ids,
        fragment_splits: &loaded.fragment_splits,
        streaming_job_databases: &loaded.streaming_job_databases,
        database_map: &loaded.database_map,
        backfill_jobs: &backfill_jobs,
    };

    let fragments = render_actors(
        actor_id_counter,
        &loaded.ensembles,
        &loaded.job_fragments,
        &loaded.job_map,
        worker_map,
        adaptive_parallelism_strategy,
        render_context,
    )?;

    Ok(RenderedGraph {
        fragments,
        ensembles: loaded.ensembles.clone(),
    })
}

async fn build_loaded_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
    fragment_models: HashMap<FragmentId, fragment::Model>,
    job_map: HashMap<JobId, streaming_job::Model>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    let mut job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>> = HashMap::new();
    for (fragment_id, model) in fragment_models {
        job_fragments
            .entry(model.job_id)
            .or_default()
            .try_insert(fragment_id, LoadedFragment::from(model))
            .expect("duplicate fragment id for job");
    }

    #[cfg(debug_assertions)]
    {
        debug_sanity_check(&ensembles, &job_fragments, &job_map);
    }

    let (fragment_source_ids, fragment_splits) =
        resolve_source_fragments(txn, &job_fragments).await?;

    let job_ids = job_map.keys().copied().collect_vec();

    let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .collect();

    let database_map: HashMap<_, _> = Database::find()
        .filter(
            database::Column::DatabaseId
                .is_in(streaming_job_databases.values().copied().collect_vec()),
        )
        .all(txn)
        .await?
        .into_iter()
        .map(|db| (db.database_id, db))
        .collect();

    Ok(LoadedFragmentContext {
        ensembles,
        job_fragments,
        job_map,
        streaming_job_databases,
        database_map,
        fragment_source_ids,
        fragment_splits,
    })
}

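/// Borrowed lookup tables handed to [`render_actors`] alongside the per-job and per-fragment
/// inputs.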
struct RenderActorsContext<'a> {
    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
    database_map: &'a HashMap<DatabaseId, database::Model>,
    backfill_jobs: &'a HashSet<JobId>,
}

fn render_actors(
    actor_id_counter: &AtomicU32,
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    let RenderActorsContext {
        fragment_source_ids,
        fragment_splits: fragment_splits_map,
        streaming_job_databases,
        database_map,
        backfill_jobs,
    } = context;

    let mut all_fragments: FragmentRenderMap = HashMap::new();
    let fragment_lookup: HashMap<FragmentId, &LoadedFragment> = job_fragments
        .values()
        .flat_map(|fragments| fragments.iter())
        .map(|(fragment_id, fragment)| (*fragment_id, fragment))
        .collect();

    for NoShuffleEnsemble {
        entries,
        components,
    } in ensembles
    {
        tracing::debug!("rendering ensemble entries {:?}", entries);

        let entry_fragments = entries
            .iter()
            .map(|fragment_id| fragment_lookup.get(fragment_id).unwrap())
            .collect_vec();

        let entry_fragment_parallelism = entry_fragments
            .iter()
            .map(|fragment| fragment.parallelism.clone())
            .dedup()
            .exactly_one()
            .map_err(|_| {
                anyhow!(
                    "entry fragments {:?} have inconsistent parallelism settings",
                    entries.iter().copied().collect_vec()
                )
            })?;

        let (job_id, vnode_count) = entry_fragments
            .iter()
            .map(|f| (f.job_id, f.vnode_count))
            .dedup()
            .exactly_one()
            .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;

        let job = job_map
            .get(&job_id)
            .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;

        let job_strategy = job
            .stream_context()
            .adaptive_parallelism_strategy
            .unwrap_or(adaptive_parallelism_strategy);

        let resource_group = match &job.specific_resource_group {
            None => {
                let database = streaming_job_databases
                    .get(&job_id)
                    .and_then(|database_id| database_map.get(database_id))
                    .unwrap();
                database.resource_group.clone()
            }
            Some(resource_group) => resource_group.clone(),
        };

        let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
            .iter()
            .filter_map(|(worker_id, worker)| {
                if worker
                    .resource_group()
                    .as_deref()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP)
                    == resource_group.as_str()
                {
                    Some((
                        *worker_id,
                        worker
                            .parallelism()
                            .expect("should have parallelism for compute node")
                            .try_into()
                            .expect("parallelism for compute node"),
                    ))
                } else {
                    None
                }
            })
            .collect();

        let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();

        let effective_job_parallelism = if backfill_jobs.contains(&job_id) {
            job.backfill_parallelism
                .as_ref()
                .unwrap_or(&job.parallelism)
        } else {
            &job.parallelism
        };

        let actual_parallelism = match entry_fragment_parallelism
            .as_ref()
            .unwrap_or(effective_job_parallelism)
        {
            StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
                job_strategy.compute_target_parallelism(total_parallelism)
            }
            StreamingParallelism::Fixed(n) => *n,
        }
        .min(vnode_count)
        .min(job.max_parallelism as usize);

        tracing::debug!(
            "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
            job_id,
            actual_parallelism,
            job.parallelism,
            total_parallelism,
            job.max_parallelism,
            vnode_count,
            entry_fragment_parallelism
        );

        let assigner = AssignerBuilder::new(job_id).build();

        let actors = (0..(actual_parallelism as u32))
            .map_into::<ActorId>()
            .collect_vec();
        let vnodes = (0..vnode_count).collect_vec();

        let assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;

        let source_entry_fragment = entry_fragments.iter().find(|f| {
            let mask = f.fragment_type_mask;
            if mask.contains(FragmentTypeFlag::Source) {
                assert!(!mask.contains(FragmentTypeFlag::SourceScan))
            }
            mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
        });

        let (fragment_splits, shared_source_id) = match source_entry_fragment {
            Some(entry_fragment) => {
                let source_id = fragment_source_ids
                    .get(&entry_fragment.fragment_id)
                    .ok_or_else(|| {
                        anyhow!(
                            "missing source id in source fragment {}",
                            entry_fragment.fragment_id
                        )
                    })?;

                let entry_fragment_id = entry_fragment.fragment_id;

                let empty_actor_splits: HashMap<_, _> =
                    actors.iter().map(|actor_id| (*actor_id, vec![])).collect();

                let splits = fragment_splits_map
                    .get(&entry_fragment_id)
                    .cloned()
                    .unwrap_or_default();

                let splits: BTreeMap<_, _> = splits.into_iter().map(|s| (s.id(), s)).collect();

                let fragment_splits = crate::stream::source_manager::reassign_splits(
                    entry_fragment_id,
                    empty_actor_splits,
                    &splits,
                    SplitDiffOptions::default(),
                )
                .unwrap_or_default();
                (fragment_splits, Some(*source_id))
            }
            None => (HashMap::new(), None),
        };

        for component_fragment_id in components {
            let fragment = fragment_lookup.get(component_fragment_id).unwrap();
            let fragment_id = fragment.fragment_id;
            let job_id = fragment.job_id;
            let fragment_type_mask = fragment.fragment_type_mask;
            let distribution_type = fragment.distribution_type;
            let stream_node = &fragment.nodes;
            let state_table_ids = &fragment.state_table_ids;
            let vnode_count = fragment.vnode_count;

            let actor_count =
                u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
            let actor_id_base = actor_id_counter.fetch_add(actor_count, Ordering::Relaxed);

            let actors: HashMap<ActorId, InflightActorInfo> = assignment
                .iter()
                .flat_map(|(worker_id, actors)| {
                    actors
                        .iter()
                        .map(move |(actor_id, vnodes)| (worker_id, actor_id, vnodes))
                })
                .map(|(&worker_id, &actor_idx, vnodes)| {
                    let vnode_bitmap = match distribution_type {
                        DistributionType::Single => None,
                        DistributionType::Hash => Some(Bitmap::from_indices(vnode_count, vnodes)),
                    };

                    let actor_id = actor_idx + actor_id_base;

                    let splits = if let Some(source_id) = fragment_source_ids.get(&fragment_id) {
                        assert_eq!(shared_source_id, Some(*source_id));

                        fragment_splits
                            .get(&(actor_idx))
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        vec![]
                    };

                    (
                        actor_id,
                        InflightActorInfo {
                            worker_id,
                            vnode_bitmap,
                            splits,
                        },
                    )
                })
                .collect();

            let fragment = InflightFragmentInfo {
                fragment_id,
                distribution_type,
                fragment_type_mask,
                vnode_count,
                nodes: stream_node.clone(),
                actors,
                state_table_ids: state_table_ids.clone(),
            };

            let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
                anyhow!("streaming job {job_id} not found in streaming_job_databases")
            })?;

            all_fragments
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default()
                .insert(fragment_id, fragment);
        }
    }

    Ok(all_fragments)
}

#[cfg(debug_assertions)]
fn debug_sanity_check(
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    jobs: &HashMap<JobId, streaming_job::Model>,
) {
    let fragment_lookup: HashMap<FragmentId, (&LoadedFragment, JobId)> = job_fragments
        .iter()
        .flat_map(|(job_id, fragments)| {
            fragments
                .iter()
                .map(move |(fragment_id, fragment)| (*fragment_id, (fragment, *job_id)))
        })
        .collect();

    debug_assert!(
        ensembles
            .iter()
            .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
        "entries must be subset of components"
    );

    let mut missing_fragments = BTreeSet::new();
    let mut missing_jobs = BTreeSet::new();

    for fragment_id in ensembles
        .iter()
        .flat_map(|ensemble| ensemble.components.iter())
    {
        match fragment_lookup.get(fragment_id) {
            Some((fragment, job_id)) => {
                if !jobs.contains_key(&fragment.job_id) {
                    missing_jobs.insert(*job_id);
                }
            }
            None => {
                missing_fragments.insert(*fragment_id);
            }
        }
    }

    debug_assert!(
        missing_fragments.is_empty(),
        "missing fragments in fragment_map: {:?}",
        missing_fragments
    );

    debug_assert!(
        missing_jobs.is_empty(),
        "missing jobs for fragments' job_id: {:?}",
        missing_jobs
    );

    for ensemble in ensembles {
        let unique_vnode_counts: Vec<_> = ensemble
            .components
            .iter()
            .flat_map(|fragment_id| {
                fragment_lookup
                    .get(fragment_id)
                    .map(|(fragment, _)| fragment.vnode_count)
            })
            .unique()
            .collect();

        debug_assert!(
            unique_vnode_counts.len() <= 1,
            "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
            ensemble.components,
            unique_vnode_counts
        );
    }
}

async fn resolve_source_fragments<C>(
    txn: &C,
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
) -> MetaResult<(
    HashMap<FragmentId, SourceId>,
    HashMap<FragmentId, Vec<SplitImpl>>,
)>
where
    C: ConnectionTrait,
{
    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
    for (fragment_id, fragment) in job_fragments.values().flatten() {
        let mask = fragment.fragment_type_mask;
        if mask.contains(FragmentTypeFlag::Source)
            && let Some(source_id) = fragment.nodes.find_stream_source()
        {
            source_fragment_ids
                .entry(source_id)
                .or_default()
                .insert(*fragment_id);
        }

        if mask.contains(FragmentTypeFlag::SourceScan)
            && let Some((source_id, _)) = fragment.nodes.find_source_backfill()
        {
            source_fragment_ids
                .entry(source_id)
                .or_default()
                .insert(*fragment_id);
        }
    }

    let fragment_source_ids: HashMap<_, _> = source_fragment_ids
        .iter()
        .flat_map(|(source_id, fragment_ids)| {
            fragment_ids
                .iter()
                .map(|fragment_id| (*fragment_id, *source_id))
        })
        .collect();

    let fragment_ids = fragment_source_ids.keys().copied().collect_vec();

    let fragment_splits: Vec<_> = FragmentSplits::find()
        .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
        .all(txn)
        .await?;

    let fragment_splits: HashMap<_, _> = fragment_splits
        .into_iter()
        .flat_map(|model| {
            model.splits.map(|splits| {
                (
                    model.fragment_id,
                    splits
                        .to_protobuf()
                        .splits
                        .iter()
                        .flat_map(SplitImpl::try_from)
                        .collect_vec(),
                )
            })
        })
        .collect();

    Ok((fragment_source_ids, fragment_splits))
}

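/// A borrowed view of a rendered graph: fragments with their actors, plus the worker
/// location of every actor.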
#[derive(Debug)]
pub struct ActorGraph<'a> {
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    pub locations: &'a HashMap<ActorId, WorkerId>,
}

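/// A connected component of fragments linked by no-shuffle dispatchers. `entries` are the
/// component's roots (fragments with no no-shuffle upstream inside the component) and are
/// always a subset of `components`.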
#[derive(Debug, Clone)]
pub struct NoShuffleEnsemble {
    entries: HashSet<FragmentId>,
    components: HashSet<FragmentId>,
}

impl NoShuffleEnsemble {
    #[cfg(test)]
    pub fn for_test(
        entries: impl IntoIterator<Item = FragmentId>,
        components: impl IntoIterator<Item = FragmentId>,
    ) -> Self {
        let entries = entries.into_iter().collect();
        let components = components.into_iter().collect();
        Self {
            entries,
            components,
        }
    }

    pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.components.iter().cloned()
    }

    pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.entries.iter().copied()
    }

    pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.components.iter().copied()
    }

    pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
        self.entries.contains(fragment_id)
    }
}

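/// Loads all no-shuffle fragment relations from the catalog and groups the given fragments,
/// together with everything reachable through no-shuffle edges in either direction, into
/// [`NoShuffleEnsemble`]s.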
pub async fn find_fragment_no_shuffle_dags_detailed(
    db: &impl ConnectionTrait,
    initial_fragment_ids: &[FragmentId],
) -> MetaResult<Vec<NoShuffleEnsemble>> {
    let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
        .columns([
            fragment_relation::Column::SourceFragmentId,
            fragment_relation::Column::TargetFragmentId,
        ])
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_tuple()
        .all(db)
        .await?;

    let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
    let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();

    for (src, dst) in all_no_shuffle_relations {
        forward_edges.entry(src).or_default().push(dst);
        backward_edges.entry(dst).or_default().push(src);
    }

    find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
}

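/// Pure-graph core of [`find_fragment_no_shuffle_dags_detailed`]: a BFS over the union of
/// forward and backward no-shuffle edges that groups the seed fragments into connected
/// components and marks each component's root fragments as entries. Components without any
/// entry (for example a pure cycle) are dropped.
///
/// Illustrative sketch (not a compiled doctest); fragment ids are written as plain numbers:
///
/// ```ignore
/// // The chain 1 -> 2 -> 3 is a single component; seeding with `2` discovers all of it,
/// // and fragment 1 (no upstream inside the component) becomes the entry.
/// let mut forward: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
/// let mut backward: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
/// for (src, dst) in [(1u32, 2u32), (2, 3)] {
///     forward.entry(src.into()).or_default().push(dst.into());
///     backward.entry(dst.into()).or_default().push(src.into());
/// }
/// let graphs = find_no_shuffle_graphs(&[2u32], &forward, &backward)?;
/// assert_eq!(graphs.len(), 1);
/// assert!(graphs[0].contains_entry(&1u32.into()));
/// ```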
fn find_no_shuffle_graphs(
    initial_fragment_ids: &[impl Into<FragmentId> + Copy],
    forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
    backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
) -> MetaResult<Vec<NoShuffleEnsemble>> {
    let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
    let mut globally_visited: HashSet<FragmentId> = HashSet::new();

    for &init_id in initial_fragment_ids {
        let init_id = init_id.into();
        if globally_visited.contains(&init_id) {
            continue;
        }

        // BFS in both directions over no-shuffle edges to collect the connected component.
        let mut components = HashSet::new();
        let mut queue: VecDeque<FragmentId> = VecDeque::new();

        queue.push_back(init_id);
        globally_visited.insert(init_id);

        while let Some(current_id) = queue.pop_front() {
            components.insert(current_id);
            let neighbors = forward_edges
                .get(&current_id)
                .into_iter()
                .flatten()
                .chain(backward_edges.get(&current_id).into_iter().flatten());

            for &neighbor_id in neighbors {
                if globally_visited.insert(neighbor_id) {
                    queue.push_back(neighbor_id);
                }
            }
        }

        // Entries are fragments with no no-shuffle upstream inside this component.
        let mut entries = HashSet::new();
        for &node_id in &components {
            let is_root = match backward_edges.get(&node_id) {
                Some(parents) => parents.iter().all(|p| !components.contains(p)),
                None => true,
            };
            if is_root {
                entries.insert(node_id);
            }
        }

        // Only keep components with at least one entry; e.g. a pure cycle has none.
        if !entries.is_empty() {
            graphs.push(NoShuffleEnsemble {
                entries,
                components,
            });
        }
    }

    Ok(graphs)
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap, HashSet};
    use std::sync::Arc;

    use risingwave_connector::source::SplitImpl;
    use risingwave_connector::source::test_source::TestSourceSplit;
    use risingwave_meta_model::{CreateType, JobStatus};
    use risingwave_pb::common::WorkerType;
    use risingwave_pb::common::worker_node::Property as WorkerProperty;
    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;

    use super::*;

    type Edges = (
        HashMap<FragmentId, Vec<FragmentId>>,
        HashMap<FragmentId, Vec<FragmentId>>,
    );

    fn build_edges(relations: &[(u32, u32)]) -> Edges {
        let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for &(src, dst) in relations {
            forward_edges
                .entry(src.into())
                .or_default()
                .push(dst.into());
            backward_edges
                .entry(dst.into())
                .or_default()
                .push(src.into());
        }
        (forward_edges, backward_edges)
    }

    fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
        ids.iter().map(|id| (*id).into()).collect()
    }

    fn build_fragment(
        fragment_id: FragmentId,
        job_id: JobId,
        fragment_type_mask: i32,
        distribution_type: DistributionType,
        vnode_count: i32,
        parallelism: StreamingParallelism,
    ) -> LoadedFragment {
        LoadedFragment {
            fragment_id,
            job_id,
            fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
            distribution_type,
            vnode_count: vnode_count as usize,
            nodes: PbStreamNode::default(),
            state_table_ids: HashSet::new(),
            parallelism: Some(parallelism),
        }
    }

    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);

    fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
        let base = fragment.actors.keys().copied().min().unwrap_or_default();

        let mut entries: Vec<_> = fragment
            .actors
            .iter()
            .map(|(&actor_id, info)| {
                let idx = actor_id.as_raw_id() - base.as_raw_id();
                let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
                    bitmap
                        .iter()
                        .enumerate()
                        .filter_map(|(pos, is_set)| is_set.then_some(pos))
                        .collect::<Vec<_>>()
                });
                let splits = info
                    .splits
                    .iter()
                    .map(|split| split.id().to_string())
                    .collect::<Vec<_>>();
                (idx.into(), info.worker_id, vnode_indices, splits)
            })
            .collect();

        entries.sort_by_key(|(idx, _, _, _)| *idx);
        entries
    }

    fn build_worker_node(
        id: impl Into<WorkerId>,
        parallelism: usize,
        resource_group: &str,
    ) -> WorkerNode {
        WorkerNode {
            id: id.into(),
            r#type: WorkerType::ComputeNode as i32,
            property: Some(WorkerProperty {
                is_streaming: true,
                parallelism: u32::try_from(parallelism).expect("parallelism fits into u32"),
                resource_group: Some(resource_group.to_owned()),
                ..Default::default()
            }),
            ..Default::default()
        }
    }

    #[test]
    fn test_single_linear_chain() {
        let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
        let initial_ids = &[2];

        let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);

        assert!(result.is_ok());
        let graphs = result.unwrap();

        assert_eq!(graphs.len(), 1);
        let graph = &graphs[0];
        assert_eq!(graph.entries, to_hashset(&[1]));
        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
    }

    #[test]
    fn test_two_disconnected_graphs() {
        let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
        let initial_ids = &[2, 10];

        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 2);

        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));

        assert_eq!(graphs[0].entries, to_hashset(&[1]));
        assert_eq!(graphs[0].components, to_hashset(&[1, 2]));

        assert_eq!(graphs[1].entries, to_hashset(&[10]));
        assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
    }

    #[test]
    fn test_multiple_entries_in_one_graph() {
        let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
        let initial_ids = &[3];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 1);
        let graph = &graphs[0];
        assert_eq!(graph.entries, to_hashset(&[1, 2]));
        assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
    }

    #[test]
    fn test_diamond_shape_graph() {
        let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
        let initial_ids = &[4];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 1);
        let graph = &graphs[0];
        assert_eq!(graph.entries, to_hashset(&[1]));
        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
    }

    #[test]
    fn test_starting_with_multiple_nodes_in_same_graph() {
        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
        let initial_ids = &[2, 4];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 1);
        let graph = &graphs[0];
        assert_eq!(graph.entries, to_hashset(&[1]));
        assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
    }

    #[test]
    fn test_empty_initial_ids() {
        let (forward, backward) = build_edges(&[(1, 2)]);
        let initial_ids: &[u32] = &[];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert!(graphs.is_empty());
    }

    #[test]
    fn test_isolated_node_as_input() {
        let (forward, backward) = build_edges(&[(1, 2)]);
        let initial_ids = &[100];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 1);
        let graph = &graphs[0];
        assert_eq!(graph.entries, to_hashset(&[100]));
        assert_eq!(graph.components, to_hashset(&[100]));
    }

    #[test]
    fn test_graph_with_a_cycle() {
        let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
        let initial_ids = &[2];

        let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert!(
            graphs.is_empty(),
            "A graph with no entries should not be returned"
        );
    }

    #[test]
    fn test_custom_complex() {
        let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
        let initial_ids = &[1, 2, 4, 6];

        let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();

        assert_eq!(graphs.len(), 2);
        graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));

        assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
        assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));

        assert_eq!(graphs[1].entries, to_hashset(&[6]));
        assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
    }

    #[test]
    fn render_actors_increments_actor_counter() {
        let actor_id_counter = AtomicU32::new(100);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 10.into();
        let database_id: DatabaseId = DatabaseId::new(3);

        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Single,
            1,
            StreamingParallelism::Fixed(1),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(1),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 1,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "rg-a".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map: HashMap<WorkerId, WorkerNode> =
            HashMap::from([(1.into(), build_worker_node(1, 1, "rg-a"))]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(state.len(), 1);
        assert!(
            state[0].2.is_none(),
            "single distribution should not assign vnode bitmaps"
        );
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
    }

    #[test]
    fn render_actors_aligns_hash_vnode_bitmaps() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 1.into();
        let downstream_fragment_id: FragmentId = 2.into();
        let job_id: JobId = 20.into();
        let database_id: DatabaseId = DatabaseId::new(5);

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db_hash".into(),
            resource_group: "rg-hash".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-hash")),
            (2.into(), build_worker_node(2, 1, "rg-hash")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        assert_eq!(entry_state.len(), 2);
        assert_eq!(entry_state, downstream_state);

        let assigned_vnodes: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, vnodes, _)| {
                vnodes
                    .as_ref()
                    .expect("hash distribution should populate vnode bitmap")
                    .iter()
                    .copied()
            })
            .collect();
        assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }

    #[test]
    fn render_actors_propagates_source_splits() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 11.into();
        let downstream_fragment_id: FragmentId = 12.into();
        let job_id: JobId = 30.into();
        let database_id: DatabaseId = DatabaseId::new(7);
        let source_id: SourceId = 99.into();

        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            source_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            source_scan_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "split_db".into(),
            resource_group: "rg-source".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-source")),
            (2.into(), build_worker_node(2, 1, "rg-source")),
        ]);

        let split_a = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-a"),
            properties: HashMap::new(),
            offset: "0".into(),
        });
        let split_b = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-b"),
            properties: HashMap::new(),
            offset: "0".into(),
        });

        let fragment_source_ids = HashMap::from([
            (entry_fragment_id, source_id),
            (downstream_fragment_id, source_id),
        ]);
        let fragment_splits =
            HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        assert_eq!(entry_state, downstream_state);

        let split_ids: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, _, splits)| splits.iter().cloned())
            .collect();
        assert_eq!(
            split_ids,
            BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
        );
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }

    #[test]
    fn render_actors_job_strategy_overrides_global() {
        let actor_id_counter = AtomicU32::new(0);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 100.into();
        let database_id: DatabaseId = DatabaseId::new(10);

        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            8,
            StreamingParallelism::Adaptive,
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
            parallelism: StreamingParallelism::Adaptive,
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 8,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map =
            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map = HashMap::from([
            (1.into(), build_worker_node(1, 1, "default")),
            (2.into(), build_worker_node(2, 1, "default")),
            (3.into(), build_worker_node(3, 1, "default")),
            (4.into(), build_worker_node(4, 1, "default")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Full,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(
            state.len(),
            2,
            "Job strategy BOUNDED(2) should override global FULL"
        );
    }

    #[test]
    fn render_actors_uses_global_strategy_when_job_has_none() {
        let actor_id_counter = AtomicU32::new(0);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 101.into();
        let database_id: DatabaseId = DatabaseId::new(11);

        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            8,
            StreamingParallelism::Adaptive,
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Adaptive,
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 8,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map =
            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map = HashMap::from([
            (1.into(), build_worker_node(1, 1, "default")),
            (2.into(), build_worker_node(2, 1, "default")),
            (3.into(), build_worker_node(3, 1, "default")),
            (4.into(), build_worker_node(4, 1, "default")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(3).unwrap()),
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(
            state.len(),
            3,
            "Should use global strategy BOUNDED(3) when job has no custom strategy"
        );
    }

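    /// Fixed job parallelism pins the actor count and ignores both the job-level and the
    /// global adaptive strategies.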
    #[test]
    fn render_actors_fixed_parallelism_ignores_strategy() {
        let actor_id_counter = AtomicU32::new(0);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 102.into();
        let database_id: DatabaseId = DatabaseId::new(12);

        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            8,
            StreamingParallelism::Fixed(5),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
            parallelism: StreamingParallelism::Fixed(5),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 8,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map =
            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map = HashMap::from([
            (1.into(), build_worker_node(1, 1, "default")),
            (2.into(), build_worker_node(2, 1, "default")),
            (3.into(), build_worker_node(3, 1, "default")),
            (4.into(), build_worker_node(4, 1, "default")),
            (5.into(), build_worker_node(5, 1, "default")),
            (6.into(), build_worker_node(6, 1, "default")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Full,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(
            state.len(),
            5,
            "Fixed parallelism should ignore all strategies"
        );
    }

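    /// A RATIO(0.5) job-level strategy scales the actor count to half of the available
    /// worker slots (8 single-slot workers -> 4 actors).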
    #[test]
    fn render_actors_ratio_strategy() {
        let actor_id_counter = AtomicU32::new(0);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 103.into();
        let database_id: DatabaseId = DatabaseId::new(13);

        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            16,
            StreamingParallelism::Adaptive,
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
            parallelism: StreamingParallelism::Adaptive,
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 16,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map =
            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map = HashMap::from([
            (1.into(), build_worker_node(1, 1, "default")),
            (2.into(), build_worker_node(2, 1, "default")),
            (3.into(), build_worker_node(3, 1, "default")),
            (4.into(), build_worker_node(4, 1, "default")),
            (5.into(), build_worker_node(5, 1, "default")),
            (6.into(), build_worker_node(6, 1, "default")),
            (7.into(), build_worker_node(7, 1, "default")),
            (8.into(), build_worker_node(8, 1, "default")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Full,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(
            state.len(),
            4,
            "RATIO(0.5) of 8 workers should give 4 actors"
        );
    }
}