1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::num::NonZeroUsize;
17use std::sync::atomic::{AtomicU32, Ordering};
18
19use anyhow::anyhow;
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
23#[cfg(debug_assertions)]
24use risingwave_common::hash::VnodeBitmapExt;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
28use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
29use risingwave_meta_model::fragment::DistributionType;
30use risingwave_meta_model::prelude::{
31 Database, Fragment, FragmentRelation, FragmentSplits, Sink, Source, StreamingJob, Table,
32};
33use risingwave_meta_model::{
34 DatabaseId, DispatcherType, FragmentId, JobStatus, SourceId, StreamingParallelism, WorkerId,
35 database, fragment, fragment_relation, fragment_splits, object, sink, source, streaming_job,
36 table,
37};
38use risingwave_meta_model_migration::Condition;
39use risingwave_pb::common::WorkerNode;
40use risingwave_pb::stream_plan::PbStreamNode;
41use sea_orm::{
42 ColumnTrait, ConnectionTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, QueryTrait,
43 RelationTrait,
44};
45
46use crate::MetaResult;
47use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
48use crate::controller::utils::resolve_no_shuffle_actor_mapping;
49use crate::manager::ActiveStreamingWorkerNodes;
50use crate::model::{ActorId, StreamActor, StreamingJobModelContextExt};
51use crate::stream::{AssignerBuilder, SplitDiffOptions};
52
53pub(crate) async fn resolve_streaming_job_definition<C>(
54 txn: &C,
55 job_ids: &HashSet<JobId>,
56) -> MetaResult<HashMap<JobId, String>>
57where
58 C: ConnectionTrait,
59{
60 let job_ids = job_ids.iter().cloned().collect_vec();
61
62 let common_job_definitions: Vec<(JobId, String)> = Table::find()
64 .select_only()
65 .columns([
66 table::Column::TableId,
67 #[cfg(not(debug_assertions))]
68 table::Column::Name,
69 #[cfg(debug_assertions)]
70 table::Column::Definition,
71 ])
72 .filter(table::Column::TableId.is_in(job_ids.clone()))
73 .into_tuple()
74 .all(txn)
75 .await?;
76
77 let sink_definitions: Vec<(JobId, String)> = Sink::find()
78 .select_only()
79 .columns([
80 sink::Column::SinkId,
81 #[cfg(not(debug_assertions))]
82 sink::Column::Name,
83 #[cfg(debug_assertions)]
84 sink::Column::Definition,
85 ])
86 .filter(sink::Column::SinkId.is_in(job_ids.clone()))
87 .into_tuple()
88 .all(txn)
89 .await?;
90
91 let source_definitions: Vec<(JobId, String)> = Source::find()
92 .select_only()
93 .columns([
94 source::Column::SourceId,
95 #[cfg(not(debug_assertions))]
96 source::Column::Name,
97 #[cfg(debug_assertions)]
98 source::Column::Definition,
99 ])
100 .filter(source::Column::SourceId.is_in(job_ids.clone()))
101 .into_tuple()
102 .all(txn)
103 .await?;
104
105 let definitions: HashMap<JobId, String> = common_job_definitions
106 .into_iter()
107 .chain(sink_definitions.into_iter())
108 .chain(source_definitions.into_iter())
109 .collect();
110
111 Ok(definitions)
112}
113
114pub async fn load_fragment_info<C>(
115 txn: &C,
116 actor_id_counter: &AtomicU32,
117 database_id: Option<DatabaseId>,
118 worker_nodes: &ActiveStreamingWorkerNodes,
119 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
120) -> MetaResult<FragmentRenderMap>
121where
122 C: ConnectionTrait,
123{
124 let mut query = StreamingJob::find()
125 .select_only()
126 .column(streaming_job::Column::JobId);
127
128 if let Some(database_id) = database_id {
129 query = query
130 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
131 .filter(object::Column::DatabaseId.eq(database_id));
132 }
133
134 let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
135
136 if jobs.is_empty() {
137 return Ok(HashMap::new());
138 }
139
140 let jobs: HashSet<JobId> = jobs.into_iter().collect();
141
142 let loaded = load_fragment_context_for_jobs(txn, jobs).await?;
143
144 if loaded.is_empty() {
145 return Ok(HashMap::new());
146 }
147
148 let RenderedGraph { fragments, .. } = render_actor_assignments(
149 actor_id_counter,
150 worker_nodes.current(),
151 adaptive_parallelism_strategy,
152 &loaded,
153 )?;
154
155 Ok(fragments)
156}
157
/// Desired placement for a streaming job: which resource group it should run
/// in and at what parallelism.
#[derive(Debug)]
pub struct TargetResourcePolicy {
    /// Target resource group; `None` presumably means "fall back to the
    /// database/default group" — confirm against callers.
    pub resource_group: Option<String>,
    /// Target parallelism setting.
    pub parallelism: StreamingParallelism,
}
163
/// Capacity description of a single worker node.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Number of parallel slots the worker offers (always at least one).
    pub parallelism: NonZeroUsize,
    /// Resource group the worker belongs to; `None` presumably maps to the
    /// default group — confirm against callers.
    pub resource_group: Option<String>,
}
169
/// Rendered fragment info grouped as database -> job -> fragment.
pub type FragmentRenderMap =
    HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>;
172
/// Output of actor-assignment rendering: the rendered fragments plus the
/// no-shuffle ensembles they were derived from.
#[derive(Default)]
pub struct RenderedGraph {
    /// Rendered fragments, grouped by database and job.
    pub fragments: FragmentRenderMap,
    /// The ensembles that produced `fragments`.
    pub ensembles: Vec<NoShuffleEnsemble>,
}
178
179impl RenderedGraph {
180 pub fn empty() -> Self {
181 Self::default()
182 }
183}
184
/// In-memory representation of a fragment row loaded from the catalog,
/// with the stream-node plan already decoded to protobuf.
#[derive(Clone, Debug)]
pub struct LoadedFragment {
    pub fragment_id: FragmentId,
    /// Streaming job this fragment belongs to.
    pub job_id: JobId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: DistributionType,
    /// Number of virtual nodes the fragment distributes over.
    pub vnode_count: usize,
    /// Decoded stream plan of the fragment.
    pub nodes: PbStreamNode,
    /// State tables owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
    /// Per-fragment parallelism override; `None` falls back to the job's
    /// setting (see `EnsembleActorTemplate::render_new`).
    pub parallelism: Option<StreamingParallelism>,
}
199
/// Converts a persisted fragment row into its in-memory form, decoding the
/// stream-node plan and collecting the state table ids into a set.
impl From<fragment::Model> for LoadedFragment {
    fn from(model: fragment::Model) -> Self {
        Self {
            fragment_id: model.fragment_id,
            job_id: model.job_id,
            fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
            distribution_type: model.distribution_type,
            // Stored as an integer column; widened for in-memory use.
            vnode_count: model.vnode_count as usize,
            // Decode the persisted plan into protobuf form.
            nodes: model.stream_node.to_protobuf(),
            state_table_ids: model.state_table_ids.into_inner().into_iter().collect(),
            parallelism: model.parallelism,
        }
    }
}
214
/// Everything loaded from the catalog that is needed to render actor
/// assignments: the no-shuffle ensembles, the fragments and jobs they span,
/// the owning databases, and source/split metadata per fragment.
#[derive(Default, Debug, Clone)]
pub struct LoadedFragmentContext {
    /// No-shuffle ensembles to render.
    pub ensembles: Vec<NoShuffleEnsemble>,
    /// Loaded fragments, grouped by owning job.
    pub job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    /// Streaming job rows keyed by job id.
    pub job_map: HashMap<JobId, streaming_job::Model>,
    /// Database each job belongs to.
    pub streaming_job_databases: HashMap<JobId, DatabaseId>,
    /// Database rows keyed by database id.
    pub database_map: HashMap<DatabaseId, database::Model>,
    /// Source id per source fragment.
    pub fragment_source_ids: HashMap<FragmentId, SourceId>,
    /// Persisted source splits per fragment.
    pub fragment_splits: HashMap<FragmentId, Vec<SplitImpl>>,
}
225
impl LoadedFragmentContext {
    /// Returns `true` when no ensembles were loaded.
    ///
    /// Also asserts the invariant that an empty ensemble list implies empty
    /// `job_fragments`, since fragments are only loaded for ensemble members.
    pub fn is_empty(&self) -> bool {
        if self.ensembles.is_empty() {
            assert!(
                self.job_fragments.is_empty(),
                "non-empty job fragments for empty ensembles: {:?}",
                self.job_fragments
            );
            true
        } else {
            false
        }
    }

    /// Projects this context onto a single database.
    ///
    /// Returns `None` when no loaded job belongs to `database_id`. Panics if a
    /// no-shuffle ensemble only partially overlaps the database's fragments —
    /// ensembles are expected never to cross database boundaries.
    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
        let job_ids: HashSet<JobId> = self
            .streaming_job_databases
            .iter()
            .filter_map(|(job_id, db_id)| (*db_id == database_id).then_some(*job_id))
            .collect();

        if job_ids.is_empty() {
            return None;
        }

        // Indexing is safe: every job in `streaming_job_databases` is expected
        // to have an entry in `job_fragments`.
        let job_fragments: HashMap<_, _> = job_ids
            .iter()
            .map(|job_id| (*job_id, self.job_fragments[job_id].clone()))
            .collect();

        let fragment_ids: HashSet<_> = job_fragments
            .values()
            .flat_map(|fragments| fragments.keys().copied())
            .collect();

        assert!(
            !fragment_ids.is_empty(),
            "empty fragments for non-empty database {database_id} with jobs {job_ids:?}"
        );

        // Keep only ensembles fully contained in this database; a partial
        // overlap violates the no-shuffle invariant and aborts.
        let ensembles: Vec<NoShuffleEnsemble> = self
            .ensembles
            .iter()
            .filter(|ensemble| {
                if ensemble
                    .components
                    .iter()
                    .any(|fragment_id| fragment_ids.contains(fragment_id))
                {
                    assert!(
                        ensemble
                            .components
                            .iter()
                            .all(|fragment_id| fragment_ids.contains(fragment_id)),
                        "ensemble {ensemble:?} partially exists in database {database_id} with fragments {job_fragments:?}"
                    );
                    true
                } else {
                    false
                }
            })
            .cloned()
            .collect();

        assert!(
            !ensembles.is_empty(),
            "empty ensembles for non-empty database {database_id} with jobs {job_fragments:?}"
        );

        let job_map = job_ids
            .iter()
            .filter_map(|job_id| self.job_map.get(job_id).map(|job| (*job_id, job.clone())))
            .collect();

        let streaming_job_databases = job_ids
            .iter()
            .filter_map(|job_id| {
                self.streaming_job_databases
                    .get(job_id)
                    .map(|db_id| (*job_id, *db_id))
            })
            .collect();

        let database_model = self.database_map[&database_id].clone();
        let database_map = HashMap::from([(database_id, database_model)]);

