use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{AddAssign, Deref};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::{
    ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::{ActorInfo, PbActorLocation};
use risingwave_pb::id::SubscriberId;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
use risingwave_pb::meta::table_parallelism::{
    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
    PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
    PbStreamContext, StreamNode,
};
use strum::Display;

use super::{ActorId, FragmentId};

/// The parallelism configured for a streaming job.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// The parallelism is decided by the scheduler based on the available
    /// worker slots, and may change as workers join or leave.
    Adaptive,
    /// The job runs with the given fixed parallelism.
    Fixed(usize),
    /// The parallelism is managed per fragment (e.g. after fragment-level
    /// scaling) and is no longer described by a single job-level value.
    Custom,
}

impl From<PbTableParallelism> for TableParallelism {
    fn from(value: PbTableParallelism) -> Self {
        use Parallelism::*;
        match &value.parallelism {
            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
            Some(Custom(_)) => Self::Custom,
            _ => unreachable!(),
        }
    }
}

impl From<TableParallelism> for PbTableParallelism {
    fn from(value: TableParallelism) -> Self {
        use TableParallelism::*;

        let parallelism = match value {
            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
                parallelism: n as u32,
            }),
            Custom => PbParallelism::Custom(PbCustomParallelism {}),
        };

        Self {
            parallelism: Some(parallelism),
        }
    }
}

impl From<StreamingParallelism> for TableParallelism {
    fn from(value: StreamingParallelism) -> Self {
        match value {
            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
            StreamingParallelism::Custom => TableParallelism::Custom,
        }
    }
}

impl From<TableParallelism> for StreamingParallelism {
    fn from(value: TableParallelism) -> Self {
        match value {
            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            TableParallelism::Custom => StreamingParallelism::Custom,
        }
    }
}
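
// A minimal sketch (test-only, not part of the original logic) of how the
// conversions above compose: `TableParallelism` survives a round trip through
// its protobuf representation for all three variants.
#[cfg(test)]
mod table_parallelism_round_trip_tests {
    use super::*;

    #[test]
    fn pb_round_trip_preserves_variant() {
        for parallelism in [
            TableParallelism::Adaptive,
            TableParallelism::Fixed(4),
            TableParallelism::Custom,
        ] {
            let pb: PbTableParallelism = parallelism.into();
            assert_eq!(TableParallelism::from(pb), parallelism);
        }
    }
}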

pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// For each fragment, maps an original upstream fragment id to the fragment id
/// that replaces it.
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// Nested map describing newly created no-shuffle dispatches, down to the
/// actor-to-actor pairing between the two fragments.
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;

#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    pub downstream_fragment_id: FragmentId,
    pub dispatcher_type: DispatcherType,
    pub dist_key_indices: Vec<u32>,
    pub output_mapping: PbDispatchOutputMapping,
}

impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
        Self {
            downstream_fragment_id: fragment_id,
            dispatcher_type: dispatch.get_type().unwrap().into(),
            dist_key_indices: dispatch.dist_key_indices,
            output_mapping: dispatch.output_mapping.unwrap(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    pub inner: StreamJobFragments,
    pub downstreams: FragmentDownstreamRelation,
}

impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

#[derive(Clone, Debug)]
pub struct StreamActor {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub vnode_bitmap: Option<Bitmap>,
    pub mview_definition: String,
    pub expr_context: Option<PbExprContext>,
    pub config_override: Arc<str>,
}

impl StreamActor {
    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
        PbStreamActor {
            actor_id: self.actor_id,
            fragment_id: self.fragment_id,
            dispatcher: dispatchers.collect(),
            vnode_bitmap: self
                .vnode_bitmap
                .as_ref()
                .map(|bitmap| bitmap.to_protobuf()),
            mview_definition: self.mview_definition.clone(),
            expr_context: self.expr_context.clone(),
            config_override: self.config_override.to_string(),
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct Fragment {
    pub fragment_id: FragmentId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub actors: Vec<StreamActor>,
    pub state_table_ids: Vec<TableId>,
    pub maybe_vnode_count: Option<u32>,
    pub nodes: StreamNode,
}

impl Fragment {
    pub fn to_protobuf(
        &self,
        upstream_fragments: impl Iterator<Item = FragmentId>,
        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
    ) -> PbFragment {
        PbFragment {
            fragment_id: self.fragment_id,
            fragment_type_mask: self.fragment_type_mask.into(),
            distribution_type: self.distribution_type as _,
            actors: self
                .actors
                .iter()
                .map(|actor| {
                    actor.to_protobuf(
                        dispatchers
                            .and_then(|dispatchers| dispatchers.get(&actor.actor_id))
                            .into_iter()
                            .flatten()
                            .cloned(),
                    )
                })
                .collect(),
            state_table_ids: self.state_table_ids.clone(),
            upstream_fragment_ids: upstream_fragments.collect(),
            maybe_vnode_count: self.maybe_vnode_count,
            nodes: Some(self.nodes.clone()),
        }
    }
}

impl VnodeCountCompat for Fragment {
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}

impl IsSingleton for Fragment {
    fn is_singleton(&self) -> bool {
        matches!(self.distribution_type, FragmentDistributionType::Single)
    }
}
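
// Sketch (illustrative test; assumes `Fragment::default()` suffices for the
// remaining fields): singleton-ness is decided by the distribution type alone.
#[cfg(test)]
mod fragment_singleton_tests {
    use super::*;

    #[test]
    fn single_distribution_is_singleton() {
        let fragment = Fragment {
            distribution_type: FragmentDistributionType::Single,
            ..Default::default()
        };
        assert!(fragment.is_singleton());

        let fragment = Fragment {
            distribution_type: FragmentDistributionType::Hash,
            ..Default::default()
        };
        assert!(!fragment.is_singleton());
    }
}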

/// The fragments of a streaming job, together with the placement and lifecycle
/// metadata the meta node keeps for it.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job.
    pub stream_job_id: JobId,

    /// The state of the streaming job.
    pub state: State,

    /// The fragments of the job, keyed by fragment id.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status (location) of each actor, keyed by actor id.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The streaming context carried by the job, e.g. timezone and config overrides.
    pub ctx: StreamContext,

    /// The parallelism assigned to this job.
    pub assigned_parallelism: TableParallelism,

    /// The maximum parallelism specified when the job was created, i.e. the
    /// expected vnode count.
    pub max_parallelism: usize,
}

#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used to interpret timestamps and dates, if any.
    pub timezone: Option<String>,

    /// The serialized config override applied to the actors of the job.
    pub config_override: Arc<str>,

    /// The per-job adaptive parallelism strategy, if one was specified.
    pub adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
}

impl StreamContext {
    pub fn to_protobuf(&self) -> PbStreamContext {
        PbStreamContext {
            timezone: self.timezone.clone().unwrap_or("".into()),
            config_override: self.config_override.to_string(),
            adaptive_parallelism_strategy: self
                .adaptive_parallelism_strategy
                .as_ref()
                .map(ToString::to_string)
                .unwrap_or_default(),
        }
    }

    pub fn to_expr_context(&self) -> PbExprContext {
        PbExprContext {
            // `timezone` is expected to always be set; the placeholder below
            // only shows up for debugging if it is not.
            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
            strict_mode: false,
        }
    }

    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
        Self {
            timezone: if prost.get_timezone().is_empty() {
                None
            } else {
                Some(prost.get_timezone().clone())
            },
            config_override: prost.get_config_override().as_str().into(),
            adaptive_parallelism_strategy: if prost.get_adaptive_parallelism_strategy().is_empty() {
                None
            } else {
                Some(
                    parse_strategy(prost.get_adaptive_parallelism_strategy())
                        .expect("adaptive parallelism strategy should be validated in frontend"),
                )
            },
        }
    }
}
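
// Sketch (illustrative test): `StreamContext` round-trips through its protobuf
// form, with the empty string standing in for `None` on both optional fields.
#[cfg(test)]
mod stream_context_round_trip_tests {
    use super::*;

    #[test]
    fn pb_round_trip_preserves_fields() {
        let ctx = StreamContext {
            timezone: Some("UTC".to_owned()),
            config_override: "".into(),
            adaptive_parallelism_strategy: None,
        };
        let restored = StreamContext::from_protobuf(&ctx.to_protobuf());
        assert_eq!(restored.timezone, ctx.timezone);
        assert_eq!(restored.config_override, ctx.config_override);
        assert!(restored.adaptive_parallelism_strategy.is_none());
    }
}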

#[easy_ext::ext(StreamingJobModelContextExt)]
impl risingwave_meta_model::streaming_job::Model {
    pub fn stream_context(&self) -> StreamContext {
        StreamContext {
            timezone: self.timezone.clone(),
            config_override: self.config_override.clone().unwrap_or_default().into(),
            adaptive_parallelism_strategy: self.adaptive_parallelism_strategy.as_deref().map(|s| {
                parse_strategy(s).expect("strategy should be validated before persisting")
            }),
        }
    }
}

impl StreamJobFragments {
    pub fn to_protobuf(
        &self,
        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
        fragment_dispatchers: &FragmentActorDispatchers,
    ) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id,
            state: self.state as _,
            fragments: self
                .fragments
                .iter()
                .map(|(id, fragment)| {
                    (
                        *id,
                        fragment.to_protobuf(
                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
                            fragment_dispatchers.get(id),
                        ),
                    )
                })
                .collect(),
            actor_status: self
                .actor_status
                .iter()
                .map(|(actor_id, status)| (*actor_id, *status))
                .collect(),
            ctx: Some(self.ctx.to_protobuf()),
            parallelism: Some(self.assigned_parallelism.into()),
            node_label: "".to_owned(),
            backfill_done: true,
            max_parallelism: Some(self.max_parallelism as _),
        }
    }
}

/// Actors to create, grouped by worker and fragment: for each fragment on a
/// worker, the stream node template, the actors with their up/downstreams, and
/// the subscriber ids associated with the fragment.
pub type StreamJobActorsToCreate = HashMap<
    WorkerId,
    HashMap<
        FragmentId,
        (
            StreamNode,
            Vec<StreamActorWithUpDownstreams>,
            HashSet<SubscriberId>,
        ),
    >,
>;

impl StreamJobFragments {
    /// Creates a `StreamJobFragments` for test purposes.
    pub fn for_test(job_id: JobId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
        Self::new(
            job_id,
            fragments,
            &BTreeMap::new(),
            StreamContext::default(),
            TableParallelism::Adaptive,
            VirtualNode::COUNT_FOR_TEST,
        )
    }

    /// Creates a new `StreamJobFragments` in the `Initial` state, building each
    /// actor's status from its assigned location.
    pub fn new(
        stream_job_id: JobId,
        fragments: BTreeMap<FragmentId, Fragment>,
        actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
        ctx: StreamContext,
        table_parallelism: TableParallelism,
        max_parallelism: usize,
    ) -> Self {
        let actor_status = actor_locations
            .iter()
            .map(|(&actor_id, alignment_id)| {
                (
                    actor_id,
                    ActorStatus {
                        location: PbActorLocation::from_worker(alignment_id.worker_id()),
                    },
                )
            })
            .collect();

        Self {
            stream_job_id,
            state: State::Initial,
            fragments,
            actor_status,
            ctx,
            assigned_parallelism: table_parallelism,
            max_parallelism,
        }
    }

    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.fragments.keys().cloned()
    }

    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }

    /// Returns the actors of the given fragment, or an empty slice if the
    /// fragment does not exist.
    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
        self.fragments
            .get(&fragment_id)
            .map(|f| f.actors.as_slice())
            .unwrap_or_default()
    }

    /// Returns the id of the streaming job.
    pub fn stream_job_id(&self) -> JobId {
        self.stream_job_id
    }

    /// Returns the timezone of the job, if set.
    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }

    /// Returns whether the job is in the `Created` state.
    pub fn is_created(&self) -> bool {
        self.state == State::Created
    }

    /// Returns the ids of all actors in the job.
    pub fn actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
    }

    /// Returns a mapping from actor id to the fragment it belongs to.
    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
        self.fragments
            .values()
            .flat_map(|fragment| {
                fragment
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, fragment.fragment_id))
            })
            .collect()
    }

    /// Returns all actors of the job. Test only.
    #[cfg(test)]
    pub fn actors(&self) -> Vec<StreamActor> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.clone())
            .collect()
    }

    /// Returns the ids of all fragments that contain an `Mview` node. Test only.
    #[cfg(test)]
    pub fn mview_fragment_ids(&self) -> Vec<FragmentId> {
        self.fragments
            .values()
            .filter(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .map(|fragment| fragment.fragment_id)
            .collect()
    }

    /// Returns the actors whose backfill progress should be tracked, together
    /// with the type of their backfill upstream.
    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
        Self::tracking_progress_actor_ids_impl(self.fragments.values().map(|fragment| {
            (
                fragment.fragment_type_mask,
                fragment.actors.iter().map(|actor| actor.actor_id),
            )
        }))
    }

    pub fn tracking_progress_actor_ids_impl(
        fragments: impl IntoIterator<Item = (FragmentTypeMask, impl Iterator<Item = ActorId>)>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        let mut actor_ids = vec![];
        for (fragment_type_mask, actors) in fragments {
            if fragment_type_mask.contains(FragmentTypeFlag::CdcFilter) {
                // CDC table jobs contain a `CdcFilter` fragment; their backfill
                // progress is not tracked, so no actors are returned at all.
                return vec![];
            }
            if fragment_type_mask.contains_any([
                FragmentTypeFlag::Values,
                FragmentTypeFlag::StreamScan,
                FragmentTypeFlag::SourceScan,
                FragmentTypeFlag::LocalityProvider,
            ]) {
                actor_ids.extend(actors.map(|actor_id| {
                    (
                        actor_id,
                        BackfillUpstreamType::from_fragment_type_mask(fragment_type_mask),
                    )
                }));
            }
        }
        actor_ids
    }

    /// Returns the root fragment of the job: the mview fragment if present,
    /// otherwise the sink fragment, otherwise the source fragment.
    pub fn root_fragment(&self) -> Option<Fragment> {
        self.mview_fragment()
            .or_else(|| self.sink_fragment())
            .or_else(|| self.source_fragment())
    }

    /// Returns the fragment that contains an `Mview` node, if any.
    pub fn mview_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .cloned()
    }

    /// Returns the fragment that contains a `Source` node, if any.
    pub fn source_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Source)
            })
            .cloned()
    }

    /// Returns the fragment that contains a `Sink` node, if any.
    pub fn sink_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
            .cloned()
    }

    /// Extracts the fragments that contain a stream source executor, grouped by
    /// source id.
    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
        let mut source_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some(source_id) = fragment.nodes.find_stream_source() {
                source_fragments
                    .entry(source_id)
                    .or_insert(BTreeSet::new())
                    .insert(fragment.fragment_id as FragmentId);
            }
        }
        source_fragments
    }

    pub fn source_backfill_fragments(
        &self,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        Self::source_backfill_fragments_impl(
            self.fragments
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        )
    }

    /// Returns a map of `source_id` -> set of `(source_backfill_fragment_id,
    /// upstream_source_fragment_id)` for all source backfill fragments.
    pub fn source_backfill_fragments_impl(
        fragments: impl Iterator<Item = (FragmentId, &StreamNode)>,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        let mut source_backfill_fragments = HashMap::new();

        for (fragment_id, fragment_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                fragment_node.find_source_backfill()
            {
                source_backfill_fragments
                    .entry(source_id)
                    .or_insert(BTreeSet::new())
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        source_backfill_fragments
    }

    /// Returns a mutable reference to the unique fragment containing a `Union`
    /// node (the placeholder merger for upstreams writing into this table).
    /// Panics if no such fragment exists.
    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
        let mut union_fragment_id = None;
        for (fragment_id, fragment) in &self.fragments {
            visit_stream_node_body(&fragment.nodes, |body| {
                if let NodeBody::Union(_) = body {
                    if let Some(union_fragment_id) = union_fragment_id.as_mut() {
                        // The union fragment should be unique.
                        assert_eq!(*union_fragment_id, *fragment_id);
                    } else {
                        union_fragment_id = Some(*fragment_id);
                    }
                }
            });
        }

        let union_fragment_id =
            union_fragment_id.expect("fragment of placeholder merger not found");

        (self
            .fragments
            .get_mut(&union_fragment_id)
            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
    }

    /// Recursively accumulates, for each table that `stream_node` depends on,
    /// the number of nodes referencing it.
    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
        let table_id = match stream_node.node_body.as_ref() {
            Some(NodeBody::StreamScan(stream_scan)) => Some(stream_scan.table_id),
            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(stream_scan.table_id),
            Some(NodeBody::LocalityProvider(state)) => {
                Some(state.state_table.as_ref().expect("must have state").id)
            }
            _ => None,
        };
        if let Some(table_id) = table_id {
            table_ids.entry(table_id).or_default().add_assign(1);
        }

        for child in &stream_node.input {
            Self::resolve_dependent_table(child, table_ids);
        }
    }

    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
        Self::upstream_table_counts_impl(self.fragments.values().map(|fragment| &fragment.nodes))
    }

    /// Returns the number of nodes that depend on each upstream table.
    pub fn upstream_table_counts_impl(
        fragment_nodes: impl Iterator<Item = &StreamNode>,
    ) -> HashMap<TableId, usize> {
        let mut table_ids = HashMap::new();
        fragment_nodes.for_each(|node| {
            Self::resolve_dependent_table(node, &mut table_ids);
        });

        table_ids
    }

    /// Returns actor ids grouped by the worker they are located on.
    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id();
            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
        }
        map
    }

    pub fn actors_to_create(
        &self,
    ) -> impl Iterator<
        Item = (
            FragmentId,
            &StreamNode,
            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
        ),
    > + '_ {
        self.fragments.values().map(move |fragment| {
            (
                fragment.fragment_id,
                &fragment.nodes,
                fragment.actors.iter().map(move |actor| {
                    let worker_id: WorkerId = self
                        .actor_status
                        .get(&actor.actor_id)
                        .expect("should exist")
                        .worker_id();
                    (actor, worker_id)
                }),
            )
        })
    }

    /// Returns the state table id of the materialized view, if this job is one.
    pub fn mv_table_id(&self) -> Option<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .find(|table_id| self.stream_job_id.is_mv_table_id(*table_id))
    }

    /// Collects all state tables referenced by the given fragments, keyed by
    /// table id. Panics on duplicated table ids.
    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<TableId, Table> {
        let mut tables = BTreeMap::new();
        for fragment in fragments {
            stream_graph_visitor::visit_stream_node_tables_inner(
                &mut fragment.nodes.clone(),
                false,
                true,
                |table, _| {
                    let table_id = table.id;
                    tables
                        .try_insert(table_id, table.clone())
                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
                },
            );
        }
        tables
    }

    /// Returns the internal table ids of the job, excluding the mview table.
    pub fn internal_table_ids(&self) -> Vec<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .filter(|&t| !self.stream_job_id.is_mv_table_id(t))
            .collect_vec()
    }

    /// Returns all state table ids of the job, including the mview table.
    pub fn all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
    }

    /// Fills the expression context of each actor from the job's stream
    /// context, if it is not already set.
    pub fn fill_expr_context(mut self) -> Self {
        self.fragments.values_mut().for_each(|fragment| {
            fragment.actors.iter_mut().for_each(|actor| {
                if actor.expr_context.is_none() {
                    actor.expr_context = Some(self.ctx.to_expr_context());
                }
            });
        });
        self
    }
}
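
// Sketch (illustrative test): a `StreamNode` with no body and no inputs
// contributes no upstream table dependencies.
#[cfg(test)]
mod upstream_table_count_tests {
    use super::*;

    #[test]
    fn empty_node_has_no_dependencies() {
        let node = StreamNode::default();
        let counts = StreamJobFragments::upstream_table_counts_impl([&node].into_iter());
        assert!(counts.is_empty());
    }
}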

#[derive(Debug, Display, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    MView,
    Values,
    Source,
    LocalityProvider,
}

impl BackfillUpstreamType {
    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
        let is_values = mask.contains(FragmentTypeFlag::Values);
        let is_source = mask.contains(FragmentTypeFlag::SourceScan);
        let is_locality_provider = mask.contains(FragmentTypeFlag::LocalityProvider);

        // A backfill fragment is expected to carry exactly one of these flags.
        debug_assert!(
            is_mview as u8 + is_values as u8 + is_source as u8 + is_locality_provider as u8 == 1,
            "a backfill fragment should either be mview, value, source, or locality provider, found {:?}",
            mask
        );

        if is_mview {
            BackfillUpstreamType::MView
        } else if is_values {
            BackfillUpstreamType::Values
        } else if is_source {
            BackfillUpstreamType::Source
        } else if is_locality_provider {
            BackfillUpstreamType::LocalityProvider
        } else {
            unreachable!("invalid fragment type mask: {:?}", mask);
        }
    }
}
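
// Sketch (illustrative test; assumes a `From<u32>` impl on `FragmentTypeMask`
// built from the flag's bit value): a mask with only `StreamScan` set maps to
// the `MView` backfill upstream type.
#[cfg(test)]
mod backfill_upstream_type_tests {
    use super::*;

    #[test]
    fn stream_scan_maps_to_mview() {
        let mask = FragmentTypeMask::from(FragmentTypeFlag::StreamScan as u32);
        assert_eq!(
            BackfillUpstreamType::from_fragment_type_mask(mask),
            BackfillUpstreamType::MView
        );
    }
}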