1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::id::JobId;
24use risingwave_common::system_param::AdaptiveParallelismStrategy;
25use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
26use risingwave_connector::source::{SplitImpl, SplitMetaData};
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::prelude::{
29 Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
30};
31use risingwave_meta_model::{
32 CreateType, DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism,
33 WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink, source,
34 streaming_job, table,
35};
36use risingwave_meta_model_migration::Condition;
37use risingwave_pb::common::WorkerNode;
38use risingwave_pb::stream_plan::PbStreamNode;
39use sea_orm::{
40 ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
41 RelationTrait,
42};
43
44use crate::MetaResult;
45use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
46use crate::manager::ActiveStreamingWorkerNodes;
47use crate::model::{ActorId, StreamActor, StreamingJobModelContextExt};
48use crate::stream::{AssignerBuilder, SplitDiffOptions};
49
50pub(crate) async fn resolve_streaming_job_definition<C>(
51 txn: &C,
52 job_ids: &HashSet<JobId>,
53) -> MetaResult<HashMap<JobId, String>>
54where
55 C: ConnectionTrait,
56{
57 let job_ids = job_ids.iter().cloned().collect_vec();
58
59 let common_job_definitions: Vec<(JobId, String)> = Table::find()
61 .select_only()
62 .columns([
63 table::Column::TableId,
64 #[cfg(not(debug_assertions))]
65 table::Column::Name,
66 #[cfg(debug_assertions)]
67 table::Column::Definition,
68 ])
69 .filter(table::Column::TableId.is_in(job_ids.clone()))
70 .into_tuple()
71 .all(txn)
72 .await?;
73
74 let sink_definitions: Vec<(JobId, String)> = Sink::find()
75 .select_only()
76 .columns([
77 sink::Column::SinkId,
78 #[cfg(not(debug_assertions))]
79 sink::Column::Name,
80 #[cfg(debug_assertions)]
81 sink::Column::Definition,
82 ])
83 .filter(sink::Column::SinkId.is_in(job_ids.clone()))
84 .into_tuple()
85 .all(txn)
86 .await?;
87
88 let source_definitions: Vec<(JobId, String)> = Source::find()
89 .select_only()
90 .columns([
91 source::Column::SourceId,
92 #[cfg(not(debug_assertions))]
93 source::Column::Name,
94 #[cfg(debug_assertions)]
95 source::Column::Definition,
96 ])
97 .filter(source::Column::SourceId.is_in(job_ids.clone()))
98 .into_tuple()
99 .all(txn)
100 .await?;
101
102 let definitions: HashMap<JobId, String> = common_job_definitions
103 .into_iter()
104 .chain(sink_definitions.into_iter())
105 .chain(source_definitions.into_iter())
106 .collect();
107
108 Ok(definitions)
109}
110
111pub async fn load_fragment_info<C>(
112 txn: &C,
113 actor_id_counter: &AtomicU32,
114 database_id: Option<DatabaseId>,
115 worker_nodes: &ActiveStreamingWorkerNodes,
116 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
117) -> MetaResult<FragmentRenderMap>
118where
119 C: ConnectionTrait,
120{
121 let mut query = StreamingJob::find()
122 .select_only()
123 .column(streaming_job::Column::JobId);
124
125 if let Some(database_id) = database_id {
126 query = query
127 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
128 .filter(object::Column::DatabaseId.eq(database_id));
129 }
130
131 let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
132
133 if jobs.is_empty() {
134 return Ok(HashMap::new());
135 }
136
137 let jobs: HashSet<JobId> = jobs.into_iter().collect();
138
139 let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
140
141 if loaded.is_empty() {
142 return Ok(HashMap::new());
143 }
144
145 let RenderedGraph { fragments, .. } = render_actor_assignments(
146 actor_id_counter,
147 worker_nodes.current(),
148 adaptive_parallelism_strategy,
149 &loaded,
150 )?;
151
152 Ok(fragments)
153}
154
/// Desired placement policy for a streaming job.
#[derive(Debug)]
pub struct TargetResourcePolicy {
    /// Target resource group; `None` presumably falls back to the database's
    /// default group (see the resolution in `render_actors`) — confirm with callers.
    pub resource_group: Option<String>,
    /// Target parallelism setting.
    pub parallelism: StreamingParallelism,
}
160
/// Scheduling-relevant view of a single worker node.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Number of parallel units on the worker; guaranteed non-zero by the type.
    pub parallelism: NonZeroUsize,
    /// Resource group the worker belongs to, if any.
    pub resource_group: Option<String>,
}
166
/// Rendered fragment info, keyed database -> job -> fragment.
pub type FragmentRenderMap =
    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;

/// Output of actor-assignment rendering: the rendered fragments together with
/// the no-shuffle ensembles they were derived from.
#[derive(Default)]
pub struct RenderedGraph {
    pub fragments: FragmentRenderMap,
    pub ensembles: Vec<NoShuffleEnsemble>,
}
175
176impl RenderedGraph {
177 pub fn empty() -> Self {
178 Self::default()
179 }
180}
181
/// A fragment row loaded from the catalog, decoded into in-memory form.
#[derive(Clone, Debug)]
pub struct LoadedFragment {
    pub fragment_id: FragmentId,
    /// Streaming job owning this fragment.
    pub job_id: JobId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: DistributionType,
    /// Number of virtual nodes the fragment's data is distributed over.
    pub vnode_count: usize,
    /// Decoded stream plan of the fragment.
    pub nodes: PbStreamNode,
    /// State tables owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
    /// Per-fragment parallelism override; `None` means the job's setting applies
    /// (see how `render_actors` falls back to the job parallelism).
    pub parallelism: Option<StreamingParallelism>,
}
196
impl From<fragment::Model> for LoadedFragment {
    /// Decodes a raw catalog row: the stream node is converted from its stored
    /// form to protobuf and the state-table id list is collected into a set.
    fn from(model: fragment::Model) -> Self {
        Self {
            fragment_id: model.fragment_id,
            job_id: model.job_id,
            fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
            distribution_type: model.distribution_type,
            vnode_count: model.vnode_count as usize,
            nodes: model.stream_node.to_protobuf(),
            state_table_ids: model.state_table_ids.into_inner().into_iter().collect(),
            parallelism: model.parallelism,
        }
    }
}
211
/// Everything loaded from the catalog that actor rendering needs: the no-shuffle
/// ensembles plus lookup maps for jobs, databases, sources and splits.
#[derive(Default, Debug, Clone)]
pub struct LoadedFragmentContext {
    pub ensembles: Vec<NoShuffleEnsemble>,
    /// Fragments grouped by owning job.
    pub job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    pub job_map: HashMap<JobId, streaming_job::Model>,
    /// Database each job lives in.
    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
    pub database_map: HashMap<DatabaseId, database::Model>,
    /// Source id for each source / source-backfill fragment.
    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
    /// Persisted source splits per fragment.
    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
}
222
impl LoadedFragmentContext {
    /// Returns `true` when nothing was loaded.
    ///
    /// Invariant: empty ensembles imply empty job fragments (asserted here).
    pub fn is_empty(&self) -> bool {
        if self.ensembles.is_empty() {
            assert!(
                self.job_fragments.is_empty(),
                "non-empty job fragments for empty ensembles: {:?}",
                self.job_fragments
            );
            true
        } else {
            false
        }
    }