        // Source ids and splits are keyed by fragment; retain only entries of
        // the fragments kept above.
        let fragment_source_ids = self
            .fragment_source_ids
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, source_id)| (*fragment_id, *source_id))
            .collect();

        let fragment_splits = self
            .fragment_splits
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, splits)| (*fragment_id, splits.clone()))
            .collect();

        Some(Self {
            ensembles,
            job_fragments,
            job_map,
            streaming_job_databases,
            database_map,
            fragment_source_ids,
            fragment_splits,
        })
    }

    /// Splits this context into one independent context per database,
    /// consuming `self`.
    ///
    /// Each ensemble is routed to the database of whichever of its component
    /// fragments resolves first (debug builds assert that all components
    /// agree). Ensembles with no resolvable fragment are skipped with a
    /// warning in release builds and trip a `debug_assert` in debug builds.
    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
        let Self {
            ensembles,
            mut job_fragments,
            mut job_map,
            streaming_job_databases,
            mut database_map,
            mut fragment_source_ids,
            mut fragment_splits,
        } = self;

        let mut contexts = HashMap::<DatabaseId, Self>::new();
        let mut fragment_databases = HashMap::<FragmentId, DatabaseId>::new();
        let mut unresolved_ensembles = 0usize;
        let mut unresolved_ensemble_sample: Option<Vec<FragmentId>> = None;

        // Move each job — and its per-fragment source/split data — into the
        // context of its database, creating the context lazily.
        for (job_id, database_id) in streaming_job_databases {
            let context = contexts.entry(database_id).or_insert_with(|| {
                let database_model = database_map
                    .remove(&database_id)
                    .expect("database should exist for streaming job");
                Self {
                    ensembles: Vec::new(),
                    job_fragments: HashMap::new(),
                    job_map: HashMap::new(),
                    streaming_job_databases: HashMap::new(),
                    database_map: HashMap::from([(database_id, database_model)]),
                    fragment_source_ids: HashMap::new(),
                    fragment_splits: HashMap::new(),
                }
            });

            let fragments = job_fragments
                .remove(&job_id)
                .expect("job fragments should exist for streaming job");
            for fragment_id in fragments.keys().copied() {
                // Record each fragment's database so ensembles can be routed below.
                fragment_databases.insert(fragment_id, database_id);
                if let Some(source_id) = fragment_source_ids.remove(&fragment_id) {
                    context.fragment_source_ids.insert(fragment_id, source_id);
                }
                if let Some(splits) = fragment_splits.remove(&fragment_id) {
                    context.fragment_splits.insert(fragment_id, splits);
                }
            }

            assert!(
                context
                    .job_map
                    .insert(
                        job_id,
                        job_map
                            .remove(&job_id)
                            .expect("streaming job should exist for loaded context"),
                    )
                    .is_none(),
                "duplicated streaming job"
            );
            assert!(
                context.job_fragments.insert(job_id, fragments).is_none(),
                "duplicated job fragments"
            );
            assert!(
                context
                    .streaming_job_databases
                    .insert(job_id, database_id)
                    .is_none(),
                "duplicated job database mapping"
            );
        }

        // Route each ensemble to the database of its first resolvable fragment.
        for ensemble in ensembles {
            let Some(database_id) = ensemble
                .components
                .iter()
                .find_map(|fragment_id| fragment_databases.get(fragment_id).copied())
            else {
                unresolved_ensembles += 1;
                if unresolved_ensemble_sample.is_none() {
                    unresolved_ensemble_sample =
                        Some(ensemble.components.iter().copied().collect());
                }
                continue;
            };

            debug_assert!(
                ensemble
                    .components
                    .iter()
                    .all(|fragment_id| fragment_databases.get(fragment_id) == Some(&database_id)),
                "ensemble {ensemble:?} should belong to a single database"
            );

            contexts
                .get_mut(&database_id)
                .expect("database context should exist for ensemble")
                .ensembles
                .push(ensemble);
        }

        if unresolved_ensembles > 0 {
            tracing::warn!(
                unresolved_ensembles,
                ?unresolved_ensemble_sample,
                known_fragments = fragment_databases.len(),
                "skip ensembles without resolved database while splitting loaded context"
            );
        }
        debug_assert_eq!(
            unresolved_ensembles, 0,
            "all ensembles should be mappable to a database"
        );

        contexts
    }
}
454
455pub async fn load_fragment_context<C>(
458 txn: &C,
459 ensembles: Vec<NoShuffleEnsemble>,
460) -> MetaResult<LoadedFragmentContext>
461where
462 C: ConnectionTrait,
463{
464 if ensembles.is_empty() {
465 return Ok(LoadedFragmentContext::default());
466 }
467
468 let required_fragment_ids: HashSet<_> = ensembles
469 .iter()
470 .flat_map(|ensemble| ensemble.components.iter().copied())
471 .collect();
472
473 let fragment_models = Fragment::find()
474 .filter(fragment::Column::FragmentId.is_in(required_fragment_ids.iter().copied()))
475 .all(txn)
476 .await?;
477
478 let found_fragment_ids: HashSet<_> = fragment_models
479 .iter()
480 .map(|fragment| fragment.fragment_id)
481 .collect();
482
483 if found_fragment_ids.len() != required_fragment_ids.len() {
484 let missing = required_fragment_ids
485 .difference(&found_fragment_ids)
486 .copied()
487 .collect_vec();
488 return Err(anyhow!("fragments {:?} not found", missing).into());
489 }
490
491 let fragment_models: HashMap<_, _> = fragment_models
492 .into_iter()
493 .map(|fragment| (fragment.fragment_id, fragment))
494 .collect();
495
496 let job_ids: HashSet<_> = fragment_models
497 .values()
498 .map(|fragment| fragment.job_id)
499 .collect();
500
501 if job_ids.is_empty() {
502 return Ok(LoadedFragmentContext::default());
503 }
504
505 let jobs: HashMap<_, _> = StreamingJob::find()
506 .filter(streaming_job::Column::JobId.is_in(job_ids.iter().copied().collect_vec()))
507 .all(txn)
508 .await?
509 .into_iter()
510 .map(|job| (job.job_id, job))
511 .collect();
512
513 let found_job_ids: HashSet<_> = jobs.keys().copied().collect();
514 if found_job_ids.len() != job_ids.len() {
515 let missing = job_ids.difference(&found_job_ids).copied().collect_vec();
516 return Err(anyhow!("streaming jobs {:?} not found", missing).into());
517 }
518
519 build_loaded_context(txn, ensembles, fragment_models, jobs).await
520}
521
/// Loads the fragment context covering all no-shuffle ensembles touched by the
/// given streaming jobs.
///
/// Starts from the jobs' "root" fragments (those that are not the target of a
/// NoShuffle dispatcher), expands them to full no-shuffle DAGs, then loads all
/// member fragments and their owning jobs — which may include jobs outside the
/// input set when an ensemble crosses job boundaries.
pub async fn load_fragment_context_for_jobs<C>(
    txn: &C,
    job_ids: HashSet<JobId>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if job_ids.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Subquery: every fragment that is the *target* of a NoShuffle edge.
    // Excluding these leaves only the roots of each no-shuffle DAG.
    let excluded_fragments_query = FragmentRelation::find()
        .select_only()
        .column(fragment_relation::Column::TargetFragmentId)
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_query();

    let condition = Condition::all()
        .add(fragment::Column::JobId.is_in(job_ids.clone()))
        .add(fragment::Column::FragmentId.not_in_subquery(excluded_fragments_query));

    let fragments: Vec<FragmentId> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .filter(condition)
        .into_tuple()
        .all(txn)
        .await?;

    // Expand the roots into complete no-shuffle DAGs (ensembles).
    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragments).await?;

    // Reload *all* member fragments of those ensembles, not just the roots.
    let fragments = Fragment::find()
        .filter(
            fragment::Column::FragmentId.is_in(
                ensembles
                    .iter()
                    .flat_map(|graph| graph.components.iter())
                    .cloned()
                    .collect_vec(),
            ),
        )
        .all(txn)
        .await?;

    let fragment_map: HashMap<_, _> = fragments
        .into_iter()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    // Re-derive the owning jobs from the loaded fragments: ensembles can pull
    // in jobs beyond the original input set. BTreeSet gives a stable order.
    let job_ids = fragment_map
        .values()
        .map(|fragment| fragment.job_id)
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect_vec();

    let jobs: HashMap<_, _> = StreamingJob::find()
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .all(txn)
        .await?
        .into_iter()
        .map(|job| (job.job_id, job))
        .collect();

    build_loaded_context(txn, ensembles, fragment_map, jobs).await
}
590
591pub(crate) fn render_actor_assignments(
594 actor_id_counter: &AtomicU32,
595 worker_map: &HashMap<WorkerId, WorkerNode>,
596 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
597 loaded: &LoadedFragmentContext,
598) -> MetaResult<RenderedGraph> {
599 let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
600 render_actor_assignments_with_allocator(
601 &mut actor_id_allocator,
602 worker_map,
603 adaptive_parallelism_strategy,
604 loaded,
605 )
606}
607
608pub(crate) fn preview_actor_assignments(
611 worker_map: &HashMap<WorkerId, WorkerNode>,
612 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
613 loaded: &LoadedFragmentContext,
614) -> MetaResult<RenderedGraph> {
615 let mut actor_id_allocator = RenderActorIdAllocator::Preview { next_actor_id: 0 };
616 render_actor_assignments_with_allocator(
617 &mut actor_id_allocator,
618 worker_map,
619 adaptive_parallelism_strategy,
620 loaded,
621 )
622}
623
/// Re-assigns persistent actor ids (from `actor_id_counter`) to a previously
/// rendered (e.g. previewed) graph, preserving each fragment's relative actor
/// ordering.
///
/// Fragments are processed ordered by their smallest preview actor id (tie
/// broken by fragment id), so id blocks are handed out deterministically. In
/// debug builds, the relative slot layout is checked before and after.
pub(crate) fn materialize_actor_assignments(
    actor_id_counter: &AtomicU32,
    rendered: RenderedGraph,
) -> RenderedGraph {
    let RenderedGraph {
        fragments,
        ensembles,
    } = rendered;
    // Snapshot the relative slot layout so we can verify it is unchanged.
    #[cfg(debug_assertions)]
    let preview_slots = collect_relative_actor_slots_by_fragment(&fragments);

    // Flatten to (database, job, fragment) tuples so they can be globally sorted.
    let mut ordered_fragments = fragments
        .into_iter()
        .flat_map(|(database_id, jobs)| {
            jobs.into_iter().flat_map(move |(job_id, fragments)| {
                fragments.into_iter().map(move |(fragment_id, fragment)| {
                    (database_id, job_id, fragment_id, fragment)
                })
            })
        })
        .collect_vec();
    // Sort by smallest preview actor id (actor-less fragments last), then by
    // fragment id for determinism.
    ordered_fragments.sort_by_key(|(_, _, fragment_id, fragment)| {
        (
            fragment
                .actors
                .keys()
                .copied()
                .min()
                .map(|actor_id| actor_id.as_raw_id())
                .unwrap_or(u32::MAX),
            *fragment_id,
        )
    });

    let mut materialized = FragmentRenderMap::new();
    for (database_id, job_id, fragment_id, fragment) in ordered_fragments {
        materialized
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                materialize_fragment(fragment, actor_id_counter),
            );
    }

    #[cfg(debug_assertions)]
    assert_materialization_preserves_preview_slots(&preview_slots, &materialized);

    RenderedGraph {
        fragments: materialized,
        ensembles,
    }
}
691
// Debug-only fingerprint of an actor slot: (actor id relative to the
// fragment's minimum id, worker, vnode indices for hash-distributed
// fragments, sorted split ids). Invariant under re-basing the id block.
#[cfg(debug_assertions)]
type RelativeActorSlot = (u32, WorkerId, Option<Vec<usize>>, Vec<SplitId>);
694
695#[cfg(debug_assertions)]
696fn collect_relative_actor_slots(fragment: &InflightFragmentInfo) -> Vec<RelativeActorSlot> {
697 let base = fragment
698 .actors
699 .keys()
700 .copied()
701 .min()
702 .map(|actor_id| actor_id.as_raw_id())
703 .unwrap_or_default();
704
705 let mut entries = fragment
706 .actors
707 .iter()
708 .map(|(&actor_id, info)| {
709 let idx = actor_id.as_raw_id() - base;
710 let vnode_bitmap = info.vnode_bitmap.as_ref().map(|bitmap| {
711 bitmap
712 .iter_vnodes()
713 .map(|vnode| vnode.to_index())
714 .collect_vec()
715 });
716 let mut splits = info.splits.iter().map(SplitMetaData::id).collect_vec();
717 splits.sort_unstable();
718 (idx, info.worker_id, vnode_bitmap, splits)
719 })
720 .collect_vec();
721 entries.sort_unstable_by_key(|(idx, ..)| *idx);
722 entries
723}
724
725#[cfg(debug_assertions)]
726fn collect_relative_actor_slots_by_fragment(
727 fragments: &FragmentRenderMap,
728) -> HashMap<FragmentId, Vec<RelativeActorSlot>> {
729 fragments
730 .values()
731 .flat_map(|jobs| jobs.values())
732 .flatten()
733 .map(|(fragment_id, fragment)| (*fragment_id, collect_relative_actor_slots(fragment)))
734 .collect()
735}
736
/// Debug check that materialization only re-based actor ids: every
/// materialized fragment must have exactly the same relative slot layout
/// (worker, vnodes, splits per relative id) as its preview counterpart.
/// Panics on any mismatch or on a fragment missing from the preview.
#[cfg(debug_assertions)]
fn assert_materialization_preserves_preview_slots(
    preview_slots: &HashMap<FragmentId, Vec<RelativeActorSlot>>,
    materialized_fragments: &FragmentRenderMap,
) {
    for (database_id, jobs) in materialized_fragments {
        for (job_id, fragments) in jobs {
            for (fragment_id, fragment) in fragments {
                let expected_slots = preview_slots.get(fragment_id).unwrap_or_else(|| {
                    panic!(
                        "preview fragment {fragment_id} for database {database_id}, job {job_id} not found"
                    )
                });

                assert_eq!(
                    collect_relative_actor_slots(fragment),
                    *expected_slots,
                    "materialization changed preview slot ordering for database {database_id}, job {job_id}, fragment {fragment_id}"
                );
            }
        }
    }
}
760
761fn render_actor_assignments_with_allocator(
762 actor_id_allocator: &mut RenderActorIdAllocator<'_>,
763 worker_map: &HashMap<WorkerId, WorkerNode>,
764 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
765 loaded: &LoadedFragmentContext,
766) -> MetaResult<RenderedGraph> {
767 if loaded.is_empty() {
768 return Ok(RenderedGraph::empty());
769 }
770
771 let render_context = RenderActorsContext {
772 fragment_source_ids: &loaded.fragment_source_ids,
773 fragment_splits: &loaded.fragment_splits,
774 streaming_job_databases: &loaded.streaming_job_databases,
775 database_map: &loaded.database_map,
776 };
777
778 let fragments = render_actors_with_allocator(
779 actor_id_allocator,
780 &loaded.ensembles,
781 &loaded.job_fragments,
782 &loaded.job_map,
783 worker_map,
784 adaptive_parallelism_strategy,
785 render_context,
786 )?;
787
788 Ok(RenderedGraph {
789 fragments,
790 ensembles: loaded.ensembles.clone(),
791 })
792}
793
794fn materialize_fragment(
795 mut fragment: InflightFragmentInfo,
796 actor_id_counter: &AtomicU32,
797) -> InflightFragmentInfo {
798 let actor_count = u32::try_from(fragment.actors.len()).expect("actor count exceeds u32::MAX");
799 let actor_id_base: ActorId = actor_id_counter
800 .fetch_add(actor_count, Ordering::Relaxed)
801 .into();
802
803 let mut actors = fragment.actors.into_iter().collect_vec();
804 actors.sort_by_key(|(actor_id, _)| *actor_id);
805
806 fragment.actors = actors
807 .into_iter()
808 .enumerate()
809 .map(|(idx, (_, info))| {
810 let actor_offset = u32::try_from(idx).expect("actor index exceeds u32::MAX");
811 (actor_id_base + actor_offset, info)
812 })
813 .collect();
814
815 fragment
816}
817
/// Assembles a [`LoadedFragmentContext`] from pre-fetched ensembles, fragment
/// rows, and job rows, loading the remaining pieces (source splits, owning
/// databases) from the catalog.
async fn build_loaded_context<C>(
    txn: &C,
    ensembles: Vec<NoShuffleEnsemble>,
    fragment_models: HashMap<FragmentId, fragment::Model>,
    job_map: HashMap<JobId, streaming_job::Model>,
) -> MetaResult<LoadedFragmentContext>
where
    C: ConnectionTrait,
{
    if ensembles.is_empty() {
        return Ok(LoadedFragmentContext::default());
    }

    // Group fragments by owning job; a duplicated fragment id is a bug.
    let mut job_fragments: HashMap<JobId, HashMap<FragmentId, LoadedFragment>> = HashMap::new();
    for (fragment_id, model) in fragment_models {
        job_fragments
            .entry(model.job_id)
            .or_default()
            .try_insert(fragment_id, LoadedFragment::from(model))
            .expect("duplicate fragment id for job");
    }

    // Cross-check ensembles/fragments/jobs consistency in debug builds.
    #[cfg(debug_assertions)]
    {
        debug_sanity_check(&ensembles, &job_fragments, &job_map);
    }

    let (fragment_source_ids, fragment_splits) =
        resolve_source_fragments(txn, &job_fragments).await?;

    let job_ids = job_map.keys().copied().collect_vec();

    // Map each job to its database via the object table.
    let streaming_job_databases: HashMap<JobId, _> = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_ids))
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .collect();

    let database_map: HashMap<_, _> = Database::find()
        .filter(
            database::Column::DatabaseId
                .is_in(streaming_job_databases.values().copied().collect_vec()),
        )
        .all(txn)
        .await?
        .into_iter()
        .map(|db| (db.database_id, db))
        .collect();

    Ok(LoadedFragmentContext {
        ensembles,
        job_fragments,
        job_map,
        streaming_job_databases,
        database_map,
        fragment_source_ids,
        fragment_splits,
    })
}
883
/// Borrowed side tables consulted while rendering actors for ensembles.
struct RenderActorsContext<'a> {
    /// Source id per source fragment.
    fragment_source_ids: &'a HashMap<FragmentId, SourceId>,
    /// Persisted source splits per fragment.
    fragment_splits: &'a HashMap<FragmentId, Vec<SplitImpl>>,
    /// Database each streaming job belongs to.
    streaming_job_databases: &'a HashMap<JobId, DatabaseId>,
    /// Database models, used for resource-group resolution.
    database_map: &'a HashMap<DatabaseId, database::Model>,
}
892
/// Strategy for handing out actor ids during rendering.
enum RenderActorIdAllocator<'a> {
    /// Draws ids from the shared atomic counter (ids persist globally).
    Persistent(&'a AtomicU32),
    /// Counts from a local cursor; used by previews so the shared counter
    /// stays untouched.
    Preview { next_actor_id: u32 },
}
897
898impl RenderActorIdAllocator<'_> {
899 fn allocate_block(&mut self, actor_count: u32) -> ActorId {
900 match self {
901 Self::Persistent(actor_id_counter) => {
902 let actor_id_base: ActorId = actor_id_counter
903 .fetch_add(actor_count, Ordering::Relaxed)
904 .into();
905 actor_id_base
906 }
907 Self::Preview { next_actor_id } => {
908 let actor_id_base = *next_actor_id;
909 *next_actor_id = next_actor_id
910 .checked_add(actor_count)
911 .expect("preview actor id overflow");
912 let actor_id_base: ActorId = actor_id_base.into();
913 actor_id_base
914 }
915 }
916 }
917}
918
/// Test-only convenience wrapper: renders with a persistent allocator backed
/// by the given counter.
#[cfg(test)]
fn render_actors(
    actor_id_counter: &AtomicU32,
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    let mut allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
    render_actors_with_allocator(
        &mut allocator,
        ensembles,
        job_fragments,
        job_map,
        worker_map,
        adaptive_parallelism_strategy,
        context,
    )
}
940
/// Renders concrete actors for every fragment of every ensemble.
///
/// For each ensemble, a single actor/worker/vnode template is derived from the
/// entry fragments and then applied to every component fragment, so all
/// fragments of a no-shuffle ensemble end up with aligned actor placements.
/// Source splits are assigned once at the entry fragment and aligned onto
/// components. Results are grouped as database -> job -> fragment.
fn render_actors_with_allocator(
    actor_id_allocator: &mut RenderActorIdAllocator<'_>,
    ensembles: &[NoShuffleEnsemble],
    job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
    job_map: &HashMap<JobId, streaming_job::Model>,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    context: RenderActorsContext<'_>,
) -> MetaResult<FragmentRenderMap> {
    let RenderActorsContext {
        fragment_source_ids,
        fragment_splits: fragment_splits_map,
        streaming_job_databases,
        database_map,
    } = context;

    let mut all_fragments: FragmentRenderMap = HashMap::new();
    // Flat index of all loaded fragments for O(1) lookup by id.
    let fragment_lookup: HashMap<FragmentId, &LoadedFragment> = job_fragments
        .values()
        .flat_map(|fragments| fragments.iter())
        .map(|(fragment_id, fragment)| (*fragment_id, fragment))
        .collect();

    for NoShuffleEnsemble {
        entries,
        components,
    } in ensembles
    {
        tracing::debug!("rendering ensemble entries {:?}", entries);

        let entry_fragments = entries
            .iter()
            .map(|fragment_id| fragment_lookup.get(fragment_id).unwrap())
            .collect_vec();

        // All entry fragments must agree on their parallelism override.
        let entry_fragment_parallelism = entry_fragments
            .iter()
            .map(|fragment| fragment.parallelism.clone())
            .dedup()
            .exactly_one()
            .map_err(|_| {
                anyhow!(
                    "entry fragments {:?} have inconsistent parallelism settings",
                    entries.iter().copied().collect_vec()
                )
            })?;

        // Likewise, they must share one job, distribution, and vnode count.
        let (job_id, distribution_type, vnode_count) = entry_fragments
            .iter()
            .map(|f| (f.job_id, f.distribution_type, f.vnode_count))
            .dedup()
            .exactly_one()
            .map_err(|_| anyhow!("Multiple jobs found in no-shuffle ensemble"))?;

        let job = job_map
            .get(&job_id)
            .ok_or_else(|| anyhow!("streaming job {job_id} not found"))?;

        // Resource group falls back to the owning database's group when the
        // job has no specific one (see EnsembleActorTemplate::render_new).
        let database_resource_group = streaming_job_databases
            .get(&job_id)
            .and_then(|database_id| database_map.get(database_id))
            .unwrap()
            .resource_group
            .clone();

        // Pick the entry fragment that is a (non-DML) source, if any; splits
        // are assigned relative to it.
        let source_entry_fragment = entry_fragments.iter().find(|f| {
            let mask = f.fragment_type_mask;
            if mask.contains(FragmentTypeFlag::Source) {
                assert!(!mask.contains(FragmentTypeFlag::SourceScan))
            }
            mask.contains(FragmentTypeFlag::Source) && !mask.contains(FragmentTypeFlag::Dml)
        });

        // One shared worker/vnode layout for the whole ensemble.
        let actor_template = EnsembleActorTemplate::render_new(
            job,
            worker_map,
            adaptive_parallelism_strategy,
            entry_fragment_parallelism,
            database_resource_group,
            distribution_type,
            vnode_count,
        )?;

        let source_splits = match source_entry_fragment {
            Some(entry_fragment) => {
                let source_id = fragment_source_ids
                    .get(&entry_fragment.fragment_id)
                    .ok_or_else(|| {
                        anyhow!(
                            "missing source id in source fragment {}",
                            entry_fragment.fragment_id
                        )
                    })?;

                let entry_fragment_id = entry_fragment.fragment_id;

                // Missing persisted splits are treated as an empty set.
                let splits = fragment_splits_map
                    .get(&entry_fragment_id)
                    .cloned()
                    .unwrap_or_default();

                // BTreeMap keys the splits by id in a deterministic order
                // before assignment.
                let splits: std::collections::BTreeMap<_, _> =
                    splits.into_iter().map(|s| (s.id(), s)).collect();
                let splits = actor_template.assign_splits(entry_fragment_id, splits);
                Some((splits, *source_id))
            }
            None => None,
        };

        // Apply the shared template to every component of the ensemble.
        for component_fragment_id in components {
            let fragment = fragment_lookup.get(component_fragment_id).unwrap();
            let fragment_id = fragment.fragment_id;
            let job_id = fragment.job_id;
            let fragment_type_mask = fragment.fragment_type_mask;
            let distribution_type = fragment.distribution_type;
            let stream_node = &fragment.nodes;
            let state_table_ids = &fragment.state_table_ids;
            let vnode_count = fragment.vnode_count;
            let source_id = fragment_source_ids.get(&fragment_id).cloned();

            let aligner = ComponentFragmentAligner::new(&actor_template, actor_id_allocator);
            let actors = aligner.align_component_actor(distribution_type);
            // A source component must share the ensemble's source id; its
            // splits are aligned from the entry fragment's assignment.
            let mut splits = source_id
                .map(|source_id| {
                    let (fragment_splits, shared_source_id) = source_splits.as_ref().unwrap();
                    assert_eq!(*shared_source_id, source_id);
                    aligner.align_component_splits(fragment_splits)
                })
                .unwrap_or_default();

            let actors: HashMap<ActorId, InflightActorInfo> = actors
                .into_iter()
                .map(|(actor_id, (worker_id, vnode_bitmap))| {
                    (
                        actor_id,
                        InflightActorInfo {
                            worker_id,
                            vnode_bitmap,
                            splits: splits.remove(&actor_id).unwrap_or_default(),
                        },
                    )
                })
                .collect();

            let fragment = InflightFragmentInfo {
                fragment_id,
                distribution_type,
                fragment_type_mask,
                vnode_count,
                nodes: stream_node.clone(),
                actors,
                state_table_ids: state_table_ids.clone(),
            };

            let &database_id = streaming_job_databases.get(&job_id).ok_or_else(|| {
                anyhow!("streaming job {job_id} not found in streaming_job_databases")
            })?;

            all_fragments
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default()
                .insert(fragment_id, fragment);
        }
    }

    Ok(all_fragments)
}
1110
/// Shared actor layout for all fragments of one no-shuffle ensemble: for each
/// worker, the actor slots it hosts with each slot's vnode bitmap (`None` for
/// single-distribution fragments).
pub(crate) struct EnsembleActorTemplate {
    // worker -> (actor slot index -> optional vnode bitmap)
    assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>>,
    distribution_type: DistributionType,
    // Total number of actor slots across all workers.
    actor_count: u32,
}
1116
impl EnsembleActorTemplate {
    /// Computes a fresh worker/actor/vnode assignment for an ensemble's entry
    /// fragment, honoring the job's parallelism settings and resource group.
    ///
    /// Parallelism resolution order: `entry_fragment_parallelism` override,
    /// then the job's (backfill-)parallelism, then the adaptive strategy; the
    /// result is clamped by `vnode_count` and the job's `max_parallelism`.
    pub(crate) fn render_new(
        job: &streaming_job::Model,
        worker_map: &HashMap<WorkerId, WorkerNode>,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        entry_fragment_parallelism: Option<StreamingParallelism>,
        database_resource_group: String,
        distribution_type: DistributionType,
        vnode_count: usize,
    ) -> MetaResult<Self> {
        let job_id = job.job_id;
        // A job-level strategy overrides the system-wide adaptive strategy.
        let job_strategy = job
            .stream_context()
            .adaptive_parallelism_strategy
            .unwrap_or(adaptive_parallelism_strategy);

        // A job-specific resource group overrides the database's group.
        let resource_group = match &job.specific_resource_group {
            None => database_resource_group,
            Some(resource_group) => resource_group.clone(),
        };

        // Only workers in the resolved resource group may host this job's actors.
        let available_workers: BTreeMap<WorkerId, NonZeroUsize> = worker_map
            .iter()
            .filter_map(|(worker_id, worker)| {
                if worker
                    .resource_group()
                    .as_deref()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP)
                    == resource_group.as_str()
                {
                    Some((
                        *worker_id,
                        worker
                            .parallelism()
                            .expect("should have parallelism for compute node")
                            .try_into()
                            .expect("parallelism for compute node"),
                    ))
                } else {
                    None
                }
            })
            .collect();

