1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{AddAssign, Deref};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::{
22 IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat, WorkerSlotId,
23};
24use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node};
25use risingwave_connector::source::SplitImpl;
26use risingwave_meta_model::actor_dispatcher::DispatcherType;
27use risingwave_meta_model::{SourceId, StreamingParallelism, WorkerId};
28use risingwave_pb::catalog::Table;
29use risingwave_pb::common::PbActorLocation;
30use risingwave_pb::meta::table_fragments::actor_status::ActorState;
31use risingwave_pb::meta::table_fragments::fragment::{
32 FragmentDistributionType, PbFragmentDistributionType,
33};
34use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
35use risingwave_pb::meta::table_parallelism::{
36 FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
37 PbParallelism,
38};
39use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
40use risingwave_pb::plan_common::PbExprContext;
41use risingwave_pb::stream_plan::stream_node::NodeBody;
42use risingwave_pb::stream_plan::{
43 DispatchStrategy, Dispatcher, FragmentTypeFlag, PbDispatcher, PbStreamActor, PbStreamContext,
44 StreamNode,
45};
46
47use super::{ActorId, FragmentId};
48use crate::model::MetadataModelResult;
49use crate::stream::{SplitAssignment, build_actor_connector_splits};
50
/// Parallelism setting attached to a streaming job's fragments.
///
/// Convertible to and from [`PbTableParallelism`] and [`StreamingParallelism`] through the
/// `From` impls in this file.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// Adaptive parallelism. The protobuf `Auto` variant is also decoded into this variant.
    Adaptive,
    /// A fixed parallelism with the wrapped value.
    Fixed(usize),
    /// Custom parallelism.
    /// NOTE(review): presumably used when fragments of one job no longer share a single
    /// parallelism — confirm against the scaling code.
    Custom,
}
67
68impl From<PbTableParallelism> for TableParallelism {
69 fn from(value: PbTableParallelism) -> Self {
70 use Parallelism::*;
71 match &value.parallelism {
72 Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
73 Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
74 Some(Custom(_)) => Self::Custom,
75 _ => unreachable!(),
76 }
77 }
78}
79
80impl From<TableParallelism> for PbTableParallelism {
81 fn from(value: TableParallelism) -> Self {
82 use TableParallelism::*;
83
84 let parallelism = match value {
85 Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
86 Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
87 parallelism: n as u32,
88 }),
89 Custom => PbParallelism::Custom(PbCustomParallelism {}),
90 };
91
92 Self {
93 parallelism: Some(parallelism),
94 }
95 }
96}
97
98impl From<StreamingParallelism> for TableParallelism {
99 fn from(value: StreamingParallelism) -> Self {
100 match value {
101 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
102 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
103 StreamingParallelism::Custom => TableParallelism::Custom,
104 }
105 }
106}
107
108impl From<TableParallelism> for StreamingParallelism {
109 fn from(value: TableParallelism) -> Self {
110 match value {
111 TableParallelism::Adaptive => StreamingParallelism::Adaptive,
112 TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
113 TableParallelism::Custom => StreamingParallelism::Custom,
114 }
115 }
116}
117
/// Sets of actor ids keyed by fragment id.
/// NOTE(review): presumably upstream fragment id -> upstream actor ids of one actor — confirm.
pub type ActorUpstreams = BTreeMap<FragmentId, HashSet<ActorId>>;
/// [`ActorUpstreams`] keyed by actor id.
pub type FragmentActorUpstreams = HashMap<ActorId, ActorUpstreams>;
/// A stream actor paired with its dispatchers.
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
/// A stream actor paired with its [`ActorUpstreams`] and its dispatchers.
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
/// Dispatchers keyed first by fragment id, then by actor id.
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