    /// Projects this context down to a single database, or `None` if the database
    /// owns none of the loaded jobs.
    ///
    /// Panics (via `assert!`) if an ensemble straddles the database boundary or
    /// the projection is otherwise internally inconsistent.
    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
        // Jobs belonging to the requested database.
        let job_ids: HashSet<JobId> = self
            .streaming_job_databases
            .iter()
            .filter_map(|(job_id, db_id)| (*db_id == database_id).then_some(*job_id))
            .collect();

        if job_ids.is_empty() {
            return None;
        }

        // Indexing is intentional: a job known to the database map must have fragments.
        let job_fragments: HashMap<_, _> = job_ids
            .iter()
            .map(|job_id| (*job_id, self.job_fragments[job_id].clone()))
            .collect();

        let fragment_ids: HashSet<_> = job_fragments
            .values()
            .flat_map(|fragments| fragments.keys().copied())
            .collect();

        assert!(
            !fragment_ids.is_empty(),
            "empty fragments for non-empty database {database_id} with jobs {job_ids:?}"
        );

        // Keep only ensembles fully contained in this database; partial overlap
        // would mean a no-shuffle ensemble crosses databases, which is a bug.
        let ensembles: Vec<NoShuffleEnsemble> = self
            .ensembles
            .iter()
            .filter(|ensemble| {
                if ensemble
                    .components
                    .iter()
                    .any(|fragment_id| fragment_ids.contains(fragment_id))
                {
                    assert!(
                        ensemble
                            .components
                            .iter()
                            .all(|fragment_id| fragment_ids.contains(fragment_id)),
                        "ensemble {ensemble:?} partially exists in database {database_id} with fragments {job_fragments:?}"
                    );
                    true
                } else {
                    false
                }
            })
            .cloned()
            .collect();

        assert!(
            !ensembles.is_empty(),
            "empty ensembles for non-empty database {database_id} with jobs {job_fragments:?}"
        );

        let job_map = job_ids
            .iter()
            .filter_map(|job_id| self.job_map.get(job_id).map(|job| (*job_id, job.clone())))
            .collect();

        let streaming_job_databases = job_ids
            .iter()
            .filter_map(|job_id| {
                self.streaming_job_databases
                    .get(job_id)
                    .map(|db_id| (*job_id, *db_id))
            })
            .collect();

        // Only the requested database remains in the projected map.
        let database_model = self.database_map[&database_id].clone();
        let database_map = HashMap::from([(database_id, database_model)]);