        let total_parallelism = available_workers.values().map(|w| w.get()).sum::<usize>();

        // Jobs not yet `Created` prefer the backfill parallelism, if set.
        let effective_job_parallelism = if job.job_status != JobStatus::Created {
            job.backfill_parallelism
                .as_ref()
                .unwrap_or(&job.parallelism)
        } else {
            &job.parallelism
        };

        let actual_parallelism = match entry_fragment_parallelism
            .as_ref()
            .unwrap_or(effective_job_parallelism)
        {
            StreamingParallelism::Adaptive | StreamingParallelism::Custom => {
                job_strategy.compute_target_parallelism(total_parallelism)
            }
            StreamingParallelism::Fixed(n) => *n,
        }
        // More actors than vnodes (or than `max_parallelism`) is never useful.
        .min(vnode_count)
        .min(job.max_parallelism as usize);

        tracing::debug!(
            "job {}, final {} parallelism {:?} total_parallelism {} job_max {} vnode count {} fragment_override {:?}",
            job_id,
            actual_parallelism,
            job.parallelism,
            total_parallelism,
            job.max_parallelism,
            vnode_count,
            entry_fragment_parallelism
        );

        // Seed the assigner with the job id so placement is deterministic per job.
        let assigner = AssignerBuilder::new(job_id).build();

