use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{AddAssign, Deref};

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{
    IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat, WorkerSlotId,
};
use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::{ActorInfo, PbActorLocation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
use risingwave_pb::meta::table_parallelism::{
    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
    PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    DispatchStrategy, Dispatcher, FragmentTypeFlag, PbDispatcher, PbStreamActor, PbStreamContext,
    StreamNode,
};

use super::{ActorId, FragmentId};
use crate::model::MetadataModelResult;
use crate::stream::{SplitAssignment, build_actor_connector_splits};

/// The parallelism assigned to a streaming job.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// The job's parallelism follows the available parallelism of the cluster.
    Adaptive,
    /// The job's parallelism is fixed to the given number.
    Fixed(usize),
    /// The job's parallelism is a custom assignment that fits neither of the
    /// variants above.
    Custom,
}

impl From<PbTableParallelism> for TableParallelism {
    fn from(value: PbTableParallelism) -> Self {
        use Parallelism::*;
        match &value.parallelism {
            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
            Some(Custom(_)) => Self::Custom,
            _ => unreachable!(),
        }
    }
}

impl From<TableParallelism> for PbTableParallelism {
    fn from(value: TableParallelism) -> Self {
        use TableParallelism::*;

        let parallelism = match value {
            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
                parallelism: n as u32,
            }),
            Custom => PbParallelism::Custom(PbCustomParallelism {}),
        };

        Self {
            parallelism: Some(parallelism),
        }
    }
}

impl From<StreamingParallelism> for TableParallelism {
    fn from(value: StreamingParallelism) -> Self {
        match value {
            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
            StreamingParallelism::Custom => TableParallelism::Custom,
        }
    }
}

impl From<TableParallelism> for StreamingParallelism {
    fn from(value: TableParallelism) -> Self {
        match value {
            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            TableParallelism::Custom => StreamingParallelism::Custom,
        }
    }
}
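
// Illustrative sketch (not from the original source): the `From` impls above are
// intended to round-trip across the protobuf, model, and in-memory
// representations of parallelism.
#[cfg(test)]
mod table_parallelism_roundtrip_example {
    use super::*;

    #[test]
    fn roundtrips() {
        for p in [
            TableParallelism::Adaptive,
            TableParallelism::Fixed(4),
            TableParallelism::Custom,
        ] {
            // Protobuf round-trip.
            let pb: PbTableParallelism = p.into();
            assert_eq!(TableParallelism::from(pb), p);
            // Meta-model round-trip.
            let model: StreamingParallelism = p.into();
            assert_eq!(TableParallelism::from(model), p);
        }
    }
}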

/// Upstream fragment id -> the info of the upstream actors an actor receives
/// data from.
pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;

pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
/// Fragment id -> the dispatchers of each of its actors.
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

/// Fragment id -> its downstream fragment relations.
pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// Fragment id -> (original upstream fragment id -> new upstream fragment id),
/// for replacing the upstream of a fragment.
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// The newly added no-shuffle relation:
/// upstream fragment id -> downstream fragment id -> (upstream actor id -> downstream actor id).
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;

#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    pub downstream_fragment_id: FragmentId,
    pub dispatcher_type: DispatcherType,
    pub dist_key_indices: Vec<u32>,
    pub output_indices: Vec<u32>,
}

impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
        Self {
            downstream_fragment_id: fragment_id,
            dispatcher_type: dispatch.get_type().unwrap().into(),
            dist_key_indices: dispatch.dist_key_indices,
            output_indices: dispatch.output_indices,
        }
    }
}
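
// Illustrative sketch (not from the original source): building a
// `DownstreamFragmentRelation` from a protobuf `DispatchStrategy`. The strategy
// constructed below is a hypothetical minimal one for this example only.
#[cfg(test)]
mod downstream_relation_example {
    use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;

    use super::*;

    #[test]
    fn from_dispatch_strategy() {
        let strategy = DispatchStrategy {
            r#type: PbDispatcherType::Hash as i32,
            dist_key_indices: vec![0],
            output_indices: vec![0, 1],
            ..Default::default()
        };
        let relation = DownstreamFragmentRelation::from((2, strategy));
        assert_eq!(relation.downstream_fragment_id, 2);
        // The protobuf dispatcher type is converted to the meta-model type.
        assert_eq!(relation.dispatcher_type, DispatcherType::Hash);
    }
}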

#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    pub inner: StreamJobFragments,
    pub downstreams: FragmentDownstreamRelation,
}

impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

#[derive(Clone, Debug)]
pub struct StreamActor {
    pub actor_id: u32,
    pub fragment_id: u32,
    pub vnode_bitmap: Option<Bitmap>,
    pub mview_definition: String,
    pub expr_context: Option<PbExprContext>,
}

impl StreamActor {
    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
        PbStreamActor {
            actor_id: self.actor_id,
            fragment_id: self.fragment_id,
            dispatcher: dispatchers.collect(),
            vnode_bitmap: self
                .vnode_bitmap
                .as_ref()
                .map(|bitmap| bitmap.to_protobuf()),
            mview_definition: self.mview_definition.clone(),
            expr_context: self.expr_context.clone(),
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct Fragment {
    pub fragment_id: FragmentId,
    pub fragment_type_mask: u32,
    pub distribution_type: PbFragmentDistributionType,
    pub actors: Vec<StreamActor>,
    pub state_table_ids: Vec<u32>,
    pub maybe_vnode_count: Option<u32>,
    pub nodes: StreamNode,
}

impl Fragment {
    pub fn to_protobuf(
        &self,
        upstream_fragments: impl Iterator<Item = FragmentId>,
        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
    ) -> PbFragment {
        PbFragment {
            fragment_id: self.fragment_id,
            fragment_type_mask: self.fragment_type_mask,
            distribution_type: self.distribution_type as _,
            actors: self
                .actors
                .iter()
                .map(|actor| {
                    actor.to_protobuf(
                        dispatchers
                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
                            .into_iter()
                            .flatten()
                            .cloned(),
                    )
                })
                .collect(),
            state_table_ids: self.state_table_ids.clone(),
            upstream_fragment_ids: upstream_fragments.collect(),
            maybe_vnode_count: self.maybe_vnode_count,
            nodes: Some(self.nodes.clone()),
        }
    }
}

impl VnodeCountCompat for Fragment {
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}

impl IsSingleton for Fragment {
    fn is_singleton(&self) -> bool {
        matches!(self.distribution_type, FragmentDistributionType::Single)
    }
}

/// The fragments of a streaming job, together with the runtime status of the
/// job and its actors.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job.
    pub stream_job_id: TableId,

    /// The state of the streaming job.
    pub state: State,

    /// The fragments of the streaming job, keyed by fragment id.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status of each actor, including its location and state.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The source splits assigned to each connector source actor.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// The streaming context of the job, e.g. the timezone.
    pub ctx: StreamContext,

    /// The parallelism assigned to the job.
    pub assigned_parallelism: TableParallelism,

    /// The maximum parallelism of the job, i.e. the expected vnode count.
    pub max_parallelism: usize,
}

#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used to interpret timestamps and dates for conversion.
    pub timezone: Option<String>,
}

impl StreamContext {
    pub fn to_protobuf(&self) -> PbStreamContext {
        PbStreamContext {
            timezone: self.timezone.clone().unwrap_or("".into()),
        }
    }

    pub fn to_expr_context(&self) -> PbExprContext {
        PbExprContext {
            // The timezone is expected to be set; a conspicuous placeholder is
            // used so that a missing value surfaces early.
            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
            strict_mode: false,
        }
    }

    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
        Self {
            timezone: if prost.get_timezone().is_empty() {
                None
            } else {
                Some(prost.get_timezone().clone())
            },
        }
    }
}
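
// Illustrative sketch (not from the original source): `to_protobuf` encodes an
// unset timezone as an empty string, which `from_protobuf` maps back to `None`,
// so the conversion round-trips.
#[cfg(test)]
mod stream_context_roundtrip_example {
    use super::*;

    #[test]
    fn timezone_roundtrips() {
        for timezone in [None, Some("UTC".to_owned())] {
            let ctx = StreamContext {
                timezone: timezone.clone(),
            };
            let restored = StreamContext::from_protobuf(&ctx.to_protobuf());
            assert_eq!(restored.timezone, timezone);
        }
    }
}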

impl StreamJobFragments {
    pub fn to_protobuf(
        &self,
        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
        fragment_dispatchers: &FragmentActorDispatchers,
    ) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id.table_id(),
            state: self.state as _,
            fragments: self
                .fragments
                .iter()
                .map(|(id, fragment)| {
                    (
                        *id,
                        fragment.to_protobuf(
                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
                            fragment_dispatchers.get(&(*id as _)),
                        ),
                    )
                })
                .collect(),
            actor_status: self.actor_status.clone().into_iter().collect(),
            actor_splits: build_actor_connector_splits(&self.actor_splits),
            ctx: Some(self.ctx.to_protobuf()),
            parallelism: Some(self.assigned_parallelism.into()),
            node_label: "".to_owned(),
            backfill_done: true,
            max_parallelism: Some(self.max_parallelism as _),
        }
    }
}

pub type StreamJobActorsToCreate =
    HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>)>>;

impl StreamJobFragments {
    /// Creates a `StreamJobFragments` for testing, with the test vnode count
    /// as the maximum parallelism.
    pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
        Self::new(
            table_id,
            fragments,
            &BTreeMap::new(),
            StreamContext::default(),
            TableParallelism::Adaptive,
            VirtualNode::COUNT_FOR_TEST,
        )
    }

    /// Creates a new `StreamJobFragments` in `State::Initial`, with all actors
    /// marked as `ActorState::Inactive`.
    pub fn new(
        stream_job_id: TableId,
        fragments: BTreeMap<FragmentId, Fragment>,
        actor_locations: &BTreeMap<ActorId, WorkerSlotId>,
        ctx: StreamContext,
        table_parallelism: TableParallelism,
        max_parallelism: usize,
    ) -> Self {
        let actor_status = actor_locations
            .iter()
            .map(|(&actor_id, worker_slot_id)| {
                (
                    actor_id,
                    ActorStatus {
                        location: PbActorLocation::from_worker(worker_slot_id.worker_id()),
                        state: ActorState::Inactive as i32,
                    },
                )
            })
            .collect();

        Self {
            stream_job_id,
            state: State::Initial,
            fragments,
            actor_status,
            actor_splits: HashMap::default(),
            ctx,
            assigned_parallelism: table_parallelism,
            max_parallelism,
        }
    }

    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.fragments.keys().cloned()
    }

    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }

    /// Returns the actors of the fragment, or an empty slice if the fragment
    /// does not exist.
    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
        self.fragments
            .get(&fragment_id)
            .map(|f| f.actors.as_slice())
            .unwrap_or_default()
    }

    /// Returns the id of the streaming job.
    pub fn stream_job_id(&self) -> TableId {
        self.stream_job_id
    }

    /// Returns the state of the streaming job.
    pub fn state(&self) -> State {
        self.state
    }

    /// Returns the timezone of the streaming job.
    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }

    /// Returns whether the streaming job is created.
    pub fn is_created(&self) -> bool {
        self.state == State::Created
    }

    /// Returns whether the streaming job is in the initial state.
    pub fn is_initial(&self) -> bool {
        self.state == State::Initial
    }

    /// Sets the state of the streaming job.
    pub fn set_state(&mut self, state: State) {
        self.state = state;
    }

    /// Sets the state of all actors of the streaming job.
    pub fn update_actors_state(&mut self, state: ActorState) {
        for actor_status in self.actor_status.values_mut() {
            actor_status.set_state(state);
        }
    }

    pub fn set_actor_splits_by_split_assignment(&mut self, split_assignment: SplitAssignment) {
        self.actor_splits = split_assignment.into_values().flatten().collect();
    }

    /// Returns the actor ids of all fragments.
    pub fn actor_ids(&self) -> Vec<ActorId> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
            .collect()
    }

    /// Returns a map from actor id to the id of the fragment it belongs to.
    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
        self.fragments
            .values()
            .flat_map(|fragment| {
                fragment
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, fragment.fragment_id))
            })
            .collect()
    }

    /// Returns all actors of all fragments.
    #[cfg(test)]
    pub fn actors(&self) -> Vec<StreamActor> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.clone())
            .collect()
    }

    /// Returns the actor ids of the fragments whose type mask passes `check_type`.
    pub fn filter_actor_ids(
        &self,
        check_type: impl Fn(u32) -> bool + 'static,
    ) -> impl Iterator<Item = ActorId> + '_ {
        self.fragments
            .values()
            .filter(move |fragment| check_type(fragment.fragment_type_mask))
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
    }

    /// Returns the actor ids of the fragments with the `Mview` type flag.
    pub fn mview_actor_ids(&self) -> Vec<ActorId> {
        Self::filter_actor_ids(self, |fragment_type_mask| {
            (fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0
        })
        .collect()
    }

    /// Returns the actor ids whose backfill progress should be tracked when
    /// creating the streaming job, together with their upstream type.
    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
        let mut actor_ids = vec![];
        for fragment in self.fragments.values() {
            if fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32 != 0 {
                // A job containing a `CdcFilter` fragment is a CDC table job;
                // its backfill progress is not tracked.
                return vec![];
            }
            if (fragment.fragment_type_mask
                & (FragmentTypeFlag::Values as u32
                    | FragmentTypeFlag::StreamScan as u32
                    | FragmentTypeFlag::SourceScan as u32))
                != 0
            {
                actor_ids.extend(fragment.actors.iter().map(|actor| {
                    (
                        actor.actor_id,
                        BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask),
                    )
                }));
            }
        }
        actor_ids
    }

    /// Returns the "root" fragment of the job: the `Mview` fragment if any,
    /// otherwise the `Sink` or `Source` fragment.
    pub fn root_fragment(&self) -> Option<Fragment> {
        self.mview_fragment()
            .or_else(|| self.sink_fragment())
            .or_else(|| self.source_fragment())
    }

    /// Returns the fragment with the `Mview` type flag, if any.
    pub fn mview_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0)
            .cloned()
    }

    /// Returns the fragment with the `Source` type flag, if any.
    pub fn source_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Source as u32) != 0)
            .cloned()
    }

    /// Returns the fragment with the `Sink` type flag, if any.
    pub fn sink_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Sink as u32) != 0)
            .cloned()
    }

    pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
        Self::filter_actor_ids(self, |mask| {
            (mask & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0
        })
        .collect()
    }

    /// Extracts the fragments that contain an external stream source, grouped
    /// by source id.
    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
        let mut source_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some(source_id) = fragment.nodes.find_stream_source() {
                source_fragments
                    .entry(source_id as SourceId)
                    .or_insert(BTreeSet::new())
                    .insert(fragment.fragment_id as FragmentId);
            }
        }
        source_fragments
    }

    /// Returns a map from source id to the set of
    /// `(source backfill fragment id, upstream source fragment id)` pairs.
    pub fn source_backfill_fragments(
        &self,
    ) -> MetadataModelResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
        let mut source_backfill_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some((source_id, upstream_source_fragment_id)) =
                fragment.nodes.find_source_backfill()
            {
                source_backfill_fragments
                    .entry(source_id as SourceId)
                    .or_insert(BTreeSet::new())
                    .insert((fragment.fragment_id, upstream_source_fragment_id));
            }
        }
        Ok(source_backfill_fragments)
    }

    /// Returns the fragment containing the `Union` node that merges inputs
    /// into the table. Asserts that it is unique, and panics if no such
    /// fragment exists.
    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
        let mut union_fragment_id = None;
        for (fragment_id, fragment) in &self.fragments {
            visit_stream_node(&fragment.nodes, |body| {
                if let NodeBody::Union(_) = body {
                    if let Some(union_fragment_id) = union_fragment_id.as_mut() {
                        // The union fragment is expected to be unique.
                        assert_eq!(*union_fragment_id, *fragment_id);
                    } else {
                        union_fragment_id = Some(*fragment_id);
                    }
                }
            })
        }

        let union_fragment_id =
            union_fragment_id.expect("fragment of placeholder merger not found");

        (self
            .fragments
            .get_mut(&union_fragment_id)
            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
    }

    /// Recursively collects the ids of the tables that `stream_node` scans
    /// from, counting how many times each table is referenced.
    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
        let table_id = match stream_node.node_body.as_ref() {
            Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
            _ => None,
        };
        if let Some(table_id) = table_id {
            table_ids.entry(table_id).or_default().add_assign(1);
        }

        for child in &stream_node.input {
            Self::resolve_dependent_table(child, table_ids);
        }
    }

    /// Returns the upstream table ids of the job, together with how many times
    /// each of them is referenced.
    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
        let mut table_ids = HashMap::new();
        self.fragments.values().for_each(|fragment| {
            Self::resolve_dependent_table(&fragment.nodes, &mut table_ids);
        });

        table_ids
    }

    /// Returns the actor ids and states grouped by the worker they are
    /// located on.
    pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id() as WorkerId;
            map.entry(node_id)
                .or_insert_with(Vec::new)
                .push((actor_id, actor_status.state()));
        }
        map
    }

    /// Returns the actor ids grouped by the worker they are located on.
    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id() as WorkerId;
            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
        }
        map
    }

    /// Returns the actors of all fragments, excluding those in the `Inactive`
    /// state.
    pub fn active_actors(&self) -> Vec<StreamActor> {
        let mut actors = vec![];
        for fragment in self.fragments.values() {
            for actor in &fragment.actors {
                if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
                    continue;
                }
                actors.push(actor.clone());
            }
        }
        actors
    }

    /// Returns, for each fragment, its stream node together with the actors to
    /// create and the workers they are located on.
    pub fn actors_to_create(
        &self,
    ) -> impl Iterator<
        Item = (
            FragmentId,
            &StreamNode,
            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
        ),
    > + '_ {
        self.fragments.values().map(move |fragment| {
            (
                fragment.fragment_id,
                &fragment.nodes,
                fragment.actors.iter().map(move |actor| {
                    let worker_id = self
                        .actor_status
                        .get(&actor.actor_id)
                        .expect("should exist")
                        .worker_id() as WorkerId;
                    (actor, worker_id)
                }),
            )
        })
    }

    /// Returns the id of the materialized view's state table, if the job's
    /// state tables contain it.
    pub fn mv_table_id(&self) -> Option<u32> {
        if self
            .fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter())
            .any(|table_id| *table_id == self.stream_job_id.table_id)
        {
            Some(self.stream_job_id.table_id)
        } else {
            None
        }
    }

    /// Returns the internal tables of the job, i.e. all state tables except
    /// the materialized view's own state table.
    pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
        self.collect_tables_inner(true)
    }

    /// Returns all tables of the job, including the materialized view's own
    /// state table.
    pub fn all_tables(&self) -> BTreeMap<u32, Table> {
        self.collect_tables_inner(false)
    }

    fn collect_tables_inner(&self, internal_tables_only: bool) -> BTreeMap<u32, Table> {
        let mut tables = BTreeMap::new();
        for fragment in self.fragments.values() {
            stream_graph_visitor::visit_stream_node_tables_inner(
                // The visitor requires a mutable node, so visit a clone.
                &mut fragment.nodes.clone(),
                internal_tables_only,
                true,
                |table, _| {
                    let table_id = table.id;
                    tables
                        .try_insert(table_id, table.clone())
                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
                },
            );
        }
        tables
    }

    /// Returns the ids of all internal tables, i.e. all state table ids except
    /// the materialized view's own table id.
    pub fn internal_table_ids(&self) -> Vec<u32> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
            .filter(|&t| t != self.stream_job_id.table_id)
            .collect_vec()
    }

    /// Returns all state table ids, including the materialized view's own
    /// table id.
    pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
    }

    /// Fills the expression context of each actor from the job's stream
    /// context, if it is not already set.
    pub fn fill_expr_context(mut self) -> Self {
        self.fragments.values_mut().for_each(|fragment| {
            fragment.actors.iter_mut().for_each(|actor| {
                if actor.expr_context.is_none() {
                    actor.expr_context = Some(self.ctx.to_expr_context());
                }
            });
        });
        self
    }
}
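
// Illustrative sketch (not from the original source): `filter_actor_ids` and
// the helpers built on it select actors by fragment type mask. The fragment
// below is a hypothetical minimal one constructed only for this example.
#[cfg(test)]
mod filter_actor_ids_example {
    use super::*;

    #[test]
    fn filters_by_fragment_type_mask() {
        let fragment = Fragment {
            fragment_id: 1,
            fragment_type_mask: FragmentTypeFlag::Mview as u32,
            actors: vec![StreamActor {
                actor_id: 10,
                fragment_id: 1,
                vnode_bitmap: None,
                mview_definition: String::new(),
                expr_context: None,
            }],
            ..Default::default()
        };
        let job = StreamJobFragments::for_test(TableId::new(1), BTreeMap::from([(1, fragment)]));
        // Only actors of fragments with the `Mview` flag are returned.
        assert_eq!(job.mview_actor_ids(), vec![10]);
    }
}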

/// The type of the upstream that a backfill fragment reads from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    MView,
    Values,
    Source,
}

impl BackfillUpstreamType {
    pub fn from_fragment_type_mask(mask: u32) -> Self {
        let is_mview = (mask & FragmentTypeFlag::StreamScan as u32) != 0;
        let is_values = (mask & FragmentTypeFlag::Values as u32) != 0;
        let is_source = (mask & FragmentTypeFlag::SourceScan as u32) != 0;

        // Exactly one of the backfill flags is expected to be set on a
        // backfill fragment.
        debug_assert!(
            is_mview as u8 + is_values as u8 + is_source as u8 == 1,
            "a backfill fragment should be exactly one of mview, values, or source, found {:?}",
            mask
        );

        if is_mview {
            BackfillUpstreamType::MView
        } else if is_values {
            BackfillUpstreamType::Values
        } else if is_source {
            BackfillUpstreamType::Source
        } else {
            unreachable!("invalid fragment type mask: {}", mask);
        }
    }
}
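
// Illustrative sketch (not from the original source): a fragment with the
// `StreamScan` flag set is classified as backfilling from a materialized view.
#[cfg(test)]
mod backfill_upstream_type_example {
    use super::*;

    #[test]
    fn classifies_by_type_mask() {
        let mask = FragmentTypeFlag::StreamScan as u32;
        assert_eq!(
            BackfillUpstreamType::from_fragment_type_mask(mask),
            BackfillUpstreamType::MView
        );
    }
}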