        let fragment_source_ids = self
            .fragment_source_ids
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, source_id)| (*fragment_id, *source_id))
            .collect();

        let fragment_splits = self
            .fragment_splits
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, splits)| (*fragment_id, splits.clone()))
            .collect();

        Some(Self {
            ensembles,
            job_fragments,
            job_map,
            streaming_job_databases,
            database_map,
            fragment_source_ids,
            fragment_splits,
        })
    }
}
334
335pub async fn render_fragments<C>(
340 txn: &C,
341 actor_id_counter: &AtomicU32,
342 ensembles: Vec<NoShuffleEnsemble>,
343 workers: &HashMap<WorkerId, WorkerNode>,
344 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
345) -> MetaResult<RenderedGraph>
346where
347 C: ConnectionTrait,
348{
349 let loaded = load_fragment_context(txn, ensembles).await?;
350
351 if loaded.is_empty() {
352 return Ok(RenderedGraph::empty());
353 }
354
355 render_actor_assignments(
356 actor_id_counter,
357 workers,
358 adaptive_parallelism_strategy,
359 &loaded,
360 )
361}
362
/// Loads the catalog context (fragments, jobs, and downstream pieces) required
/// to render the given no-shuffle ensembles.
///
/// Returns an error if any referenced fragment or streaming job is missing.
pub async fn load_fragment_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Every fragment referenced by any ensemble must exist in the catalog.
    let required_fragment_ids: HashSet<_> = ensembles
        .iter()
        .flat_map(|ensemble| ensemble.components.iter().copied())
        .collect();

    let fragment_models = Fragment::find()
        .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
        .all(txn)
        .await?;

    let found_fragment_ids: HashSet<_> = fragment_models
        .iter()
        .map(|fragment| fragment.fragment_id)
        .collect();

    if found_fragment_ids.len() != required_fragment_ids.len() {
        let missing = required_fragment_ids
            .difference(&found_fragment_ids)
            .copied()
            .collect_vec();
        return Err(anyhow!("fragments {:?} not found", missing).into());
    }

    let fragment_models: HashMap<_, _> = fragment_models
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    // Owning jobs of the loaded fragments; these must all exist as well.
    let job_ids: HashSet<_> = fragment_models
        .values()
        .map(|fragment| fragment.job_id)
        .collect();

    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
    if found_job_ids.len() != job_ids.len() {
        let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
        return Err(anyhow!("streaming jobs {:?} not found", missing).into());
    }

    build_loaded_context(txn, ensembles, fragment_models, jobs).await
}
429
430pub async fn render_jobs<C>(
433 txn: &C,
434 actor_id_counter: &AtomicU32,
435 job_ids: HashSet<JobId>,
436 workers: &HashMap<WorkerId, WorkerNode>,
437 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
438) -> MetaResult<RenderedGraph>
439where
440 C: ConnectionTrait,
441{
442 let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
443
444 if loaded.is_empty() {
445 return Ok(RenderedGraph::empty());
446 }
447
448 render_actor_assignments(
449 actor_id_counter,
450 workers,
451 adaptive_parallelism_strategy,
452 &loaded,
453 )
454}
455
/// Loads the fragment context for a set of streaming jobs.
///
/// Entry fragments are the jobs' fragments that are not the *target* of any
/// no-shuffle edge; the full ensembles are then discovered from those entries,
/// which may pull in fragments of other, no-shuffle-connected jobs.
pub async fn load_fragment_context_for_jobs<C>(
    txn: &C,
    job_ids: HashSet<JobId>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Fragments that are downstream of a no-shuffle edge are excluded here:
    // they will be reached through their ensemble instead.
    let excluded_fragments_query = FragmentRelation::find()
        .select_only()
        .column(fragment_relation::Column::TargetFragmentId)
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_query();

    let condition = Condition::all()
        .add(fragment::Column::JobId.is_in(job_ids.clone()))
        .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));

    let fragments: Vec<FragmentId> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .filter(condition)
        .into_tuple()
        .all(txn)
        .await?;

    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;

    // Fetch the complete rows for every fragment in every discovered ensemble.
    let fragments = Fragment::find()
        .filter(
            fragment::Column::FragmentId.is_in(
                ensembles
                    .iter()
                    .flat_map(|graph| graph.components.iter())
                    .cloned()
                    .collect_vec(),
            ),
        )
        .all(txn)
        .await?;

    let fragment_map: HashMap<_, _> = fragments
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    // Shadows the input set: ensembles may span additional jobs beyond those
    // requested. BTreeSet gives a deterministic query order.
    let job_ids = fragment_map
        .values()
        .map(|fragment| fragment.job_id)
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect_vec();

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    build_loaded_context(txn, ensembles, fragment_map, jobs).await
}
524
/// Renders actor assignments for an already-loaded fragment context.
///
/// Pure with respect to the catalog: everything needed comes from `loaded` and
/// the worker map; only the actor-id counter is mutated.
pub(crate) fn render_actor_assignments(
    actor_id_counter: &AtomicU32,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    loaded: &LoadedFragmentContext,
) -> MetaResult<RenderedGraph> {
    if loaded.is_empty() {
        return Ok(RenderedGraph::empty());
    }

    // Background jobs still creating are treated as backfilling; they may use a
    // dedicated backfill parallelism during rendering.
    let backfill_jobs: HashSet<JobId> = loaded
        .job_map
        .iter()
        .filter(|(_, job)| {
            job.create_type == CreateType::Background && job.job_status == JobStatus::Creating
        })
        .map(|(id, _)| *id)
        .collect();

    let render_context = RenderActorsContext {
        fragment_source_ids: &loaded.fragment_source_ids,
        fragment_splits: &loaded.fragment_splits,
        streaming_job_databases: &loaded.streaming_job_databases,
        database_map: &loaded.database_map,
        backfill_jobs: &backfill_jobs,
    };

    let fragments = render_actors(
        actor_id_counter,
        &loaded.ensembles,
        &loaded.job_fragments,
        &loaded.job_map,
        worker_map,
        adaptive_parallelism_strategy,
        render_context,
    )?;

    Ok(RenderedGraph {
        fragments,
        ensembles: loaded.ensembles.clone(),
    })
}
569
/// Assembles a `LoadedFragmentContext` from pre-fetched fragments and jobs,
/// loading the remaining pieces (source splits, job→database mapping, database
/// models) from `txn`.
async fn build_loaded_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
    fragment_models: HashMap<FragmentId, fragment::Model>,
    job_map: HashMap<JobId, streaming_job::Model>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Group fragments by owning job; a duplicate fragment id would be a bug.
    let mut job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>> = HashMap::new();
    for (fragment_id, model) in fragment_models {
        job_fragments
            .entry(model.job_id)
            .or_default()
            .try_insert(fragment_id, LoadedFragment::from(model))
            .expect("duplicate fragment id for job");
    }

    // Cross-check ensembles/fragments/jobs consistency in debug builds only.
    #[cfg(debug_assertions)]
    {
        debug_sanity_check(&ensembles, &job_fragments, &job_map);
    }

    let (fragment_source_ids, fragment_splits) =
        resolve_source_fragments(txn, &job_fragments).await?;

    let job_ids = job_map.keys().copied().collect_vec();

    // Resolve each job's database via the object catalog.
    let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .collect();

    let database_map: HashMap<_, _> = Database::find()
        .filter(
            database::Column::DatabaseId
                .is_in(streaming_job_databases.values().copied().collect_vec()),
        )
        .all(txn)
        .await?
        .into_iter()
        .map(|db| (db.database_id, db))
        .collect();

    Ok(LoadedFragmentContext {
        ensembles,
        job_fragments,
        job_map,
        streaming_job_databases,
        database_map,
        fragment_source_ids,
        fragment_splits,
    })
}
635
/// Borrowed side inputs for `render_actors`, bundled to keep its signature short.
struct RenderActorsContext<'a> {
    /// Source id for each source / source-backfill fragment.
    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
    /// Persisted source splits per fragment.
    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
    /// Database each job lives in.
    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
    database_map: &'a HashMap<DatabaseId, database::Model>,
    /// Jobs currently backfilling (background create, still creating).
    backfill_jobs: &'a HashSet<JobId>,
}
645
/// Renders concrete actors for every fragment of every ensemble.
///
/// For each ensemble the parallelism is decided once, from its entry fragments,
/// and the same actor/vnode assignment is applied to all component fragments so
/// that no-shuffle-connected fragments stay aligned.
fn render_actors(
    actor_id_counter: &AtomicU32,
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    let RenderActorsContext {
        fragment_source_ids,
        fragment_splits: fragment_splits_map,
        streaming_job_databases,
        database_map,
        backfill_jobs,
    } = context;

    let mut all_fragments: FragmentRenderMap = HashMap::new();
    // Flat fragment-id -> fragment index across all jobs.
    let fragment_lookup: HashMap<FragmentId, &LoadedFragment> = job_fragments
        .values()
        .flat_map(|fragments| fragments.iter())
        .map(|(fragment_id, fragment)| (*fragment_id, fragment))
        .collect();

    for NoShuffleEnsemble {
        entries,
        components,
    } in ensembles
    {
        tracing::debug!("rendering ensemble entries {:?}", entries);

        let entry_fragments = entries
            .iter()
            .map(|fragment_id| fragment_lookup.get(fragment_id).unwrap())
            .collect_vec();

        // All entry fragments must agree on their parallelism override.
        let entry_fragment_parallelism = entry_fragments
            .iter()
            .map(|fragment| fragment.parallelism.clone())
            .dedup()
            .exactly_one()
            .map_err(|_| {
                anyhow!(
                    "entry fragments {:?} have inconsistent parallelism settings",
                    entries.iter().copied().collect_vec()
                )
            })?;

        // All entry fragments must belong to one job and share one vnode count.
        let (job_id, vnode_count) = entry_fragments
            .iter()
            .map(|f| (f.job_id, f.vnode_count))
            .dedup()
            .exactly_one()
            .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;

        let job = job_map
            .get(&job_id)
            .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;

        // Job-level strategy overrides the system-wide default when present.
        let job_strategy = job
            .stream_context()
            .adaptive_parallelism_strategy
            .unwrap_or(adaptive_parallelism_strategy);

        // Job-specific resource group, falling back to the database's group.
        let resource_group = match &job.specific_resource_group {
            None => {
                let database = streaming_job_databases
                    .get(&job_id)
                    .and_then(|database_id| database_map.get(database_id))
                    .unwrap();
                database.resource_group.clone()
            }
            Some(resource_group) => resource_group.clone(),
        };

        // Only workers in the job's resource group may host its actors.
        let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
            .iter()
            .filter_map(|(worker_id, worker)| {
                if worker
                    .resource_group()
                    .as_deref()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP)
                    == resource_group.as_str()
                {
                    Some((
                        *worker_id,
                        worker
                            .parallelism()
                            .expect("should have parallelism for compute node")
                            .try_into()
                            .expect("parallelism for compute node"),
                    ))
                } else {
                    None
                }
            })
            .collect();

        let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();

        // Backfilling jobs may use a dedicated backfill parallelism.
        let effective_job_parallelism = if backfill_jobs.contains(&job_id) {
            job.backfill_parallelism
                .as_ref()
                .unwrap_or(&job.parallelism)
        } else {
            &job.parallelism
        };

        // Fragment override wins over the job setting; the result is capped by
        // the vnode count and the job's max parallelism.
        let actual_parallelism = match entry_fragment_parallelism
            .as_ref()
            .unwrap_or(effective_job_parallelism)
        {
            StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
                job_strategy.compute_target_parallelism(total_parallelism)
            }
            StreamingParallelism::Fixed(n) => *n,
        }
        .min(vnode_count)
        .min(job.max_parallelism as usize);

        tracing::debug!(
            "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
            job_id,
            actual_parallelism,
            job.parallelism,
            total_parallelism,
            job.max_parallelism,
            vnode_count,
            entry_fragment_parallelism
        );

        let assigner = AssignerBuilder::new(job_id).build();

        // Logical actor indices 0..parallelism; real ids are allocated below.
        let actors = (0..(actual_parallelism as u32))
            .map_into::<ActorId>()
            .collect_vec();
        let vnodes = (0..vnode_count).collect_vec();

        let assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;

        // A non-DML entry fragment with the Source flag drives split assignment
        // for the whole ensemble. Source and SourceScan are mutually exclusive.
        let source_entry_fragment = entry_fragments.iter().find(|f| {
            let mask = f.fragment_type_mask;
            if mask.contains(FragmentTypeFlag::Source) {
                assert!(!mask.contains(FragmentTypeFlag::SourceScan))
            }
            mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
        });

        let (fragment_splits, shared_source_id) = match source_entry_fragment {
            Some(entry_fragment) => {
                let source_id = fragment_source_ids
                    .get(&entry_fragment.fragment_id)
                    .ok_or_else(|| {
                        anyhow!(
                            "missing source id in source fragment {}",
                            entry_fragment.fragment_id
                        )
                    })?;

                let entry_fragment_id = entry_fragment.fragment_id;

                // Start from empty per-actor assignments and redistribute the
                // persisted splits across the new actors.
                let empty_actor_splits: HashMap<_, _> =
                    actors.iter().map(|actor_id| (*actor_id, vec![])).collect();

                let splits = fragment_splits_map
                    .get(&entry_fragment_id)
                    .cloned()
                    .unwrap_or_default();

                let splits: BTreeMap<_, _> = splits.into_iter().map(|s| (s.id(), s)).collect();

                let fragment_splits = crate::stream::source_manager::reassign_splits(
                    entry_fragment_id,
                    empty_actor_splits,
                    &splits,
                    SplitDiffOptions::default(),
                )
                .unwrap_or_default();
                (fragment_splits, Some(*source_id))
            }
            None => (HashMap::new(), None),
        };

        // Materialize actors for every fragment in the component, reusing the
        // single assignment computed above.
        for component_fragment_id in components {
            let fragment = fragment_lookup.get(component_fragment_id).unwrap();
            let fragment_id = fragment.fragment_id;
            // Shadows the ensemble-level job_id: component fragments may belong
            // to different jobs than the entry fragments.
            let job_id = fragment.job_id;
            let fragment_type_mask = fragment.fragment_type_mask;
            let distribution_type = fragment.distribution_type;
            let stream_node = &fragment.nodes;
            let state_table_ids = &fragment.state_table_ids;
            let vnode_count = fragment.vnode_count;

            // Reserve a contiguous id range for this fragment's actors.
            let actor_count =
                u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");
            let actor_id_base = actor_id_counter.fetch_add(actor_count, Ordering::Relaxed);

            let actors: HashMap<ActorId, InflightActorInfo> = assignment
                .iter()
                .flat_map(|(worker_id, actors)| {
                    actors
                        .iter()
                        .map(move |(actor_id, vnodes)| (worker_id, actor_id, vnodes))
                })
                .map(|(&worker_id, &actor_idx, vnodes)| {
                    // Single-distribution fragments carry no vnode bitmap.
                    let vnode_bitmap = match distribution_type {
                        DistributionType::Single => None,
                        DistributionType::Hash => Some(Bitmap::from_indices(vnode_count, vnodes)),
                    };

                    let actor_id = actor_idx + actor_id_base;

                    // Source fragments receive the splits assigned to their
                    // logical actor index; they must share the ensemble source.
                    let splits = if let Some(source_id) = fragment_source_ids.get(&fragment_id) {
                        assert_eq!(shared_source_id, Some(*source_id));

                        fragment_splits
                            .get(&(actor_idx))
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        vec![]
                    };

                    (
                        actor_id,
                        InflightActorInfo {
                            worker_id,
                            vnode_bitmap,
                            splits,
                        },
                    )
                })
                .collect();

            let fragment = InflightFragmentInfo {
                fragment_id,
                distribution_type,
                fragment_type_mask,
                vnode_count,
                nodes: stream_node.clone(),
                actors,
                state_table_ids: state_table_ids.clone(),
            };

            let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
                anyhow!("streaming job {job_id} not found in streaming_job_databases")
            })?;

            all_fragments
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default()
                .insert(fragment_id, fragment);
        }
    }

    Ok(all_fragments)
}
905
/// Debug-only consistency checks between ensembles, fragments and jobs:
/// entries ⊆ components, every component fragment is loaded, every fragment's
/// job is known, and each ensemble shares a single vnode count.
#[cfg(debug_assertions)]
fn debug_sanity_check(
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    jobs: &HashMap<JobId, streaming_job::Model>,
) {
    let fragment_lookup: HashMap<FragmentId, (&LoadedFragment, JobId)> = job_fragments
        .iter()
        .flat_map(|(job_id, fragments)| {
            fragments
                .iter()
                .map(move |(fragment_id, fragment)| (*fragment_id, (fragment, *job_id)))
        })
        .collect();

    debug_assert!(
        ensembles
            .iter()
            .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
        "entries must be subset of components"
    );

    // Collect all violations before asserting so the panic message is complete.
    let mut missing_fragments = BTreeSet::new();
    let mut missing_jobs = BTreeSet::new();

    for fragment_id in ensembles
        .iter()
        .flat_map(|ensemble| ensemble.components.iter())
    {
        match fragment_lookup.get(fragment_id) {
            Some((fragment, job_id)) => {
                if !jobs.contains_key(&fragment.job_id) {
                    missing_jobs.insert(*job_id);
                }
            }
            None => {
                missing_fragments.insert(*fragment_id);
            }
        }
    }

    debug_assert!(
        missing_fragments.is_empty(),
        "missing fragments in fragment_map: {:?}",
        missing_fragments
    );

    debug_assert!(
        missing_jobs.is_empty(),
        "missing jobs for fragments' job_id: {:?}",
        missing_jobs
    );

    // No-shuffle-connected fragments must agree on vnode count, since they
    // share one vnode assignment.
    for ensemble in ensembles {
        let unique_vnode_counts: Vec<_> = ensemble
            .components
            .iter()
            .flat_map(|fragment_id| {
                fragment_lookup
                    .get(fragment_id)
                    .map(|(fragment, _)| fragment.vnode_count)
            })
            .unique()
            .collect();

        debug_assert!(
            unique_vnode_counts.len() <= 1,
            "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
            ensemble.components,
            unique_vnode_counts
        );
    }
}
980
/// Finds all source-related fragments and loads their persisted splits.
///
/// Returns (fragment -> source id, fragment -> splits). Splits that fail to
/// decode are silently skipped (`flat_map` over `try_from`).
async fn resolve_source_fragments<C>(
    txn: &C,
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
) -> MetaResult<(
    HashMap<FragmentId, SourceId>,
    HashMap<FragmentId, Vec<SplitImpl>>,
)>
where
    C: ConnectionTrait,
{
    // Group fragments by the source they read: plain source fragments and
    // source-backfill (SourceScan) fragments are both included.
    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
    for (fragment_id, fragment) in job_fragments.values().flatten() {
        let mask = fragment.fragment_type_mask;
        if mask.contains(FragmentTypeFlag::Source)
            && let Some(source_id) = fragment.nodes.find_stream_source()
        {
            source_fragment_ids
                .entry(source_id)
                .or_default()
                .insert(*fragment_id);
        }

        if mask.contains(FragmentTypeFlag::SourceScan)
            && let Some((source_id, _)) = fragment.nodes.find_source_backfill()
        {
            source_fragment_ids
                .entry(source_id)
                .or_default()
                .insert(*fragment_id);
        }
    }

    // Invert to fragment -> source id.
    let fragment_source_ids: HashMap<_, _> = source_fragment_ids
        .iter()
        .flat_map(|(source_id, fragment_ids)| {
            fragment_ids
                .iter()
                .map(|fragment_id| (*fragment_id, *source_id))
        })
        .collect();

    let fragment_ids = fragment_source_ids.keys().copied().collect_vec();

    let fragment_splits: Vec<_> = FragmentSplits::find()
        .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
        .all(txn)
        .await?;

    // Rows without stored splits are dropped by the outer `flat_map`.
    let fragment_splits: HashMap<_, _> = fragment_splits
        .into_iter()
        .flat_map(|model| {
            model.splits.map(|splits| {
                (
                    model.fragment_id,
                    splits
                        .to_protobuf()
                        .splits
                        .iter()
                        .flat_map(SplitImpl::try_from)
                        .collect_vec(),
                )
            })
        })
        .collect();

    Ok((fragment_source_ids, fragment_splits))
}
1048
/// Borrowed view of a rendered actor graph: per-fragment actors plus the worker
/// each actor is placed on.
#[derive(Debug)]
pub struct ActorGraph<'a> {
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    pub locations: &'a HashMap<ActorId, WorkerId>,
}
1055
/// A connected component of fragments linked by no-shuffle dispatcher edges.
///
/// `entries` are the roots (no upstream within the component) and are always a
/// subset of `components` (checked in `debug_sanity_check`).
#[derive(Debug, Clone)]
pub struct NoShuffleEnsemble {
    entries: HashSet<FragmentId>,
    components: HashSet<FragmentId>,
}
1061
1062impl NoShuffleEnsemble {
1063 #[cfg(test)]
1064 pub fn for_test(
1065 entries: impl IntoIterator<Item = FragmentId>,
1066 components: impl IntoIterator<Item = FragmentId>,
1067 ) -> Self {
1068 let entries = entries.into_iter().collect();
1069 let components = components.into_iter().collect();
1070 Self {
1071 entries,
1072 components,
1073 }
1074 }
1075
1076 pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1077 self.components.iter().cloned()
1078 }
1079
1080 pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1081 self.entries.iter().copied()
1082 }
1083
1084 pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1085 self.components.iter().copied()
1086 }
1087
1088 pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
1089 self.entries.contains(fragment_id)
1090 }
1091}
1092
/// Discovers the no-shuffle connected components ("ensembles") reachable from
/// `initial_fragment_ids`, loading the full no-shuffle relation table first.
pub async fn find_fragment_no_shuffle_dags_detailed(
    db: &impl ConnectionTrait,
    initial_fragment_ids: &[FragmentId],
) -> MetaResult<Vec<NoShuffleEnsemble>> {
    // All (source, target) fragment pairs connected by a no-shuffle dispatcher.
    let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
        .columns([
            fragment_relation::Column::SourceFragmentId,
            fragment_relation::Column::TargetFragmentId,
        ])
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_tuple()
        .all(db)
        .await?;

    // Adjacency lists in both directions for the undirected component walk.
    let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
    let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();

    for (src, dst) in all_no_shuffle_relations {
        forward_edges.entry(src).or_default().push(dst);
        backward_edges.entry(dst).or_default().push(src);
    }

    find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
}
1117
1118fn find_no_shuffle_graphs(
1119 initial_fragment_ids: &[impl Into<FragmentId> + Copy],
1120 forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1121 backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1122) -> MetaResult<Vec<NoShuffleEnsemble>> {
1123 let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
1124 let mut globally_visited: HashSet<FragmentId> = HashSet::new();
1125
1126 for &init_id in initial_fragment_ids {
1127 let init_id = init_id.into();
1128 if globally_visited.contains(&init_id) {
1129 continue;
1130 }
1131
1132 let mut components = HashSet::new();
1134 let mut queue: VecDeque<FragmentId> = VecDeque::new();
1135
1136 queue.push_back(init_id);
1137 globally_visited.insert(init_id);
1138
1139 while let Some(current_id) = queue.pop_front() {
1140 components.insert(current_id);
1141 let neighbors = forward_edges
1142 .get(¤t_id)
1143 .into_iter()
1144 .flatten()
1145 .chain(backward_edges.get(¤t_id).into_iter().flatten());
1146
1147 for &neighbor_id in neighbors {
1148 if globally_visited.insert(neighbor_id) {
1149 queue.push_back(neighbor_id);
1150 }
1151 }
1152 }
1153
1154 let mut entries = HashSet::new();
1156 for &node_id in &components {
1157 let is_root = match backward_edges.get(&node_id) {
1158 Some(parents) => parents.iter().all(|p| !components.contains(p)),
1159 None => true,
1160 };
1161 if is_root {
1162 entries.insert(node_id);
1163 }
1164 }
1165
1166 if !entries.is_empty() {
1168 graphs.push(NoShuffleEnsemble {
1169 entries,
1170 components,
1171 });
1172 }
1173 }
1174
1175 Ok(graphs)
1176}
1177
1178#[cfg(test)]
1179mod tests {
1180 use std::collections::{BTreeSet, HashMap, HashSet};
1181 use std::sync::Arc;
1182
1183 use risingwave_connector::source::SplitImpl;
1184 use risingwave_connector::source::test_source::TestSourceSplit;
1185 use risingwave_meta_model::{CreateType, JobStatus};
1186 use risingwave_pb::common::WorkerType;
1187 use risingwave_pb::common::worker_node::Property as WorkerProperty;
1188 use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1189
1190 use super::*;
1191
    /// `(forward_edges, backward_edges)` adjacency maps, in the shape
    /// consumed by `find_no_shuffle_graphs`.
    type Edges = (
        HashMap<FragmentId, Vec<FragmentId>>,
        HashMap<FragmentId, Vec<FragmentId>>,
    );
1198
1199 fn build_edges(relations: &[(u32, u32)]) -> Edges {
1202 let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1203 let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1204 for &(src, dst) in relations {
1205 forward_edges
1206 .entry(src.into())
1207 .or_default()
1208 .push(dst.into());
1209 backward_edges
1210 .entry(dst.into())
1211 .or_default()
1212 .push(src.into());
1213 }
1214 (forward_edges, backward_edges)
1215 }
1216
1217 fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1219 ids.iter().map(|id| (*id).into()).collect()
1220 }
1221
    /// Convenience constructor for a [`LoadedFragment`] with a default stream
    /// node, no state tables, and an explicit per-fragment parallelism.
    fn build_fragment(
        fragment_id: FragmentId,
        job_id: JobId,
        fragment_type_mask: i32,
        distribution_type: DistributionType,
        vnode_count: i32,
        parallelism: StreamingParallelism,
    ) -> LoadedFragment {
        LoadedFragment {
            fragment_id,
            job_id,
            fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
            distribution_type,
            vnode_count: vnode_count as usize,
            nodes: PbStreamNode::default(),
            state_table_ids: HashSet::new(),
            parallelism: Some(parallelism),
        }
    }