        let actors = (0..(actual_parallelism as u32)).collect_vec();
        let vnodes = (0..vnode_count).collect_vec();

        let raw_assignment = assigner.assign_hierarchical(&available_workers, &actors, &vnodes)?;

        // Convert per-actor vnode index lists into bitmaps; Single-distribution
        // fragments carry no bitmap.
        let assignment = raw_assignment
            .into_iter()
            .map(|(worker_id, actors)| {
                let actors = actors
                    .into_iter()
                    .map(|(actor_idx, vnodes)| {
                        let bitmap = match distribution_type {
                            DistributionType::Single => None,
                            DistributionType::Hash => {
                                Some(Bitmap::from_indices(vnode_count, &vnodes))
                            }
                        };
                        (actor_idx, bitmap)
                    })
                    .collect();
                (worker_id, actors)
            })
            .collect();

        let actor_count = u32::try_from(actors.len()).expect("actor parallelism exceeds u32::MAX");

        Ok(Self {
            assignment,
            distribution_type,
            actor_count,
        })
    }

    /// Rebuilds a template from an already-running fragment, preserving its
    /// current worker placement and vnode bitmaps.
    pub(crate) fn from_existing_inflight_fragment(fragment: &InflightFragmentInfo) -> Self {
        if fragment.actors.is_empty() {
            return Self {
                assignment: BTreeMap::new(),
                distribution_type: fragment.distribution_type,
                actor_count: 0,
            };
        }

        let actor_count = fragment.actors.len() as u32;

        let mut assignment: BTreeMap<WorkerId, BTreeMap<u32, Option<Bitmap>>> = BTreeMap::new();
        // Actor ids are re-indexed 0..n in iteration order; only the relative
        // index is kept in the template.
        for (actor_idx, (&_actor_id, actor_info)) in fragment.actors.iter().enumerate() {
            let actor_idx = actor_idx as u32;
            assignment
                .entry(actor_info.worker_id)
                .or_default()
                .insert(actor_idx, actor_info.vnode_bitmap.clone());
        }

        Self {
            assignment,
            distribution_type: fragment.distribution_type,
            actor_count,
        }
    }

    /// Panics if two templates disagree on the worker placement of any pair of
    /// NO_SHUFFLE-mapped actors. Fragment ids are only used in the message.
    pub(crate) fn assert_aligned_with(
        &self,
        other: &Self,
        self_fragment_id: FragmentId,
        other_fragment_id: FragmentId,
    ) {
        let mapping = resolve_no_shuffle_actor_mapping(
            self.distribution_type,
            self.assignment
                .iter()
                .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
            other.distribution_type,
            other
                .assignment
                .iter()
                .flat_map(|(&wid, actors)| actors.iter().map(move |(&idx, bmp)| ((wid, idx), bmp))),
        );

        for ((self_worker, _self_idx), (other_worker, _other_idx)) in &mapping {
            assert_eq!(
                self_worker, other_worker,
                "fragments {} and {} disagree on worker placement: {:?}",
                self_fragment_id, other_fragment_id, mapping,
            );
        }
    }

    /// Distributes `splits` across this template's actor indices (keyed by the
    /// template-relative index, not the final actor id).
    fn assign_splits(
        &self,
        entry_fragment_id: FragmentId,
        splits: BTreeMap<SplitId, SplitImpl>,
    ) -> HashMap<u32, Vec<SplitImpl>> {
        {
            {
                // Start every actor with an empty split list so `reassign_splits`
                // performs a fresh assignment rather than a diff.
                let empty_actor_splits: HashMap<_, _> = self
                    .assignment
                    .values()
                    .flat_map(|actors| actors.keys())
                    .map(|actor_id| (*actor_id, vec![]))
                    .collect();

                crate::stream::source_manager::reassign_splits(
                    entry_fragment_id,
                    empty_actor_splits,
                    &splits,
                    SplitDiffOptions::default(),
                )
                // `None` (nothing to assign) collapses to an empty map.
                .unwrap_or_default()
            }
        }
    }
}
1315
/// Maps a template's per-worker actor indices to concrete actor ids for one
/// component fragment, using a contiguous id block starting at `actor_id_base`.
pub(crate) struct ComponentFragmentAligner<'a> {
    actor_template: &'a EnsembleActorTemplate,
    // First actor id of the contiguous block reserved for this fragment;
    // actor id = base + template-relative actor index.
    actor_id_base: ActorId,
}
1320
1321impl<'a> ComponentFragmentAligner<'a> {
1322 fn new(
1323 actor_template: &'a EnsembleActorTemplate,
1324 actor_id_allocator: &mut RenderActorIdAllocator<'_>,
1325 ) -> Self {
1326 let actor_id_base = actor_id_allocator.allocate_block(actor_template.actor_count);
1327 Self {
1328 actor_template,
1329 actor_id_base,
1330 }
1331 }
1332
1333 pub(crate) fn new_persistent(
1334 actor_template: &'a EnsembleActorTemplate,
1335 actor_id_counter: &AtomicU32,
1336 ) -> Self {
1337 let mut actor_id_allocator = RenderActorIdAllocator::Persistent(actor_id_counter);
1338 Self::new(actor_template, &mut actor_id_allocator)
1339 }
1340
1341 pub(crate) fn align_component_actor(
1342 &self,
1343 distribution_type: DistributionType,
1344 ) -> HashMap<ActorId, (WorkerId, Option<Bitmap>)> {
1345 let EnsembleActorTemplate {
1346 assignment,
1347 actor_count,
1348 distribution_type: _,
1349 } = &self.actor_template;
1350 let actor_id_base = self.actor_id_base;
1351 {
1352 assignment
1353 .iter()
1354 .flat_map(|(worker_id, actors)| {
1355 actors
1356 .iter()
1357 .map(move |(actor_idx, bitmap)| (worker_id, actor_idx, bitmap))
1358 })
1359 .map(|(&worker_id, &actor_idx, bitmap)| {
1360 if distribution_type == DistributionType::Single {
1361 assert_eq!(*actor_count, 1);
1362 }
1363
1364 let actor_id = actor_id_base + actor_idx;
1365
1366 (actor_id, (worker_id, bitmap.clone()))
1367 })
1368 .collect()
1369 }
1370 }
1371
1372 pub(crate) fn align_component_splits(
1373 &self,
1374 split_assignment: &HashMap<u32, Vec<SplitImpl>>,
1375 ) -> HashMap<ActorId, Vec<SplitImpl>> {
1376 (0..self.actor_template.actor_count)
1377 .filter_map(|actor_idx| {
1378 split_assignment
1379 .get(&actor_idx)
1380 .map(|splits| ((self.actor_id_base + actor_idx), splits.clone()))
1381 })
1382 .collect()
1383 }
1384}
1385
1386#[cfg(debug_assertions)]
1387fn debug_sanity_check(
1388 ensembles: &[NoShuffleEnsemble],
1389 job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1390 jobs: &HashMap<JobId, streaming_job::Model>,
1391) {
1392 let fragment_lookup: HashMap<FragmentId, (&LoadedFragment, JobId)> = job_fragments
1393 .iter()
1394 .flat_map(|(job_id, fragments)| {
1395 fragments
1396 .iter()
1397 .map(move |(fragment_id, fragment)| (*fragment_id, (fragment, *job_id)))
1398 })
1399 .collect();
1400
1401 debug_assert!(
1403 ensembles
1404 .iter()
1405 .all(|ensemble| ensemble.entries.is_subset(&ensemble.components)),
1406 "entries must be subset of components"
1407 );
1408
1409 let mut missing_fragments = BTreeSet::new();
1410 let mut missing_jobs = BTreeSet::new();
1411
1412 for fragment_id in ensembles
1413 .iter()
1414 .flat_map(|ensemble| ensemble.components.iter())
1415 {
1416 match fragment_lookup.get(fragment_id) {
1417 Some((fragment, job_id)) => {
1418 if !jobs.contains_key(&fragment.job_id) {
1419 missing_jobs.insert(*job_id);
1420 }
1421 }
1422 None => {
1423 missing_fragments.insert(*fragment_id);
1424 }
1425 }
1426 }
1427
1428 debug_assert!(
1429 missing_fragments.is_empty(),
1430 "missing fragments in fragment_map: {:?}",
1431 missing_fragments
1432 );
1433
1434 debug_assert!(
1435 missing_jobs.is_empty(),
1436 "missing jobs for fragments' job_id: {:?}",
1437 missing_jobs
1438 );
1439
1440 for ensemble in ensembles {
1441 let unique_vnode_counts: Vec<_> = ensemble
1442 .components
1443 .iter()
1444 .flat_map(|fragment_id| {
1445 fragment_lookup
1446 .get(fragment_id)
1447 .map(|(fragment, _)| fragment.vnode_count)
1448 })
1449 .unique()
1450 .collect();
1451
1452 debug_assert!(
1453 unique_vnode_counts.len() <= 1,
1454 "components in ensemble must share same vnode_count: ensemble={:?}, vnode_counts={:?}",
1455 ensemble.components,
1456 unique_vnode_counts
1457 );
1458 }
1459}
1460
1461async fn resolve_source_fragments<C>(
1462 txn: &C,
1463 job_fragments: &HashMap<JobId, HashMap<FragmentId, LoadedFragment>>,
1464) -> MetaResult<(
1465 HashMap<FragmentId, SourceId>,
1466 HashMap<FragmentId, Vec<SplitImpl>>,
1467)>
1468where
1469 C: ConnectionTrait,
1470{
1471 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1472 for (fragment_id, fragment) in job_fragments.values().flatten() {
1473 let mask = fragment.fragment_type_mask;
1474 if mask.contains(FragmentTypeFlag::Source)
1475 && let Some(source_id) = fragment.nodes.find_stream_source()
1476 {
1477 source_fragment_ids
1478 .entry(source_id)
1479 .or_default()
1480 .insert(*fragment_id);
1481 }
1482
1483 if mask.contains(FragmentTypeFlag::SourceScan)
1484 && let Some((source_id, _)) = fragment.nodes.find_source_backfill()
1485 {
1486 source_fragment_ids
1487 .entry(source_id)
1488 .or_default()
1489 .insert(*fragment_id);
1490 }
1491 }
1492
1493 let fragment_source_ids: HashMap<_, _> = source_fragment_ids
1494 .iter()
1495 .flat_map(|(source_id, fragment_ids)| {
1496 fragment_ids
1497 .iter()
1498 .map(|fragment_id| (*fragment_id, *source_id))
1499 })
1500 .collect();
1501
1502 let fragment_ids = fragment_source_ids.keys().copied().collect_vec();
1503
1504 let fragment_splits: Vec<_> = FragmentSplits::find()
1505 .filter(fragment_splits::Column::FragmentId.is_in(fragment_ids))
1506 .all(txn)
1507 .await?;
1508
1509 let fragment_splits: HashMap<_, _> = fragment_splits
1510 .into_iter()
1511 .flat_map(|model| {
1512 model.splits.map(|splits| {
1513 (
1514 model.fragment_id,
1515 splits
1516 .to_protobuf()
1517 .splits
1518 .iter()
1519 .flat_map(SplitImpl::try_from)
1520 .collect_vec(),
1521 )
1522 })
1523 })
1524 .collect();
1525
1526 Ok((fragment_source_ids, fragment_splits))
1527}
1528
/// Borrowed view pairing fragments (with their actors) and the worker each
/// actor is placed on.
#[derive(Debug)]
pub struct ActorGraph<'a> {
    pub fragments: &'a HashMap<FragmentId, (Fragment, Vec<StreamActor>)>,
    pub locations: &'a HashMap<ActorId, WorkerId>,
}
1535
/// A connected set of fragments linked by NO_SHUFFLE dispatcher edges.
#[derive(Debug, Clone)]
pub struct NoShuffleEnsemble {
    // Fragments with no NO_SHUFFLE upstream inside the ensemble.
    // Invariant (checked in `debug_sanity_check`): subset of `components`.
    entries: HashSet<FragmentId>,
    // All fragments in the connected component.
    components: HashSet<FragmentId>,
}
1541
1542impl NoShuffleEnsemble {
1543 pub(crate) fn singleton(fragment_id: FragmentId) -> Self {
1545 Self {
1546 entries: HashSet::from_iter([fragment_id]),
1547 components: HashSet::from_iter([fragment_id]),
1548 }
1549 }
1550
1551 #[cfg(test)]
1552 pub fn for_test(
1553 entries: impl IntoIterator<Item = FragmentId>,
1554 components: impl IntoIterator<Item = FragmentId>,
1555 ) -> Self {
1556 let entries = entries.into_iter().collect();
1557 let components = components.into_iter().collect();
1558 Self {
1559 entries,
1560 components,
1561 }
1562 }
1563
1564 pub fn fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1565 self.components.iter().cloned()
1566 }
1567
1568 pub fn entry_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1569 self.entries.iter().copied()
1570 }
1571
1572 pub fn component_fragments(&self) -> impl Iterator<Item = FragmentId> + '_ {
1573 self.components.iter().copied()
1574 }
1575
1576 pub fn contains_entry(&self, fragment_id: &FragmentId) -> bool {
1577 self.entries.contains(fragment_id)
1578 }
1579}
1580
/// Loads every NO_SHUFFLE fragment relation from the catalog and returns the
/// connected ensembles reachable from `initial_fragment_ids`.
pub async fn find_fragment_no_shuffle_dags_detailed(
    db: &impl ConnectionTrait,
    initial_fragment_ids: &[FragmentId],
) -> MetaResult<Vec<NoShuffleEnsemble>> {
    // Fetches *all* NO_SHUFFLE edges (not just those touching the initial
    // fragments); `find_no_shuffle_graphs` then prunes to reachable components.
    let all_no_shuffle_relations: Vec<(_, _)> = FragmentRelation::find()
        .columns([
            fragment_relation::Column::SourceFragmentId,
            fragment_relation::Column::TargetFragmentId,
        ])
        .filter(fragment_relation::Column::DispatcherType.eq(DispatcherType::NoShuffle))
        .into_tuple()
        .all(db)
        .await?;

    let (forward_edges, backward_edges) =
        build_no_shuffle_fragment_graph_edges(all_no_shuffle_relations);

    find_no_shuffle_graphs(initial_fragment_ids, &forward_edges, &backward_edges)
}
1600
1601pub(crate) fn build_no_shuffle_fragment_graph_edges(
1602 relations: impl IntoIterator<Item = (FragmentId, FragmentId)>,
1603) -> (
1604 HashMap<FragmentId, Vec<FragmentId>>,
1605 HashMap<FragmentId, Vec<FragmentId>>,
1606) {
1607 let mut forward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1608 let mut backward_edges: HashMap<FragmentId, HashSet<FragmentId>> = HashMap::new();
1609
1610 for (src, dst) in relations {
1611 forward_edges.entry(src).or_default().insert(dst);
1612 backward_edges.entry(dst).or_default().insert(src);
1613 }
1614
1615 let forward_edges = forward_edges
1616 .into_iter()
1617 .map(|(src, dst_set)| (src, dst_set.into_iter().collect()))
1618 .collect();
1619 let backward_edges = backward_edges
1620 .into_iter()
1621 .map(|(dst, src_set)| (dst, src_set.into_iter().collect()))
1622 .collect();
1623
1624 (forward_edges, backward_edges)
1625}
1626
1627pub(crate) fn find_no_shuffle_graphs(
1628 initial_fragment_ids: &[impl Into<FragmentId> + Copy],
1629 forward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1630 backward_edges: &HashMap<FragmentId, Vec<FragmentId>>,
1631) -> MetaResult<Vec<NoShuffleEnsemble>> {
1632 let mut graphs: Vec<NoShuffleEnsemble> = Vec::new();
1633 let mut globally_visited: HashSet<FragmentId> = HashSet::new();
1634
1635 for &init_id in initial_fragment_ids {
1636 let init_id = init_id.into();
1637 if globally_visited.contains(&init_id) {
1638 continue;
1639 }
1640
1641 let mut components = HashSet::new();
1643 let mut queue: VecDeque<FragmentId> = VecDeque::new();
1644
1645 queue.push_back(init_id);
1646 globally_visited.insert(init_id);
1647
1648 while let Some(current_id) = queue.pop_front() {
1649 components.insert(current_id);
1650 let neighbors = forward_edges
1651 .get(¤t_id)
1652 .into_iter()
1653 .flatten()
1654 .chain(backward_edges.get(¤t_id).into_iter().flatten());
1655
1656 for &neighbor_id in neighbors {
1657 if globally_visited.insert(neighbor_id) {
1658 queue.push_back(neighbor_id);
1659 }
1660 }
1661 }
1662
1663 let mut entries = HashSet::new();
1665 for &node_id in &components {
1666 let is_root = match backward_edges.get(&node_id) {
1667 Some(parents) => parents.iter().all(|p| !components.contains(p)),
1668 None => true,
1669 };
1670 if is_root {
1671 entries.insert(node_id);
1672 }
1673 }
1674
1675 if !entries.is_empty() {
1677 graphs.push(NoShuffleEnsemble {
1678 entries,
1679 components,
1680 });
1681 }
1682 }
1683
1684 Ok(graphs)
1685}
1686
1687#[cfg(test)]
1688mod tests {
1689 use std::collections::{BTreeSet, HashMap, HashSet};
1690 use std::sync::Arc;
1691
1692 use risingwave_connector::source::SplitImpl;
1693 use risingwave_connector::source::test_source::TestSourceSplit;
1694 use risingwave_meta_model::{CreateType, JobStatus};
1695 use risingwave_pb::common::WorkerType;
1696 use risingwave_pb::common::worker_node::Property as WorkerProperty;
1697 use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
1698
1699 use super::*;
1700
    /// `(forward, backward)` adjacency maps, as produced by `build_edges`.
    type Edges = (
        HashMap<FragmentId, Vec<FragmentId>>,
        HashMap<FragmentId, Vec<FragmentId>>,
    );
1707
1708 fn build_edges(relations: &[(u32, u32)]) -> Edges {
1711 let mut forward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1712 let mut backward_edges: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1713 for &(src, dst) in relations {
1714 forward_edges
1715 .entry(src.into())
1716 .or_default()
1717 .push(dst.into());
1718 backward_edges
1719 .entry(dst.into())
1720 .or_default()
1721 .push(src.into());
1722 }
1723 (forward_edges, backward_edges)
1724 }
1725
1726 fn to_hashset(ids: &[u32]) -> HashSet<FragmentId> {
1728 ids.iter().map(|id| (*id).into()).collect()
1729 }
1730
    /// Builds a minimal `LoadedFragment` fixture: empty stream node, no state
    /// tables, and the given distribution/vnode/parallelism settings.
    fn build_fragment(
        fragment_id: FragmentId,
        job_id: JobId,
        fragment_type_mask: i32,
        distribution_type: DistributionType,
        vnode_count: i32,
        parallelism: StreamingParallelism,
    ) -> LoadedFragment {
        LoadedFragment {
            fragment_id,
            job_id,
            fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
            distribution_type,
            vnode_count: vnode_count as usize,
            nodes: PbStreamNode::default(),
            state_table_ids: HashSet::new(),
            parallelism: Some(parallelism),
        }
    }
1750
    /// `(relative actor index, worker id, set vnode positions, split ids)` —
    /// normalized actor state used for cross-fragment comparisons.
    type ActorState = (ActorId, WorkerId, Option<Vec<usize>>, Vec<String>);
1752
1753 fn collect_actor_state(fragment: &InflightFragmentInfo) -> Vec<ActorState> {
1754 let base = fragment.actors.keys().copied().min().unwrap_or_default();
1755
1756 let mut entries: Vec<_> = fragment
1757 .actors
1758 .iter()
1759 .map(|(&actor_id, info)| {
1760 let idx = actor_id.as_raw_id() - base.as_raw_id();
1761 let vnode_indices = info.vnode_bitmap.as_ref().map(|bitmap| {
1762 bitmap
1763 .iter()
1764 .enumerate()
1765 .filter_map(|(pos, is_set)| is_set.then_some(pos))
1766 .collect::<Vec<_>>()
1767 });
1768 let splits = info
1769 .splits
1770 .iter()
1771 .map(|split| split.id().to_string())
1772 .collect::<Vec<_>>();
1773 (idx.into(), info.worker_id, vnode_indices, splits)
1774 })
1775 .collect();
1776
1777 entries.sort_by_key(|(idx, _, _, _)| *idx);
1778 entries
1779 }
1780
1781 fn build_worker_node(
1782 id: impl Into<WorkerId>,
1783 parallelism: usize,
1784 resource_group: &str,
1785 ) -> WorkerNode {
1786 WorkerNode {
1787 id: id.into(),
1788 r#type: WorkerType::ComputeNode as i32,
1789 property: Some(WorkerProperty {
1790 is_streaming: true,
1791 parallelism: u32::try_from(parallelism).expect("parallelism fits into u32"),
1792 resource_group: Some(resource_group.to_owned()),
1793 ..Default::default()
1794 }),
1795 ..Default::default()
1796 }
1797 }
1798
1799 #[test]
1800 fn test_single_linear_chain() {
1801 let (forward, backward) = build_edges(&[(1, 2), (2, 3)]);
1804 let initial_ids = &[2];
1805
1806 let result = find_no_shuffle_graphs(initial_ids, &forward, &backward);
1808
1809 assert!(result.is_ok());
1811 let graphs = result.unwrap();
1812
1813 assert_eq!(graphs.len(), 1);
1814 let graph = &graphs[0];
1815 assert_eq!(graph.entries, to_hashset(&[1]));
1816 assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1817 }
1818
1819 #[test]
1820 fn test_two_disconnected_graphs() {
1821 let (forward, backward) = build_edges(&[(1, 2), (10, 11)]);
1824 let initial_ids = &[2, 10];
1825
1826 let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1828
1829 assert_eq!(graphs.len(), 2);
1831
1832 graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1834
1835 assert_eq!(graphs[0].entries, to_hashset(&[1]));
1837 assert_eq!(graphs[0].components, to_hashset(&[1, 2]));
1838
1839 assert_eq!(graphs[1].entries, to_hashset(&[10]));
1841 assert_eq!(graphs[1].components, to_hashset(&[10, 11]));
1842 }
1843
1844 #[test]
1845 fn test_multiple_entries_in_one_graph() {
1846 let (forward, backward) = build_edges(&[(1, 3), (2, 3)]);
1848 let initial_ids = &[3];
1849
1850 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1852
1853 assert_eq!(graphs.len(), 1);
1855 let graph = &graphs[0];
1856 assert_eq!(graph.entries, to_hashset(&[1, 2]));
1857 assert_eq!(graph.components, to_hashset(&[1, 2, 3]));
1858 }
1859
1860 #[test]
1861 fn test_diamond_shape_graph() {
1862 let (forward, backward) = build_edges(&[(1, 2), (1, 3), (2, 4), (3, 4)]);
1864 let initial_ids = &[4];
1865
1866 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1868
1869 assert_eq!(graphs.len(), 1);
1871 let graph = &graphs[0];
1872 assert_eq!(graph.entries, to_hashset(&[1]));
1873 assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1874 }
1875
1876 #[test]
1877 fn test_starting_with_multiple_nodes_in_same_graph() {
1878 let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 4)]);
1881 let initial_ids = &[2, 4];
1882
1883 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1885
1886 assert_eq!(graphs.len(), 1);
1888 let graph = &graphs[0];
1889 assert_eq!(graph.entries, to_hashset(&[1]));
1890 assert_eq!(graph.components, to_hashset(&[1, 2, 3, 4]));
1891 }
1892
1893 #[test]
1894 fn test_empty_initial_ids() {
1895 let (forward, backward) = build_edges(&[(1, 2)]);
1897 let initial_ids: &[u32] = &[];
1898
1899 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1901
1902 assert!(graphs.is_empty());
1904 }
1905
1906 #[test]
1907 fn test_isolated_node_as_input() {
1908 let (forward, backward) = build_edges(&[(1, 2)]);
1910 let initial_ids = &[100];
1911
1912 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1914
1915 assert_eq!(graphs.len(), 1);
1917 let graph = &graphs[0];
1918 assert_eq!(graph.entries, to_hashset(&[100]));
1919 assert_eq!(graph.components, to_hashset(&[100]));
1920 }
1921
1922 #[test]
1923 fn test_graph_with_a_cycle() {
1924 let (forward, backward) = build_edges(&[(1, 2), (2, 3), (3, 1)]);
1929 let initial_ids = &[2];
1930
1931 let graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1933
1934 assert!(
1936 graphs.is_empty(),
1937 "A graph with no entries should not be returned"
1938 );
1939 }
1940 #[test]
1941 fn test_custom_complex() {
1942 let (forward, backward) = build_edges(&[(1, 3), (1, 8), (2, 3), (4, 3), (3, 5), (6, 7)]);
1943 let initial_ids = &[1, 2, 4, 6];
1944
1945 let mut graphs = find_no_shuffle_graphs(initial_ids, &forward, &backward).unwrap();
1947
1948 assert_eq!(graphs.len(), 2);
1950 graphs.sort_by_key(|g| *g.components.iter().min().unwrap_or(&0.into()));
1952
1953 assert_eq!(graphs[0].entries, to_hashset(&[1, 2, 4]));
1955 assert_eq!(graphs[0].components, to_hashset(&[1, 2, 3, 4, 5, 8]));
1956
1957 assert_eq!(graphs[1].entries, to_hashset(&[6]));
1959 assert_eq!(graphs[1].components, to_hashset(&[6, 7]));
1960 }
1961
    #[test]
    fn render_actors_increments_actor_counter() {
        // Counter starts at 100; rendering one single-distribution actor must
        // consume exactly one id, leaving the counter at 101.
        let actor_id_counter = AtomicU32::new(100);
        let fragment_id: FragmentId = 1.into();
        let job_id: JobId = 10.into();
        let database_id: DatabaseId = DatabaseId::new(3);

        let fragment_model = build_fragment(
            fragment_id,
            job_id,
            0,
            DistributionType::Single,
            1,
            StreamingParallelism::Fixed(1),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Foreground,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(1),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 1,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db".into(),
            resource_group: "rg-a".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([fragment_id]),
            components: HashSet::from([fragment_id]),
        }];

        let fragment_map = HashMap::from([(fragment_id, fragment_model)]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // One worker with parallelism 1 in the database's resource group.
        let worker_map: HashMap<WorkerId, WorkerNode> =
            HashMap::from([(1.into(), build_worker_node(1, 1, "rg-a"))]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
        assert_eq!(state.len(), 1);
        assert!(
            state[0].2.is_none(),
            "single distribution should not assign vnode bitmaps"
        );
        // Exactly one id was allocated from the counter.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 101);
    }
2044
    #[test]
    fn render_actors_aligns_hash_vnode_bitmaps() {
        // Two hash fragments in one ensemble must get identical normalized
        // placements (same workers, same vnode bitmaps), and together the
        // bitmaps must cover all 4 vnodes.
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 1.into();
        let downstream_fragment_id: FragmentId = 2.into();
        let job_id: JobId = 20.into();
        let database_id: DatabaseId = DatabaseId::new(5);

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            0,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "test_db_hash".into(),
            resource_group: "rg-hash".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        // One ensemble: the entry fragment plus its NO_SHUFFLE downstream.
        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        // Two single-slot workers so the two actors spread across both.
        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-hash")),
            (2.into(), build_worker_node(2, 1, "rg-hash")),
        ]);

        let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
        let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // Both fragments share the same normalized placement.
        assert_eq!(entry_state.len(), 2);
        assert_eq!(entry_state, downstream_state);

        // The union of actor bitmaps covers the whole vnode space.
        let assigned_vnodes: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, vnodes, _)| {
                vnodes
                    .as_ref()
                    .expect("hash distribution should populate vnode bitmap")
                    .iter()
                    .copied()
            })
            .collect();
        assert_eq!(assigned_vnodes, BTreeSet::from([0, 1, 2, 3]));
        // 2 actors per fragment x 2 fragments = 4 ids allocated.
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
2154
    #[test]
    fn render_actors_propagates_source_splits() {
        // A Source fragment and its SourceScan downstream share one source;
        // splits assigned on the entry must be mirrored onto the downstream.
        let actor_id_counter = AtomicU32::new(0);
        let entry_fragment_id: FragmentId = 11.into();
        let downstream_fragment_id: FragmentId = 12.into();
        let job_id: JobId = 30.into();
        let database_id: DatabaseId = DatabaseId::new(7);
        let source_id: SourceId = 99.into();

        let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
        let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;

        let entry_fragment = build_fragment(
            entry_fragment_id,
            job_id,
            source_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let downstream_fragment = build_fragment(
            downstream_fragment_id,
            job_id,
            source_scan_mask,
            DistributionType::Hash,
            4,
            StreamingParallelism::Fixed(2),
        );

        let job_model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Created,
            create_type: CreateType::Background,
            timezone: None,
            config_override: None,
            adaptive_parallelism_strategy: None,
            parallelism: StreamingParallelism::Fixed(2),
            backfill_parallelism: None,
            backfill_orders: None,
            max_parallelism: 2,
            specific_resource_group: None,
            is_serverless_backfill: false,
        };

        let database_model = database::Model {
            database_id,
            name: "split_db".into(),
            resource_group: "rg-source".into(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        };

        let ensembles = vec![NoShuffleEnsemble {
            entries: HashSet::from([entry_fragment_id]),
            components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
        }];

        let fragment_map = HashMap::from([
            (entry_fragment_id, entry_fragment),
            (downstream_fragment_id, downstream_fragment),
        ]);
        let job_fragments = HashMap::from([(job_id, fragment_map)]);
        let job_map = HashMap::from([(job_id, job_model)]);

        let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
            (1.into(), build_worker_node(1, 1, "rg-source")),
            (2.into(), build_worker_node(2, 1, "rg-source")),
        ]);

        // Two test splits to distribute across the two actors.
        let split_a = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-a"),
            properties: HashMap::new(),
            offset: "0".into(),
        });
        let split_b = SplitImpl::Test(TestSourceSplit {
            id: Arc::<str>::from("split-b"),
            properties: HashMap::new(),
            offset: "0".into(),
        });

        // Both fragments read the same source, but only the entry fragment
        // carries persisted splits.
        let fragment_source_ids = HashMap::from([
            (entry_fragment_id, source_id),
            (downstream_fragment_id, source_id),
        ]);
        let fragment_splits =
            HashMap::from([(entry_fragment_id, vec![split_a.clone(), split_b.clone()])]);
        let streaming_job_databases = HashMap::from([(job_id, database_id)]);
        let database_map = HashMap::from([(database_id, database_model)]);

        let context = RenderActorsContext {
            fragment_source_ids: &fragment_source_ids,
            fragment_splits: &fragment_splits,
            streaming_job_databases: &streaming_job_databases,
            database_map: &database_map,
        };

        let result = render_actors(
            &actor_id_counter,
            &ensembles,
            &job_fragments,
            &job_map,
            &worker_map,
            AdaptiveParallelismStrategy::Auto,
            context,
        )
        .expect("actor rendering succeeds");

        let entry_state = collect_actor_state(&result[&database_id][&job_id][&entry_fragment_id]);
        let downstream_state =
            collect_actor_state(&result[&database_id][&job_id][&downstream_fragment_id]);

        // Downstream mirrors the entry, including the split assignment.
        assert_eq!(entry_state, downstream_state);

        // Every configured split ends up assigned somewhere.
        let split_ids: BTreeSet<_> = entry_state
            .iter()
            .flat_map(|(_, _, _, splits)| splits.iter().cloned())
            .collect();
        assert_eq!(
            split_ids,
            BTreeSet::from([split_a.id().to_string(), split_b.id().to_string()])
        );
        assert_eq!(actor_id_counter.load(Ordering::Relaxed), 4);
    }