/// Downstream relations keyed by fragment id.
pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// Two-level fragment id map describing upstream replacements.
/// NOTE(review): presumably fragment id -> (old upstream fragment id -> new upstream fragment
/// id) — confirm against callers.
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// Actor-to-actor pairs nested under two fragment ids.
/// NOTE(review): presumably upstream fragment id -> downstream fragment id -> upstream actor
/// id -> downstream actor id for newly added no-shuffle dispatchers — confirm.
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
130
/// Describes how data is dispatched to one downstream fragment.
///
/// Built from a `(FragmentId, DispatchStrategy)` pair via the `From` impl in this file.
#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    /// Id of the downstream fragment.
    pub downstream_fragment_id: FragmentId,
    /// Dispatcher type, converted from the dispatch strategy's type.
    pub dispatcher_type: DispatcherType,
    /// Copied from `DispatchStrategy::dist_key_indices`.
    pub dist_key_indices: Vec<u32>,
    /// Copied from `DispatchStrategy::output_indices`.
    pub output_indices: Vec<u32>,
}
138
139impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
140 fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
141 Self {
142 downstream_fragment_id: fragment_id,
143 dispatcher_type: dispatch.get_type().unwrap().into(),
144 dist_key_indices: dispatch.dist_key_indices,
145 output_indices: dispatch.output_indices,
146 }
147 }
148}
149
/// A [`StreamJobFragments`] bundled with downstream fragment relations.
///
/// Dereferences to the wrapped [`StreamJobFragments`].
#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    /// The wrapped job fragments.
    pub inner: StreamJobFragments,
    /// Downstream relations keyed by fragment id.
    pub downstreams: FragmentDownstreamRelation,
}
155
/// Lets a `StreamJobFragmentsToCreate` be used wherever a `&StreamJobFragments` is expected.
impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    /// Returns a reference to the wrapped `inner` fragments.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
163
/// In-memory representation of a stream actor; serialized to `PbStreamActor` by `to_protobuf`.
#[derive(Clone, Debug)]
pub struct StreamActor {
    /// Id of this actor.
    pub actor_id: u32,
    /// Id of the fragment this actor belongs to.
    pub fragment_id: u32,
    /// Optional vnode bitmap of this actor.
    pub vnode_bitmap: Option<Bitmap>,
    /// Copied verbatim into `PbStreamActor::mview_definition`.
    pub mview_definition: String,
    /// Optional expression context; `StreamJobFragments::fill_expr_context` fills it when
    /// it is `None`.
    pub expr_context: Option<PbExprContext>,
}
172
173impl StreamActor {
174 fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
175 PbStreamActor {
176 actor_id: self.actor_id,
177 fragment_id: self.fragment_id,
178 dispatcher: dispatchers.collect(),
179 vnode_bitmap: self
180 .vnode_bitmap
181 .as_ref()
182 .map(|bitmap| bitmap.to_protobuf()),
183 mview_definition: self.mview_definition.clone(),
184 expr_context: self.expr_context.clone(),
185 }
186 }
187}
188
/// In-memory representation of a fragment; serialized to `PbFragment` by `to_protobuf`.
#[derive(Clone, Debug, Default)]
pub struct Fragment {
    /// Id of this fragment.
    pub fragment_id: FragmentId,
    /// Bit mask tested against `FragmentTypeFlag` values throughout this file.
    pub fragment_type_mask: u32,
    /// Distribution type; `Single` marks the fragment as a singleton (see `IsSingleton`).
    pub distribution_type: PbFragmentDistributionType,
    /// Actors of this fragment.
    pub actors: Vec<StreamActor>,
    /// Ids of the state tables of this fragment.
    pub state_table_ids: Vec<u32>,
    /// Optional vnode count, interpreted through `VnodeCountCompat`.
    pub maybe_vnode_count: Option<u32>,
    /// Root stream node of this fragment.
    pub nodes: StreamNode,
}
199
200impl Fragment {
201 pub fn to_protobuf(
202 &self,
203 upstream_fragments: impl Iterator<Item = FragmentId>,
204 dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
205 ) -> PbFragment {
206 PbFragment {
207 fragment_id: self.fragment_id,
208 fragment_type_mask: self.fragment_type_mask,
209 distribution_type: self.distribution_type as _,
210 actors: self
211 .actors
212 .iter()
213 .map(|actor| {
214 actor.to_protobuf(
215 dispatchers
216 .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
217 .into_iter()
218 .flatten()
219 .cloned(),
220 )
221 })
222 .collect(),
223 state_table_ids: self.state_table_ids.clone(),
224 upstream_fragment_ids: upstream_fragments.collect(),
225 maybe_vnode_count: self.maybe_vnode_count,
226 nodes: Some(self.nodes.clone()),
227 }
228 }
229}
230
impl VnodeCountCompat for Fragment {
    /// Derives the vnode count from `maybe_vnode_count`, passing a closure that reports
    /// whether this fragment is a singleton.
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}
236
237impl IsSingleton for Fragment {
238 fn is_singleton(&self) -> bool {
239 matches!(self.distribution_type, FragmentDistributionType::Single)
240 }
241}
242
/// Fragments of a streaming job together with actor status, source splits and job-level
/// settings. Serialized to `PbTableFragments` by `to_protobuf`.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// Id of the streaming job.
    pub stream_job_id: TableId,

    /// State of the job's fragments.
    pub state: State,

    /// Fragments of the job, keyed by fragment id.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// Status (worker location and state) of each actor, keyed by actor id.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// Source splits assigned to each actor.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Streaming context of the job.
    pub ctx: StreamContext,

    /// Parallelism assigned to the job.
    pub assigned_parallelism: TableParallelism,

    /// Maximum parallelism of the job.
    pub max_parallelism: usize,
}
284
/// Job-level streaming context.
#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// Time zone of the job, if any. An empty protobuf time zone is decoded as `None`.
    pub timezone: Option<String>,
}
290
291impl StreamContext {
292 pub fn to_protobuf(&self) -> PbStreamContext {
293 PbStreamContext {
294 timezone: self.timezone.clone().unwrap_or("".into()),
295 }
296 }
297
298 pub fn to_expr_context(&self) -> PbExprContext {
299 PbExprContext {
300 time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
302 strict_mode: false,
303 }
304 }
305
306 pub fn from_protobuf(prost: &PbStreamContext) -> Self {
307 Self {
308 timezone: if prost.get_timezone().is_empty() {
309 None
310 } else {
311 Some(prost.get_timezone().clone())
312 },
313 }
314 }
315}
316
317impl StreamJobFragments {
318 pub fn to_protobuf(
319 &self,
320 fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
321 fragment_dispatchers: &FragmentActorDispatchers,
322 ) -> PbTableFragments {
323 PbTableFragments {
324 table_id: self.stream_job_id.table_id(),
325 state: self.state as _,
326 fragments: self
327 .fragments
328 .iter()
329 .map(|(id, fragment)| {
330 (
331 *id,
332 fragment.to_protobuf(
333 fragment_upstreams.get(id).into_iter().flatten().cloned(),
334 fragment_dispatchers.get(&(*id as _)),
335 ),
336 )
337 })
338 .collect(),
339 actor_status: self.actor_status.clone().into_iter().collect(),
340 actor_splits: build_actor_connector_splits(&self.actor_splits),
341 ctx: Some(self.ctx.to_protobuf()),
342 parallelism: Some(self.assigned_parallelism.into()),
343 node_label: "".to_owned(),
344 backfill_done: true,
345 max_parallelism: Some(self.max_parallelism as _),
346 }
347 }
348}
349
/// Actors to create, keyed by worker id and then by fragment id; each entry holds the
/// fragment's stream node and its actors with their upstreams and dispatchers.
pub type StreamJobActorsToCreate =
    HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>)>>;