1241
    // (normalized actor index, worker, vnode indices — None when no bitmap —, split ids).
    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1243
1244 fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1245 let base = fragment.actors.keys().copied().min().unwrap_or_default();
1246
1247 let mut entries: Vec<_> = fragment
1248 .actors
1249 .iter()
1250 .map(|(&actor_id, info)| {
1251 let idx = actor_id.as_raw_id() - base.as_raw_id();
1252 let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1253 bitmap
1254 .iter()
1255 .enumerate()
1256 .filter_map(|(pos, is_set)| is_set.then_some(pos))
1257 .collect::<Vec<_>>()
1258 });
1259 let splits = info
1260 .splits
1261 .iter()
1262 .map(|split| split.id().to_string())
1263 .collect::<Vec<_>>();
1264 (idx.into(), info.worker_id, vnode_indices, splits)
1265 })
1266 .collect();
1267
1268 entries.sort_by_key(|(idx, _, _, _)| *idx);
1269 entries
1270 }
1271
1272 fn build_worker_node(
1273 id: impl Into<WorkerId>,
1274 parallelism: usize,
1275 resource_group: &str,
1276 ) -> WorkerNode {
1277 WorkerNode {
1278 id: id.into(),
1279 r#type: WorkerType::ComputeNode as i32,
1280 property: Some(WorkerProperty {
1281 is_streaming: true,
1282 parallelism: u32::try_from(parallelism).expect("parallelism fits into u32"),
1283 resource_group: Some(resource_group.to_owned()),
1284 ..Default::default()
1285 }),
1286 ..Default::default()
1287 }
1288 }
1289
1290 #[test]
1291 fn test_single_linear_chain() {
1292 let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1295 let initial_ids = &[2];
1296
1297 let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1299
1300 assert!(result.is_ok());
1302 let graphs = result.unwrap();
1303
1304 assert_eq!(graphs.len(), 1);
1305 let graph = &graphs[0];
1306 assert_eq!(graph.entries, to_hashset(&[1]));
1307 assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1308 }
1309
1310 #[test]
1311 fn test_two_disconnected_graphs() {
1312 let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1315 let initial_ids = &[2, 10];
1316
1317 let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1319
1320 assert_eq!(graphs.len(), 2);
1322
1323 graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1325
1326 assert_eq!(graphs[0].entries, to_hashset(&[1]));
1328 assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1329
1330 assert_eq!(graphs[1].entries, to_hashset(&[10]));
1332 assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1333 }
1334
1335 #[test]
1336 fn test_multiple_entries_in_one_graph() {
1337 let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1339 let initial_ids = &[3];
1340
1341 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1343
1344 assert_eq!(graphs.len(), 1);
1346 let graph = &graphs[0];
1347 assert_eq!(graph.entries, to_hashset(&[1, 2]));
1348 assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1349 }
1350
1351 #[test]
1352 fn test_diamond_shape_graph() {
1353 let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1355 let initial_ids = &[4];
1356
1357 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1359
1360 assert_eq!(graphs.len(), 1);
1362 let graph = &graphs[0];
1363 assert_eq!(graph.entries, to_hashset(&[1]));
1364 assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1365 }
1366
1367 #[test]
1368 fn test_starting_with_multiple_nodes_in_same_graph() {
1369 let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1372 let initial_ids = &[2, 4];
1373
1374 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1376
1377 assert_eq!(graphs.len(), 1);
1379 let graph = &graphs[0];
1380 assert_eq!(graph.entries, to_hashset(&[1]));
1381 assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1382 }
1383
1384 #[test]
1385 fn test_empty_initial_ids() {
1386 let (forward, backward) = build_edges(&[(1, 2)]);
1388 let initial_ids: &[u32] = &[];
1389
1390 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1392
1393 assert!(graphs.is_empty());
1395 }
1396
1397 #[test]
1398 fn test_isolated_node_as_input() {
1399 let (forward, backward) = build_edges(&[(1, 2)]);
1401 let initial_ids = &[100];
1402
1403 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1405
1406 assert_eq!(graphs.len(), 1);
1408 let graph = &graphs[0];
1409 assert_eq!(graph.entries, to_hashset(&[100]));
1410 assert_eq!(graph.components, to_hashset(&[100]));
1411 }
1412
1413 #[test]
1414 fn test_graph_with_a_cycle() {
1415 let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1420 let initial_ids = &[2];
1421
1422 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1424
1425 assert!(
1427 graphs.is_empty(),
1428 "A graph with no entries should not be returned"
1429 );
1430 }
1431 #[test]
1432 fn test_custom_complex() {
1433 let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1434 let initial_ids = &[1, 2, 4, 6];
1435
1436 let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1438
1439 assert_eq!(graphs.len(), 2);
1441 graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1443
1444 assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1446 assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1447
1448 assert_eq!(graphs[1].entries, to_hashset(&[6]));
1450 assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1451 }
1452
    /// A single-distribution fragment renders exactly one actor, consumes one
    /// id from the shared counter, and gets no vnode bitmap.
    #[test]
    fn render_actors_increments_actor_counter() {
        // Counter starts at 100 so the post-render value proves exactly one
        // id was allocated.
        let actor_id_counter = AtomicU32::new(100);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 10.into();
        let database_id: DatabaseId = DatabaseId::new(3);

        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Single,
            1,
            StreamingParallelism::Fixed(1),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(1),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 1,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "rg-a".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // One trivial ensemble: the fragment is its own entry and component.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Worker resource group matches the database's, so the actor can land.
        let worker_map: HashMap<WorkerId, WorkerNode> =
            HashMap::from([(1.into(), build_worker_node(1, 1, "rg-a"))]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(state.len(), 1);
        assert!(
            state[0].2.is_none(),
            "single distribution should not assign vnode bitmaps"
        );
        // 100 -> 101: exactly one actor id consumed.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
    }
1537
    /// Two hash-distributed fragments in the same no-shuffle ensemble must
    /// render identical actor placements and vnode bitmaps, with the bitmaps
    /// jointly covering every vnode.
    #[test]
    fn render_actors_aligns_hash_vnode_bitmaps() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 1.into();
        let downstream_fragment_id: FragmentId = 2.into();
        let job_id: JobId = 20.into();
        let database_id: DatabaseId = DatabaseId::new(5);

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db_hash".into(),
            resource_group: "rg-hash".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // Both fragments share one ensemble, with the entry driving layout.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-hash")),
            (2.into(), build_worker_node(2, 1, "rg-hash")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // No-shuffle pairs must line up actor-for-actor (worker + bitmap).
        assert_eq!(entry_state.len(), 2);
        assert_eq!(entry_state, downstream_state);

        // The union of per-actor bitmaps must cover all 4 vnodes.
        let assigned_vnodes: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, vnodes, _)| {
                vnodes
                    .as_ref()
                    .expect("hash distribution should populate vnode bitmap")
                    .iter()
                    .copied()
            })
            .collect();
        assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
        // 2 actors per fragment x 2 fragments = 4 ids consumed.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