2279
2280 #[test]
2281 fn preview_actor_assignments_defers_actor_id_allocation() {
2282 let actor_id_counter = AtomicU32::new(100);
2283 let entry_fragment_id: FragmentId = 11.into();
2284 let downstream_fragment_id: FragmentId = 12.into();
2285 let job_id: JobId = 30.into();
2286 let database_id: DatabaseId = DatabaseId::new(7);
2287 let source_id: SourceId = 99.into();
2288
2289 let source_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::Source]) as i32;
2290 let source_scan_mask = FragmentTypeFlag::raw_flag([FragmentTypeFlag::SourceScan]) as i32;
2291
2292 let entry_fragment = build_fragment(
2293 entry_fragment_id,
2294 job_id,
2295 source_mask,
2296 DistributionType::Hash,
2297 4,
2298 StreamingParallelism::Fixed(2),
2299 );
2300
2301 let downstream_fragment = build_fragment(
2302 downstream_fragment_id,
2303 job_id,
2304 source_scan_mask,
2305 DistributionType::Hash,
2306 4,
2307 StreamingParallelism::Fixed(2),
2308 );
2309
2310 let job_model = streaming_job::Model {
2311 job_id,
2312 job_status: JobStatus::Created,
2313 create_type: CreateType::Background,
2314 timezone: None,
2315 config_override: None,
2316 adaptive_parallelism_strategy: None,
2317 parallelism: StreamingParallelism::Fixed(2),
2318 backfill_parallelism: None,
2319 backfill_orders: None,
2320 max_parallelism: 2,
2321 specific_resource_group: None,
2322 is_serverless_backfill: false,
2323 };
2324
2325 let database_model = database::Model {
2326 database_id,
2327 name: "preview_db".into(),
2328 resource_group: "rg-source".into(),
2329 barrier_interval_ms: None,
2330 checkpoint_frequency: None,
2331 };
2332
2333 let ensembles = vec![NoShuffleEnsemble {
2334 entries: HashSet::from([entry_fragment_id]),
2335 components: HashSet::from([entry_fragment_id, downstream_fragment_id]),
2336 }];
2337
2338 let fragment_map = HashMap::from([
2339 (entry_fragment_id, entry_fragment),
2340 (downstream_fragment_id, downstream_fragment),
2341 ]);
2342 let job_fragments = HashMap::from([(job_id, fragment_map)]);
2343 let job_map = HashMap::from([(job_id, job_model)]);
2344
2345 let worker_map: HashMap<WorkerId, WorkerNode> = HashMap::from([
2346 (1.into(), build_worker_node(1, 1, "rg-source")),
2347 (2.into(), build_worker_node(2, 1, "rg-source")),
2348 ]);
2349
2350 let split_a = SplitImpl::Test(TestSourceSplit {
2351 id: Arc::<str>::from("split-a"),
2352 properties: HashMap::new(),
2353 offset: "0".into(),
2354 });
2355 let split_b = SplitImpl::Test(TestSourceSplit {
2356 id: Arc::<str>::from("split-b"),
2357 properties: HashMap::new(),
2358 offset: "0".into(),
2359 });
2360
2361 let loaded = LoadedFragmentContext {
2362 ensembles,
2363 job_fragments,
2364 job_map,
2365 streaming_job_databases: HashMap::from([(job_id, database_id)]),
2366 database_map: HashMap::from([(database_id, database_model)]),
2367 fragment_source_ids: HashMap::from([
2368 (entry_fragment_id, source_id),
2369 (downstream_fragment_id, source_id),
2370 ]),
2371 fragment_splits: HashMap::from([(entry_fragment_id, vec![split_a, split_b])]),
2372 };
2373
2374 let preview =
2375 preview_actor_assignments(&worker_map, AdaptiveParallelismStrategy::Auto, &loaded)
2376 .expect("preview rendering succeeds");
2377 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 100);
2378
2379 let preview_entry_state =
2380 collect_actor_state(&preview.fragments[&database_id][&job_id][&entry_fragment_id]);
2381 let preview_downstream_state =
2382 collect_actor_state(&preview.fragments[&database_id][&job_id][&downstream_fragment_id]);
2383 assert_eq!(preview_entry_state, preview_downstream_state);
2384
2385 let materialized = materialize_actor_assignments(&actor_id_counter, preview);
2386
2387 let materialized_entry_state =
2388 collect_actor_state(&materialized.fragments[&database_id][&job_id][&entry_fragment_id]);
2389 let materialized_downstream_state = collect_actor_state(
2390 &materialized.fragments[&database_id][&job_id][&downstream_fragment_id],
2391 );
2392
2393 assert_eq!(materialized_entry_state, materialized_downstream_state);
2394 assert_eq!(materialized_entry_state, preview_entry_state);
2395 assert_eq!(materialized_downstream_state, preview_downstream_state);
2396 assert_eq!(actor_id_counter.load(Ordering::Relaxed), 104);
2397 }
2398
2399 #[test]
2401 fn render_actors_job_strategy_overrides_global() {
2402 let actor_id_counter = AtomicU32::new(0);
2403 let fragment_id: FragmentId = 1.into();
2404 let job_id: JobId = 100.into();
2405 let database_id: DatabaseId = DatabaseId::new(10);
2406
2407 let fragment_model = build_fragment(
2409 fragment_id,
2410 job_id,
2411 0,
2412 DistributionType::Hash,
2413 8,
2414 StreamingParallelism::Adaptive,
2415 );
2416
2417 let job_model = streaming_job::Model {
2419 job_id,
2420 job_status: JobStatus::Created,
2421 create_type: CreateType::Foreground,
2422 timezone: None,
2423 config_override: None,
2424 adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2425 parallelism: StreamingParallelism::Adaptive,
2426 backfill_parallelism: None,
2427 backfill_orders: None,
2428 max_parallelism: 8,
2429 specific_resource_group: None,
2430 is_serverless_backfill: false,
2431 };
2432
2433 let database_model = database::Model {
2434 database_id,
2435 name: "test_db".into(),
2436 resource_group: "default".into(),
2437 barrier_interval_ms: None,
2438 checkpoint_frequency: None,
2439 };
2440
2441 let ensembles = vec![NoShuffleEnsemble {
2442 entries: HashSet::from([fragment_id]),
2443 components: HashSet::from([fragment_id]),
2444 }];
2445
2446 let fragment_map =
2447 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2448 let job_map = HashMap::from([(job_id, job_model)]);
2449
2450 let worker_map = HashMap::from([
2452 (1.into(), build_worker_node(1, 1, "default")),
2453 (2.into(), build_worker_node(2, 1, "default")),
2454 (3.into(), build_worker_node(3, 1, "default")),
2455 (4.into(), build_worker_node(4, 1, "default")),
2456 ]);
2457
2458 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2459 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2460 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2461 let database_map = HashMap::from([(database_id, database_model)]);
2462
2463 let context = RenderActorsContext {
2464 fragment_source_ids: &fragment_source_ids,
2465 fragment_splits: &fragment_splits,
2466 streaming_job_databases: &streaming_job_databases,
2467 database_map: &database_map,
2468 };
2469
2470 let result = render_actors(
2472 &actor_id_counter,
2473 &ensembles,
2474 &fragment_map,
2475 &job_map,
2476 &worker_map,
2477 AdaptiveParallelismStrategy::Full,
2478 context,
2479 )
2480 .expect("actor rendering succeeds");
2481
2482 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2483 assert_eq!(
2485 state.len(),
2486 2,
2487 "Job strategy BOUNDED(2) should override global FULL"
2488 );
2489 }
2490
2491 #[test]
2493 fn render_actors_uses_global_strategy_when_job_has_none() {
2494 let actor_id_counter = AtomicU32::new(0);
2495 let fragment_id: FragmentId = 1.into();
2496 let job_id: JobId = 101.into();
2497 let database_id: DatabaseId = DatabaseId::new(11);
2498
2499 let fragment_model = build_fragment(
2500 fragment_id,
2501 job_id,
2502 0,
2503 DistributionType::Hash,
2504 8,
2505 StreamingParallelism::Adaptive,
2506 );
2507
2508 let job_model = streaming_job::Model {
2510 job_id,
2511 job_status: JobStatus::Created,
2512 create_type: CreateType::Foreground,
2513 timezone: None,
2514 config_override: None,
2515 adaptive_parallelism_strategy: None, parallelism: StreamingParallelism::Adaptive,
2517 backfill_parallelism: None,
2518 backfill_orders: None,
2519 max_parallelism: 8,
2520 specific_resource_group: None,
2521 is_serverless_backfill: false,
2522 };
2523
2524 let database_model = database::Model {
2525 database_id,
2526 name: "test_db".into(),
2527 resource_group: "default".into(),
2528 barrier_interval_ms: None,
2529 checkpoint_frequency: None,
2530 };
2531
2532 let ensembles = vec![NoShuffleEnsemble {
2533 entries: HashSet::from([fragment_id]),
2534 components: HashSet::from([fragment_id]),
2535 }];
2536
2537 let fragment_map =
2538 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2539 let job_map = HashMap::from([(job_id, job_model)]);
2540
2541 let worker_map = HashMap::from([
2543 (1.into(), build_worker_node(1, 1, "default")),
2544 (2.into(), build_worker_node(2, 1, "default")),
2545 (3.into(), build_worker_node(3, 1, "default")),
2546 (4.into(), build_worker_node(4, 1, "default")),
2547 ]);
2548
2549 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2550 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2551 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2552 let database_map = HashMap::from([(database_id, database_model)]);
2553
2554 let context = RenderActorsContext {
2555 fragment_source_ids: &fragment_source_ids,
2556 fragment_splits: &fragment_splits,
2557 streaming_job_databases: &streaming_job_databases,
2558 database_map: &database_map,
2559 };
2560
2561 let result = render_actors(
2563 &actor_id_counter,
2564 &ensembles,
2565 &fragment_map,
2566 &job_map,
2567 &worker_map,
2568 AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(3).unwrap()),
2569 context,
2570 )
2571 .expect("actor rendering succeeds");
2572
2573 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2574 assert_eq!(
2576 state.len(),
2577 3,
2578 "Should use global strategy BOUNDED(3) when job has no custom strategy"
2579 );
2580 }
2581
2582 #[test]
2584 fn render_actors_fixed_parallelism_ignores_strategy() {
2585 let actor_id_counter = AtomicU32::new(0);
2586 let fragment_id: FragmentId = 1.into();
2587 let job_id: JobId = 102.into();
2588 let database_id: DatabaseId = DatabaseId::new(12);
2589
2590 let fragment_model = build_fragment(
2592 fragment_id,
2593 job_id,
2594 0,
2595 DistributionType::Hash,
2596 8,
2597 StreamingParallelism::Fixed(5),
2598 );
2599
2600 let job_model = streaming_job::Model {
2602 job_id,
2603 job_status: JobStatus::Created,
2604 create_type: CreateType::Foreground,
2605 timezone: None,
2606 config_override: None,
2607 adaptive_parallelism_strategy: Some("BOUNDED(2)".to_owned()),
2608 parallelism: StreamingParallelism::Fixed(5),
2609 backfill_parallelism: None,
2610 backfill_orders: None,
2611 max_parallelism: 8,
2612 specific_resource_group: None,
2613 is_serverless_backfill: false,
2614 };
2615
2616 let database_model = database::Model {
2617 database_id,
2618 name: "test_db".into(),
2619 resource_group: "default".into(),
2620 barrier_interval_ms: None,
2621 checkpoint_frequency: None,
2622 };
2623
2624 let ensembles = vec![NoShuffleEnsemble {
2625 entries: HashSet::from([fragment_id]),
2626 components: HashSet::from([fragment_id]),
2627 }];
2628
2629 let fragment_map =
2630 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2631 let job_map = HashMap::from([(job_id, job_model)]);
2632
2633 let worker_map = HashMap::from([
2635 (1.into(), build_worker_node(1, 1, "default")),
2636 (2.into(), build_worker_node(2, 1, "default")),
2637 (3.into(), build_worker_node(3, 1, "default")),
2638 (4.into(), build_worker_node(4, 1, "default")),
2639 (5.into(), build_worker_node(5, 1, "default")),
2640 (6.into(), build_worker_node(6, 1, "default")),
2641 ]);
2642
2643 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2644 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2645 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2646 let database_map = HashMap::from([(database_id, database_model)]);
2647
2648 let context = RenderActorsContext {
2649 fragment_source_ids: &fragment_source_ids,
2650 fragment_splits: &fragment_splits,
2651 streaming_job_databases: &streaming_job_databases,
2652 database_map: &database_map,
2653 };
2654
2655 let result = render_actors(
2656 &actor_id_counter,
2657 &ensembles,
2658 &fragment_map,
2659 &job_map,
2660 &worker_map,
2661 AdaptiveParallelismStrategy::Full,
2662 context,
2663 )
2664 .expect("actor rendering succeeds");
2665
2666 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2667 assert_eq!(
2669 state.len(),
2670 5,
2671 "Fixed parallelism should ignore all strategies"
2672 );
2673 }
2674
2675 #[test]
2677 fn render_actors_ratio_strategy() {
2678 let actor_id_counter = AtomicU32::new(0);
2679 let fragment_id: FragmentId = 1.into();
2680 let job_id: JobId = 103.into();
2681 let database_id: DatabaseId = DatabaseId::new(13);
2682
2683 let fragment_model = build_fragment(
2684 fragment_id,
2685 job_id,
2686 0,
2687 DistributionType::Hash,
2688 16,
2689 StreamingParallelism::Adaptive,
2690 );
2691
2692 let job_model = streaming_job::Model {
2694 job_id,
2695 job_status: JobStatus::Created,
2696 create_type: CreateType::Foreground,
2697 timezone: None,
2698 config_override: None,
2699 adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
2700 parallelism: StreamingParallelism::Adaptive,
2701 backfill_parallelism: None,
2702 backfill_orders: None,
2703 max_parallelism: 16,
2704 specific_resource_group: None,
2705 is_serverless_backfill: false,
2706 };
2707
2708 let database_model = database::Model {
2709 database_id,
2710 name: "test_db".into(),
2711 resource_group: "default".into(),
2712 barrier_interval_ms: None,
2713 checkpoint_frequency: None,
2714 };
2715
2716 let ensembles = vec![NoShuffleEnsemble {
2717 entries: HashSet::from([fragment_id]),
2718 components: HashSet::from([fragment_id]),
2719 }];
2720
2721 let fragment_map =
2722 HashMap::from([(job_id, HashMap::from([(fragment_id, fragment_model)]))]);
2723 let job_map = HashMap::from([(job_id, job_model)]);
2724
2725 let worker_map = HashMap::from([
2727 (1.into(), build_worker_node(1, 1, "default")),
2728 (2.into(), build_worker_node(2, 1, "default")),
2729 (3.into(), build_worker_node(3, 1, "default")),
2730 (4.into(), build_worker_node(4, 1, "default")),
2731 (5.into(), build_worker_node(5, 1, "default")),
2732 (6.into(), build_worker_node(6, 1, "default")),
2733 (7.into(), build_worker_node(7, 1, "default")),
2734 (8.into(), build_worker_node(8, 1, "default")),
2735 ]);
2736
2737 let fragment_source_ids: HashMap<FragmentId, SourceId> = HashMap::new();
2738 let fragment_splits: HashMap<FragmentId, Vec<SplitImpl>> = HashMap::new();
2739 let streaming_job_databases = HashMap::from([(job_id, database_id)]);
2740 let database_map = HashMap::from([(database_id, database_model)]);
2741
2742 let context = RenderActorsContext {
2743 fragment_source_ids: &fragment_source_ids,
2744 fragment_splits: &fragment_splits,
2745 streaming_job_databases: &streaming_job_databases,
2746 database_map: &database_map,
2747 };
2748
2749 let result = render_actors(
2750 &actor_id_counter,
2751 &ensembles,
2752 &fragment_map,
2753 &job_map,
2754 &worker_map,
2755 AdaptiveParallelismStrategy::Full,
2756 context,
2757 )
2758 .expect("actor rendering succeeds");
2759
2760 let state = collect_actor_state(&result[&database_id][&job_id][&fragment_id]);
2761 assert_eq!(
2763 state.len(),
2764 4,
2765 "RATIO(0.5) of 8 workers should give 4 actors"
2766 );
2767 }
2768}