352
353impl StreamJobFragments {
354 pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
356 Self::new(
357 table_id,
358 fragments,
359 &BTreeMap::new(),
360 StreamContext::default(),
361 TableParallelism::Adaptive,
362 VirtualNode::COUNT_FOR_TEST,
363 )
364 }
365
366 pub fn new(
369 stream_job_id: TableId,
370 fragments: BTreeMap<FragmentId, Fragment>,
371 actor_locations: &BTreeMap<ActorId, WorkerSlotId>,
372 ctx: StreamContext,
373 table_parallelism: TableParallelism,
374 max_parallelism: usize,
375 ) -> Self {
376 let actor_status = actor_locations
377 .iter()
378 .map(|(&actor_id, worker_slot_id)| {
379 (
380 actor_id,
381 ActorStatus {
382 location: PbActorLocation::from_worker(worker_slot_id.worker_id()),
383 state: ActorState::Inactive as i32,
384 },
385 )
386 })
387 .collect();
388
389 Self {
390 stream_job_id,
391 state: State::Initial,
392 fragments,
393 actor_status,
394 actor_splits: HashMap::default(),
395 ctx,
396 assigned_parallelism: table_parallelism,
397 max_parallelism,
398 }
399 }
400
/// Iterates over the ids of all fragments, in key order of the underlying `BTreeMap`.
pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
    self.fragments.keys().cloned()
}
404
/// Iterates over all fragments, in key order of the underlying `BTreeMap`.
pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
    self.fragments.values()
}
408
/// Returns the id of the streaming job.
pub fn stream_job_id(&self) -> TableId {
    self.stream_job_id
}
413
/// Returns the current state of the job's fragments.
pub fn state(&self) -> State {
    self.state
}
418
/// Returns a clone of the time zone stored in the streaming context, if any.
pub fn timezone(&self) -> Option<String> {
    self.ctx.timezone.clone()
}
423
/// Returns whether the state is `State::Created`.
pub fn is_created(&self) -> bool {
    self.state == State::Created
}
428
/// Returns whether the state is `State::Initial`.
pub fn is_initial(&self) -> bool {
    self.state == State::Initial
}
433
/// Overwrites the state of the job's fragments.
pub fn set_state(&mut self, state: State) {
    self.state = state;
}
438
439 pub fn update_actors_state(&mut self, state: ActorState) {
441 for actor_status in self.actor_status.values_mut() {
442 actor_status.set_state(state);
443 }
444 }
445
/// Replaces `actor_splits` with the flattened values of `split_assignment`; the outer keys
/// of the assignment are discarded.
pub fn set_actor_splits_by_split_assignment(&mut self, split_assignment: SplitAssignment) {
    self.actor_splits = split_assignment.into_values().flatten().collect();
}
449
450 pub fn actor_ids(&self) -> Vec<ActorId> {
452 self.fragments
453 .values()
454 .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
455 .collect()
456 }
457
458 pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
459 self.fragments
460 .values()
461 .flat_map(|fragment| {
462 fragment
463 .actors
464 .iter()
465 .map(|actor| (actor.actor_id, fragment.fragment_id))
466 })
467 .collect()
468 }
469
/// Returns clones of all actors across all fragments (test builds only).
#[cfg(test)]
pub fn actors(&self) -> Vec<StreamActor> {
    let mut all_actors = Vec::new();
    for fragment in self.fragments.values() {
        all_actors.extend(fragment.actors.iter().cloned());
    }
    all_actors
}
478
479 pub fn filter_actor_ids(
481 &self,
482 check_type: impl Fn(u32) -> bool + 'static,
483 ) -> impl Iterator<Item = ActorId> + '_ {
484 self.fragments
485 .values()
486 .filter(move |fragment| check_type(fragment.fragment_type_mask))
487 .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
488 }
489
490 pub fn mview_actor_ids(&self) -> Vec<ActorId> {
492 Self::filter_actor_ids(self, |fragment_type_mask| {
493 (fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0
494 })
495 .collect()
496 }
497
498 pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
500 let mut actor_ids = vec![];
501 for fragment in self.fragments.values() {
502 if fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32 != 0 {
503 return vec![];
506 }
507 if (fragment.fragment_type_mask
508 & (FragmentTypeFlag::Values as u32
509 | FragmentTypeFlag::StreamScan as u32
510 | FragmentTypeFlag::SourceScan as u32))
511 != 0
512 {
513 actor_ids.extend(fragment.actors.iter().map(|actor| {
514 (
515 actor.actor_id,
516 BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask),
517 )
518 }));
519 }
520 }
521 actor_ids
522 }
523
524 pub fn root_fragment(&self) -> Option<Fragment> {
525 self.mview_fragment()
526 .or_else(|| self.sink_fragment())
527 .or_else(|| self.source_fragment())
528 }
529
530 pub fn mview_fragment(&self) -> Option<Fragment> {
532 self.fragments
533 .values()
534 .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0)
535 .cloned()
536 }
537
538 pub fn source_fragment(&self) -> Option<Fragment> {
539 self.fragments
540 .values()
541 .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Source as u32) != 0)
542 .cloned()
543 }
544
545 pub fn sink_fragment(&self) -> Option<Fragment> {
546 self.fragments
547 .values()
548 .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Sink as u32) != 0)
549 .cloned()
550 }
551
552 pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
553 Self::filter_actor_ids(self, |mask| {
554 (mask & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0
555 })
556 .collect()
557 }
558
559 pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
562 let mut source_fragments = HashMap::new();
563
564 for fragment in self.fragments() {
565 {
566 if let Some(source_id) = fragment.nodes.find_stream_source() {
567 source_fragments
568 .entry(source_id as SourceId)
569 .or_insert(BTreeSet::new())
570 .insert(fragment.fragment_id as FragmentId);
571 }
572 }
573 }
574 source_fragments
575 }
576
577 pub fn source_backfill_fragments(
582 &self,
583 ) -> MetadataModelResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
584 let mut source_backfill_fragments = HashMap::new();
585
586 for fragment in self.fragments() {
587 {
588 if let Some((source_id, upstream_source_fragment_id)) =
589 fragment.nodes.find_source_backfill()
590 {
591 source_backfill_fragments
592 .entry(source_id as SourceId)
593 .or_insert(BTreeSet::new())
594 .insert((fragment.fragment_id, upstream_source_fragment_id));
595 }
596 }
597 }
598 Ok(source_backfill_fragments)
599 }
600
601 pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
604 let mut union_fragment_id = None;
605 for (fragment_id, fragment) in &self.fragments {
606 {
607 {
608 visit_stream_node(&fragment.nodes, |body| {
609 if let NodeBody::Union(_) = body {
610 if let Some(union_fragment_id) = union_fragment_id.as_mut() {
611 assert_eq!(*union_fragment_id, *fragment_id);
613 } else {
614 union_fragment_id = Some(*fragment_id);
615 }
616 }
617 })
618 }
619 }
620 }
621
622 let union_fragment_id =
623 union_fragment_id.expect("fragment of placeholder merger not found");
624
625 (self
626 .fragments
627 .get_mut(&union_fragment_id)
628 .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
629 }
630
631 fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
633 let table_id = match stream_node.node_body.as_ref() {
634 Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
635 Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
636 _ => None,
637 };
638 if let Some(table_id) = table_id {
639 table_ids.entry(table_id).or_default().add_assign(1);
640 }
641
642 for child in &stream_node.input {
643 Self::resolve_dependent_table(child, table_ids);
644 }
645 }
646
647 pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
649 let mut table_ids = HashMap::new();
650 self.fragments.values().for_each(|fragment| {
651 Self::resolve_dependent_table(&fragment.nodes, &mut table_ids);
652 });
653
654 table_ids
655 }
656
657 pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
659 let mut map = BTreeMap::default();
660 for (&actor_id, actor_status) in &self.actor_status {
661 let node_id = actor_status.worker_id() as WorkerId;
662 map.entry(node_id)
663 .or_insert_with(Vec::new)
664 .push((actor_id, actor_status.state()));
665 }
666 map
667 }
668
669 pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
671 let mut map = BTreeMap::default();
672 for (&actor_id, actor_status) in &self.actor_status {
673 let node_id = actor_status.worker_id() as WorkerId;
674 map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
675 }
676 map
677 }
678
679 pub fn active_actors(&self) -> Vec<StreamActor> {
681 let mut actors = vec![];
682 for fragment in self.fragments.values() {
683 for actor in &fragment.actors {
684 if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
685 continue;
686 }
687 actors.push(actor.clone());
688 }
689 }
690 actors
691 }
692
693 pub fn actors_to_create(
694 &self,
695 ) -> impl Iterator<
696 Item = (
697 FragmentId,
698 &StreamNode,
699 impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
700 ),
701 > + '_ {
702 self.fragments.values().map(move |fragment| {
703 (
704 fragment.fragment_id,
705 &fragment.nodes,
706 fragment.actors.iter().map(move |actor| {
707 let worker_id = self
708 .actor_status
709 .get(&actor.actor_id)
710 .expect("should exist")
711 .worker_id() as WorkerId;
712 (actor, worker_id)
713 }),
714 )
715 })
716 }
717
718 pub fn mv_table_id(&self) -> Option<u32> {
719 if self
720 .fragments
721 .values()
722 .flat_map(|f| f.state_table_ids.iter())
723 .any(|table_id| *table_id == self.stream_job_id.table_id)
724 {
725 Some(self.stream_job_id.table_id)
726 } else {
727 None
728 }
729 }
730
/// Collects tables from all fragments' stream nodes, keyed by table id, with the
/// `internal_tables_only` flag of the visitor set to `true`.
///
/// # Panics
///
/// Panics if the same table id is visited twice.
pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
    self.collect_tables_inner(true)
}
738
/// Collects tables from all fragments' stream nodes, keyed by table id, with the
/// `internal_tables_only` flag of the visitor set to `false`.
///
/// # Panics
///
/// Panics if the same table id is visited twice.
pub fn all_tables(&self) -> BTreeMap<u32, Table> {
    self.collect_tables_inner(false)
}
743
/// Visits the tables of every fragment's stream node and collects clones of them into a
/// map keyed by table id.
///
/// `internal_tables_only` is forwarded to `visit_stream_node_tables_inner`.
///
/// # Panics
///
/// Panics with "duplicated table id" if a table id is visited more than once.
fn collect_tables_inner(&self, internal_tables_only: bool) -> BTreeMap<u32, Table> {
    let mut tables = BTreeMap::new();
    for fragment in self.fragments.values() {
        stream_graph_visitor::visit_stream_node_tables_inner(
            // The visitor takes `&mut StreamNode`, so it is run on a temporary clone.
            &mut fragment.nodes.clone(),
            internal_tables_only,
            // NOTE(review): presumably enables recursion into child nodes — confirm against
            // the visitor's signature.
            true,
            |table, _| {
                let table_id = table.id;
                tables
                    .try_insert(table_id, table.clone())
                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
            },
        );
    }
    tables
}
761
762 pub fn internal_table_ids(&self) -> Vec<u32> {
764 self.fragments
765 .values()
766 .flat_map(|f| f.state_table_ids.clone())
767 .filter(|&t| t != self.stream_job_id.table_id)
768 .collect_vec()
769 }
770
771 pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
773 self.fragments
774 .values()
775 .flat_map(|f| f.state_table_ids.clone())
776 }
777
778 pub fn fill_expr_context(mut self) -> Self {
780 self.fragments.values_mut().for_each(|fragment| {
781 fragment.actors.iter_mut().for_each(|actor| {
782 if actor.expr_context.is_none() {
783 actor.expr_context = Some(self.ctx.to_expr_context());
784 }
785 });
786 });
787 self
788 }
789}
790
/// Kind of upstream a backfill fragment reads from, derived from the fragment type mask by
/// `from_fragment_type_mask`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    /// The mask carries the `StreamScan` flag.
    MView,
    /// The mask carries the `Values` flag.
    Values,
    /// The mask carries the `SourceScan` flag.
    Source,
}
797
798impl BackfillUpstreamType {
799 pub fn from_fragment_type_mask(mask: u32) -> Self {
800 let is_mview = (mask & FragmentTypeFlag::StreamScan as u32) != 0;
801 let is_values = (mask & FragmentTypeFlag::Values as u32) != 0;
802 let is_source = (mask & FragmentTypeFlag::SourceScan as u32) != 0;
803
804 debug_assert!(
807 is_mview as u8 + is_values as u8 + is_source as u8 == 1,
808 "a backfill fragment should either be mview, value or source, found {:?}",
809 mask
810 );
811
812 if is_mview {
813 BackfillUpstreamType::MView
814 } else if is_values {
815 BackfillUpstreamType::Values
816 } else if is_source {
817 BackfillUpstreamType::Source
818 } else {
819 unreachable!("invalid fragment type mask: {}", mask);
820 }
821 }
822}