1649
    /// Splits assigned to a Source fragment must be mirrored onto the aligned
    /// actors of its SourceScan (backfill) fragment in the same ensemble.
    #[test]
    fn render_actors_propagates_source_splits() {
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 11.into();
        let downstream_fragment_id: FragmentId = 12.into();
        let job_id: JobId = 30.into();
        let database_id: DatabaseId = DatabaseId::new(7);
        let source_id: SourceId = 99.into();

        // Type masks mark the entry as Source and the downstream as SourceScan.
        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            source_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            source_scan_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "split_db".into(),
            resource_group: "rg-source".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-source")),
            (2.into(), build_worker_node(2, 1, "rg-source")),
        ]);

        let split_a = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-a"),
            properties: HashMap::new(),
            offset: "0".into(),
        });
        let split_b = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-b"),
            properties: HashMap::new(),
            offset: "0".into(),
        });

        // Both fragments map to the same source; only the entry fragment has
        // splits recorded, so the downstream must inherit them.
        let fragment_source_ids = HashMap::from([
            (entry_fragment_id, source_id),
            (downstream_fragment_id, source_id),
        ]);
        let fragment_splits =
            HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // Identical placements AND identical per-actor split assignments.
        assert_eq!(entry_state, downstream_state);

        // Every split must be assigned to some actor exactly as provided.
        let split_ids: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, _, splits)| splits.iter().cloned())
            .collect();
        assert_eq!(
            split_ids,
            BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
        );
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
1776
    /// A per-job adaptive strategy ("BOUNDED(2)") must take precedence over
    /// the globally configured strategy (FULL would use all 4 workers).
    #[test]
    fn render_actors_job_strategy_overrides_global() {
        let actor_id_counter = AtomicU32::new(0);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 100.into();
        let database_id: DatabaseId = DatabaseId::new(10);

        // Adaptive parallelism so the strategy decides the actor count.
        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            8,
            StreamingParallelism::Adaptive,
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
            parallelism: StreamingParallelism::Adaptive,
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 8,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map =
            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Four single-slot workers: FULL would yield 4 actors.
        let worker_map = HashMap::from([
            (1.into(), build_worker_node(1, 1, "default")),
            (2.into(), build_worker_node(2, 1, "default")),
            (3.into(), build_worker_node(3, 1, "default")),
            (4.into(), build_worker_node(4, 1, "default")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Full,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(
            state.len(),
            2,
            "Job strategy BOUNDED(2) should override global FULL"
        );
    }
1870
1871 #[test]
1873 fn render_actors_uses_global_strategy_when_job_has_none() {
1874 let actor_id_counter = AtomicU32::new(0);
1875 let fragment_id: FragmentId = 1.into();
1876 let job_id: JobId = 101.into();
1877 let database_id: DatabaseId = DatabaseId::new(11);
1878
1879 let fragment_model = build_fragment(
1880 fragment_id,
1881 job_id,
1882 0,
1883 DistributionType::Hash,
1884 8,
1885 StreamingParallelism::Adaptive,
1886 );
1887
1888 let job_model = streaming_job::Model {
1890 job_id,
1891 job_status: JobStatus::Created,
1892 create_type: CreateType::Foreground,
1893 timezone: None,
1894 config_override: None,
1895 adaptive_parallelism_strategy: None, parallelism: StreamingParallelism::Adaptive,
1897 backfill_parallelism: None,
1898 backfill_orders: None,
1899 max_parallelism: 8,
1900 specific_resource_group: None,
1901 is_serverless_backfill: false,
1902 };
1903
1904 let database_model = database::Model {
1905 database_id,
1906 name: "test_db".into(),
1907 resource_group: "default".into(),
1908 barrier_interval_ms: None,
1909 checkpoint_frequency: None,
1910 };
1911
1912 let ensembles = vec![NoShuffleEnsemble {
1913 entries: HashSet::from([fragment_id]),
1914 components: HashSet::from([fragment_id]),
1915 }];
1916
1917 let fragment_map =
1918 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
1919 let job_map = HashMap::from([(job_id, job_model)]);
1920
1921 let worker_map = HashMap::from([
1923 (1.into(), build_worker_node(1, 1, "default")),
1924 (2.into(), build_worker_node(2, 1, "default")),
1925 (3.into(), build_worker_node(3, 1, "default")),
1926 (4.into(), build_worker_node(4, 1, "default")),
1927 ]);
1928
1929 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
1930 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
1931 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
1932 let database_map = HashMap::from([(database_id, database_model)]);
1933 let backfill_jobs = HashSet::new();
1934
1935 let context = RenderActorsContext {
1936 fragment_source_ids: &fragment_source_ids,
1937 fragment_splits: &fragment_splits,
1938 streaming_job_databases: &streaming_job_databases,
1939 database_map: &database_map,
1940 backfill_jobs: &backfill_jobs,
1941 };
1942
1943 let result = render_actors(
1945 &actor_id_counter,
1946 &ensembles,
1947 &fragment_map,
1948 &job_map,
1949 &worker_map,
1950 AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(3).unwrap()),
1951 context,
1952 )
1953 .expect("actor rendering succeeds");
1954
1955 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
1956 assert_eq!(
1958 state.len(),
1959 3,
1960 "Should use global strategy BOUNDED(3) when job has no custom strategy"
1961 );
1962 }
1963
    /// FIXED(5) parallelism must be honored verbatim: neither the job's
    /// BOUNDED(2) nor the global FULL strategy may alter the actor count.
    #[test]
    fn render_actors_fixed_parallelism_ignores_strategy() {
        let actor_id_counter = AtomicU32::new(0);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 102.into();
        let database_id: DatabaseId = DatabaseId::new(12);

        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            8,
            StreamingParallelism::Fixed(5),
        );

        // Deliberately sets a job strategy that, if wrongly applied, would
        // shrink the actor count to 2.
        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
            parallelism: StreamingParallelism::Fixed(5),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 8,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map =
            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Six single-slot workers so FIXED(5) can be fully satisfied.
        let worker_map = HashMap::from([
            (1.into(), build_worker_node(1, 1, "default")),
            (2.into(), build_worker_node(2, 1, "default")),
            (3.into(), build_worker_node(3, 1, "default")),
            (4.into(), build_worker_node(4, 1, "default")),
            (5.into(), build_worker_node(5, 1, "default")),
            (6.into(), build_worker_node(6, 1, "default")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Full,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(
            state.len(),
            5,
            "Fixed parallelism should ignore all strategies"
        );
    }
2058
    /// The job-level "RATIO(0.5)" strategy scales the available parallelism:
    /// half of 8 single-slot workers yields 4 actors.
    #[test]
    fn render_actors_ratio_strategy() {
        let actor_id_counter = AtomicU32::new(0);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 103.into();
        let database_id: DatabaseId = DatabaseId::new(13);

        // Adaptive parallelism so the ratio strategy decides the actor count.
        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            16,
            StreamingParallelism::Adaptive,
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
            parallelism: StreamingParallelism::Adaptive,
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 16,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "default".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map =
            HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Eight single-slot workers: total available parallelism is 8.
        let worker_map = HashMap::from([
            (1.into(), build_worker_node(1, 1, "default")),
            (2.into(), build_worker_node(2, 1, "default")),
            (3.into(), build_worker_node(3, 1, "default")),
            (4.into(), build_worker_node(4, 1, "default")),
            (5.into(), build_worker_node(5, 1, "default")),
            (6.into(), build_worker_node(6, 1, "default")),
            (7.into(), build_worker_node(7, 1, "default")),
            (8.into(), build_worker_node(8, 1, "default")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);
        let backfill_jobs = HashSet::new();

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
            backfill_jobs: &backfill_jobs,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &fragment_map,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Full,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(
            state.len(),
            4,
            "RATIO(0.5) of 8 workers should give 4 actors"
        );
    }
2154}