1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{AddAssign, Deref};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
21use risingwave_common::hash::{
22 ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
23};
24use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
25use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
26use risingwave_pb::catalog::Table;
27use risingwave_pb::common::{ActorInfo, PbActorLocation};
28use risingwave_pb::meta::table_fragments::actor_status::ActorState;
29use risingwave_pb::meta::table_fragments::fragment::{
30 FragmentDistributionType, PbFragmentDistributionType,
31};
32use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
33use risingwave_pb::meta::table_parallelism::{
34 FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
35 PbParallelism,
36};
37use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
38use risingwave_pb::plan_common::PbExprContext;
39use risingwave_pb::stream_plan::stream_node::NodeBody;
40use risingwave_pb::stream_plan::{
41 DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
42 PbStreamContext, StreamNode,
43};
44
45use super::{ActorId, FragmentId};
46
/// Parallelism setting of a streaming job.
///
/// Converts to and from the protobuf `PbTableParallelism` and the meta-model
/// `StreamingParallelism` through the `From` impls in this file.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// Adaptive parallelism. The protobuf `Auto` variant is also mapped to
    /// this variant when decoding.
    Adaptive,
    /// A fixed parallelism carrying the requested degree.
    Fixed(usize),
    /// Custom parallelism.
    // NOTE(review): the exact semantics of `Custom` are not visible in this
    // file — confirm against the scheduler before relying on it.
    Custom,
}
63
64impl From<PbTableParallelism> for TableParallelism {
65 fn from(value: PbTableParallelism) -> Self {
66 use Parallelism::*;
67 match &value.parallelism {
68 Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
69 Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
70 Some(Custom(_)) => Self::Custom,
71 _ => unreachable!(),
72 }
73 }
74}
75
76impl From<TableParallelism> for PbTableParallelism {
77 fn from(value: TableParallelism) -> Self {
78 use TableParallelism::*;
79
80 let parallelism = match value {
81 Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
82 Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
83 parallelism: n as u32,
84 }),
85 Custom => PbParallelism::Custom(PbCustomParallelism {}),
86 };
87
88 Self {
89 parallelism: Some(parallelism),
90 }
91 }
92}
93
94impl From<StreamingParallelism> for TableParallelism {
95 fn from(value: StreamingParallelism) -> Self {
96 match value {
97 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
98 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
99 StreamingParallelism::Custom => TableParallelism::Custom,
100 }
101 }
102}
103
104impl From<TableParallelism> for StreamingParallelism {
105 fn from(value: TableParallelism) -> Self {
106 match value {
107 TableParallelism::Adaptive => StreamingParallelism::Adaptive,
108 TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
109 TableParallelism::Custom => StreamingParallelism::Custom,
110 }
111 }
112}
113
114pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
115pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
116pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
117pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;
118
119pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
120pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
122pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
125
126#[derive(Debug, Clone)]
127pub struct DownstreamFragmentRelation {
128 pub downstream_fragment_id: FragmentId,
129 pub dispatcher_type: DispatcherType,
130 pub dist_key_indices: Vec<u32>,
131 pub output_mapping: PbDispatchOutputMapping,
132}
133
134impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
135 fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
136 Self {
137 downstream_fragment_id: fragment_id,
138 dispatcher_type: dispatch.get_type().unwrap().into(),
139 dist_key_indices: dispatch.dist_key_indices,
140 output_mapping: dispatch.output_mapping.unwrap(),
141 }
142 }
143}
144
145#[derive(Debug, Clone)]
146pub struct StreamJobFragmentsToCreate {
147 pub inner: StreamJobFragments,
148 pub downstreams: FragmentDownstreamRelation,
149}
150
151impl Deref for StreamJobFragmentsToCreate {
152 type Target = StreamJobFragments;
153
154 fn deref(&self) -> &Self::Target {
155 &self.inner
156 }
157}
158
159#[derive(Clone, Debug)]
160pub struct StreamActor {
161 pub actor_id: u32,
162 pub fragment_id: u32,
163 pub vnode_bitmap: Option<Bitmap>,
164 pub mview_definition: String,
165 pub expr_context: Option<PbExprContext>,
166}
167
168impl StreamActor {
169 fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
170 PbStreamActor {
171 actor_id: self.actor_id,
172 fragment_id: self.fragment_id,
173 dispatcher: dispatchers.collect(),
174 vnode_bitmap: self
175 .vnode_bitmap
176 .as_ref()
177 .map(|bitmap| bitmap.to_protobuf()),
178 mview_definition: self.mview_definition.clone(),
179 expr_context: self.expr_context.clone(),
180 }
181 }
182}
183
184#[derive(Clone, Debug, Default)]
185pub struct Fragment {
186 pub fragment_id: FragmentId,
187 pub fragment_type_mask: FragmentTypeMask,
188 pub distribution_type: PbFragmentDistributionType,
189 pub actors: Vec<StreamActor>,
190 pub state_table_ids: Vec<u32>,
191 pub maybe_vnode_count: Option<u32>,
192 pub nodes: StreamNode,
193}
194
195impl Fragment {
196 pub fn to_protobuf(
197 &self,
198 upstream_fragments: impl Iterator<Item = FragmentId>,
199 dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
200 ) -> PbFragment {
201 PbFragment {
202 fragment_id: self.fragment_id,
203 fragment_type_mask: self.fragment_type_mask.into(),
204 distribution_type: self.distribution_type as _,
205 actors: self
206 .actors
207 .iter()
208 .map(|actor| {
209 actor.to_protobuf(
210 dispatchers
211 .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
212 .into_iter()
213 .flatten()
214 .cloned(),
215 )
216 })
217 .collect(),
218 state_table_ids: self.state_table_ids.clone(),
219 upstream_fragment_ids: upstream_fragments.collect(),
220 maybe_vnode_count: self.maybe_vnode_count,
221 nodes: Some(self.nodes.clone()),
222 }
223 }
224}
225
226impl VnodeCountCompat for Fragment {
227 fn vnode_count_inner(&self) -> VnodeCount {
228 VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
229 }
230}
231
232impl IsSingleton for Fragment {
233 fn is_singleton(&self) -> bool {
234 matches!(self.distribution_type, FragmentDistributionType::Single)
235 }
236}
237
238#[derive(Debug, Clone)]
244pub struct StreamJobFragments {
245 pub stream_job_id: TableId,
247
248 pub state: State,
250
251 pub fragments: BTreeMap<FragmentId, Fragment>,
253
254 pub actor_status: BTreeMap<ActorId, ActorStatus>,
256
257 pub ctx: StreamContext,
259
260 pub assigned_parallelism: TableParallelism,
262
263 pub max_parallelism: usize,
274}
275
/// Context shared by a stream job.
#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// Optional timezone; `None` is encoded as an empty string in protobuf.
    pub timezone: Option<String>,
}
281
282impl StreamContext {
283 pub fn to_protobuf(&self) -> PbStreamContext {
284 PbStreamContext {
285 timezone: self.timezone.clone().unwrap_or("".into()),
286 }
287 }
288
289 pub fn to_expr_context(&self) -> PbExprContext {
290 PbExprContext {
291 time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
293 strict_mode: false,
294 }
295 }
296
297 pub fn from_protobuf(prost: &PbStreamContext) -> Self {
298 Self {
299 timezone: if prost.get_timezone().is_empty() {
300 None
301 } else {
302 Some(prost.get_timezone().clone())
303 },
304 }
305 }
306}
307
308impl StreamJobFragments {
309 pub fn to_protobuf(
310 &self,
311 fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
312 fragment_dispatchers: &FragmentActorDispatchers,
313 ) -> PbTableFragments {
314 PbTableFragments {
315 table_id: self.stream_job_id.table_id(),
316 state: self.state as _,
317 fragments: self
318 .fragments
319 .iter()
320 .map(|(id, fragment)| {
321 (
322 *id,
323 fragment.to_protobuf(
324 fragment_upstreams.get(id).into_iter().flatten().cloned(),
325 fragment_dispatchers.get(&(*id as _)),
326 ),
327 )
328 })
329 .collect(),
330 actor_status: self.actor_status.clone().into_iter().collect(),
331 ctx: Some(self.ctx.to_protobuf()),
332 parallelism: Some(self.assigned_parallelism.into()),
333 node_label: "".to_owned(),
334 backfill_done: true,
335 max_parallelism: Some(self.max_parallelism as _),
336 }
337 }
338}
339
340pub type StreamJobActorsToCreate = HashMap<
341 WorkerId,
342 HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>, HashSet<u32>)>,
343>;
344
345impl StreamJobFragments {
346 pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
348 Self::new(
349 table_id,
350 fragments,
351 &BTreeMap::new(),
352 StreamContext::default(),
353 TableParallelism::Adaptive,
354 VirtualNode::COUNT_FOR_TEST,
355 )
356 }
357
358 pub fn new(
361 stream_job_id: TableId,
362 fragments: BTreeMap<FragmentId, Fragment>,
363 actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
364 ctx: StreamContext,
365 table_parallelism: TableParallelism,
366 max_parallelism: usize,
367 ) -> Self {
368 let actor_status = actor_locations
369 .iter()
370 .map(|(&actor_id, alignment_id)| {
371 (
372 actor_id,
373 ActorStatus {
374 location: PbActorLocation::from_worker(alignment_id.worker_id()),
375 state: ActorState::Inactive as i32,
376 },
377 )
378 })
379 .collect();
380
381 Self {
382 stream_job_id,
383 state: State::Initial,
384 fragments,
385 actor_status,
386 ctx,
387 assigned_parallelism: table_parallelism,
388 max_parallelism,
389 }
390 }
391
392 pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
393 self.fragments.keys().cloned()
394 }
395
396 pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
397 self.fragments.values()
398 }
399
400 pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
401 self.fragments
402 .get(&fragment_id)
403 .map(|f| f.actors.as_slice())
404 .unwrap_or_default()
405 }
406
407 pub fn stream_job_id(&self) -> TableId {
409 self.stream_job_id
410 }
411
412 pub fn timezone(&self) -> Option<String> {
414 self.ctx.timezone.clone()
415 }
416
417 pub fn is_created(&self) -> bool {
419 self.state == State::Created
420 }
421
422 pub fn update_actors_state(&mut self, state: ActorState) {
424 for actor_status in self.actor_status.values_mut() {
425 actor_status.set_state(state);
426 }
427 }
428
429 pub fn actor_ids(&self) -> Vec<ActorId> {
431 self.fragments
432 .values()
433 .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
434 .collect()
435 }
436
437 pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
438 self.fragments
439 .values()
440 .flat_map(|fragment| {
441 fragment
442 .actors
443 .iter()
444 .map(|actor| (actor.actor_id, fragment.fragment_id))
445 })
446 .collect()
447 }
448
/// Returns clones of all actors, fragment by fragment (test builds only).
#[cfg(test)]
pub fn actors(&self) -> Vec<StreamActor> {
    self.fragments
        .values()
        .flat_map(|fragment| fragment.actors.iter().cloned())
        .collect()
}
457
/// Returns the ids of all fragments flagged `Mview` (test builds only).
#[cfg(test)]
pub fn mview_fragment_ids(&self) -> Vec<FragmentId> {
    self.fragments
        .values()
        .filter_map(|fragment| {
            fragment
                .fragment_type_mask
                .contains(FragmentTypeFlag::Mview)
                .then_some(fragment.fragment_id)
        })
        .collect()
}
471
472 pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
473 Self::tracking_progress_actor_ids_impl(self.fragments.values().map(|fragment| {
474 (
475 fragment.fragment_type_mask,
476 fragment.actors.iter().map(|actor| actor.actor_id),
477 )
478 }))
479 }
480
481 pub fn tracking_progress_actor_ids_impl(
483 fragments: impl IntoIterator<Item = (FragmentTypeMask, impl Iterator<Item = ActorId>)>,
484 ) -> Vec<(ActorId, BackfillUpstreamType)> {
485 let mut actor_ids = vec![];
486 for (fragment_type_mask, actors) in fragments {
487 if fragment_type_mask.contains(FragmentTypeFlag::CdcFilter) {
488 return vec![];
491 }
492 if fragment_type_mask.contains_any([
493 FragmentTypeFlag::Values,
494 FragmentTypeFlag::StreamScan,
495 FragmentTypeFlag::SourceScan,
496 FragmentTypeFlag::LocalityProvider,
497 ]) {
498 actor_ids.extend(actors.map(|actor_id| {
499 (
500 actor_id,
501 BackfillUpstreamType::from_fragment_type_mask(fragment_type_mask),
502 )
503 }));
504 }
505 }
506 actor_ids
507 }
508
509 pub fn root_fragment(&self) -> Option<Fragment> {
510 self.mview_fragment()
511 .or_else(|| self.sink_fragment())
512 .or_else(|| self.source_fragment())
513 }
514
515 pub fn mview_fragment(&self) -> Option<Fragment> {
517 self.fragments
518 .values()
519 .find(|fragment| {
520 fragment
521 .fragment_type_mask
522 .contains(FragmentTypeFlag::Mview)
523 })
524 .cloned()
525 }
526
527 pub fn source_fragment(&self) -> Option<Fragment> {
528 self.fragments
529 .values()
530 .find(|fragment| {
531 fragment
532 .fragment_type_mask
533 .contains(FragmentTypeFlag::Source)
534 })
535 .cloned()
536 }
537
538 pub fn sink_fragment(&self) -> Option<Fragment> {
539 self.fragments
540 .values()
541 .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
542 .cloned()
543 }
544
545 pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
548 let mut source_fragments = HashMap::new();
549
550 for fragment in self.fragments() {
551 {
552 if let Some(source_id) = fragment.nodes.find_stream_source() {
553 source_fragments
554 .entry(source_id as SourceId)
555 .or_insert(BTreeSet::new())
556 .insert(fragment.fragment_id as FragmentId);
557 }
558 }
559 }
560 source_fragments
561 }
562
563 pub fn source_backfill_fragments(
564 &self,
565 ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
566 Self::source_backfill_fragments_impl(
567 self.fragments
568 .iter()
569 .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
570 )
571 }
572
573 pub fn source_backfill_fragments_impl(
578 fragments: impl Iterator<Item = (FragmentId, &StreamNode)>,
579 ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
580 let mut source_backfill_fragments = HashMap::new();
581
582 for (fragment_id, fragment_node) in fragments {
583 {
584 if let Some((source_id, upstream_source_fragment_id)) =
585 fragment_node.find_source_backfill()
586 {
587 source_backfill_fragments
588 .entry(source_id as SourceId)
589 .or_insert(BTreeSet::new())
590 .insert((fragment_id, upstream_source_fragment_id));
591 }
592 }
593 }
594 source_backfill_fragments
595 }
596
597 pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
600 let mut union_fragment_id = None;
601 for (fragment_id, fragment) in &self.fragments {
602 {
603 {
604 visit_stream_node_body(&fragment.nodes, |body| {
605 if let NodeBody::Union(_) = body {
606 if let Some(union_fragment_id) = union_fragment_id.as_mut() {
607 assert_eq!(*union_fragment_id, *fragment_id);
609 } else {
610 union_fragment_id = Some(*fragment_id);
611 }
612 }
613 })
614 }
615 }
616 }
617
618 let union_fragment_id =
619 union_fragment_id.expect("fragment of placeholder merger not found");
620
621 (self
622 .fragments
623 .get_mut(&union_fragment_id)
624 .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
625 }
626
627 fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
629 let table_id = match stream_node.node_body.as_ref() {
630 Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
631 Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
632 _ => None,
633 };
634 if let Some(table_id) = table_id {
635 table_ids.entry(table_id).or_default().add_assign(1);
636 }
637
638 for child in &stream_node.input {
639 Self::resolve_dependent_table(child, table_ids);
640 }
641 }
642
643 pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
644 Self::upstream_table_counts_impl(self.fragments.values().map(|fragment| &fragment.nodes))
645 }
646
647 pub fn upstream_table_counts_impl(
649 fragment_nodes: impl Iterator<Item = &StreamNode>,
650 ) -> HashMap<TableId, usize> {
651 let mut table_ids = HashMap::new();
652 fragment_nodes.for_each(|node| {
653 Self::resolve_dependent_table(node, &mut table_ids);
654 });
655
656 table_ids
657 }
658
659 pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
661 let mut map = BTreeMap::default();
662 for (&actor_id, actor_status) in &self.actor_status {
663 let node_id = actor_status.worker_id() as WorkerId;
664 map.entry(node_id)
665 .or_insert_with(Vec::new)
666 .push((actor_id, actor_status.state()));
667 }
668 map
669 }
670
671 pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
673 let mut map = BTreeMap::default();
674 for (&actor_id, actor_status) in &self.actor_status {
675 let node_id = actor_status.worker_id() as WorkerId;
676 map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
677 }
678 map
679 }
680
681 pub fn active_actors(&self) -> Vec<StreamActor> {
683 let mut actors = vec![];
684 for fragment in self.fragments.values() {
685 for actor in &fragment.actors {
686 if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
687 continue;
688 }
689 actors.push(actor.clone());
690 }
691 }
692 actors
693 }
694
695 pub fn actors_to_create(
696 &self,
697 ) -> impl Iterator<
698 Item = (
699 FragmentId,
700 &StreamNode,
701 impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
702 ),
703 > + '_ {
704 self.fragments.values().map(move |fragment| {
705 (
706 fragment.fragment_id,
707 &fragment.nodes,
708 fragment.actors.iter().map(move |actor| {
709 let worker_id = self
710 .actor_status
711 .get(&actor.actor_id)
712 .expect("should exist")
713 .worker_id() as WorkerId;
714 (actor, worker_id)
715 }),
716 )
717 })
718 }
719
720 pub fn mv_table_id(&self) -> Option<u32> {
721 if self
722 .fragments
723 .values()
724 .flat_map(|f| f.state_table_ids.iter())
725 .any(|table_id| *table_id == self.stream_job_id.table_id)
726 {
727 Some(self.stream_job_id.table_id)
728 } else {
729 None
730 }
731 }
732
733 pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<u32, Table> {
734 let mut tables = BTreeMap::new();
735 for fragment in fragments {
736 stream_graph_visitor::visit_stream_node_tables_inner(
737 &mut fragment.nodes.clone(),
738 false,
739 true,
740 |table, _| {
741 let table_id = table.id;
742 tables
743 .try_insert(table_id, table.clone())
744 .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
745 },
746 );
747 }
748 tables
749 }
750
751 pub fn internal_table_ids(&self) -> Vec<u32> {
753 self.fragments
754 .values()
755 .flat_map(|f| f.state_table_ids.clone())
756 .filter(|&t| t != self.stream_job_id.table_id)
757 .collect_vec()
758 }
759
760 pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
762 self.fragments
763 .values()
764 .flat_map(|f| f.state_table_ids.clone())
765 }
766
767 pub fn fill_expr_context(mut self) -> Self {
769 self.fragments.values_mut().for_each(|fragment| {
770 fragment.actors.iter_mut().for_each(|actor| {
771 if actor.expr_context.is_none() {
772 actor.expr_context = Some(self.ctx.to_expr_context());
773 }
774 });
775 });
776 self
777 }
778}
779
/// Kind of upstream a backfill fragment reads from, derived from the
/// fragment's type mask.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    /// The fragment is flagged `StreamScan`.
    MView,
    /// The fragment is flagged `Values`.
    Values,
    /// The fragment is flagged `SourceScan`.
    Source,
    /// The fragment is flagged `LocalityProvider`.
    LocalityProvider,
}
787
788impl BackfillUpstreamType {
789 pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
790 let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
791 let is_values = mask.contains(FragmentTypeFlag::Values);
792 let is_source = mask.contains(FragmentTypeFlag::SourceScan);
793 let is_locality_provider = mask.contains(FragmentTypeFlag::LocalityProvider);
794
795 debug_assert!(
798 is_mview as u8 + is_values as u8 + is_source as u8 + is_locality_provider as u8 == 1,
799 "a backfill fragment should either be mview, value, source, or locality provider, found {:?}",
800 mask
801 );
802
803 if is_mview {
804 BackfillUpstreamType::MView
805 } else if is_values {
806 BackfillUpstreamType::Values
807 } else if is_source {
808 BackfillUpstreamType::Source
809 } else if is_locality_provider {
810 BackfillUpstreamType::LocalityProvider
811 } else {
812 unreachable!("invalid fragment type mask: {:?}", mask);
813 }
814 }
815}