1mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::fmt::Binary;
23use std::sync::Arc;
24
25pub use column::*;
26pub use external_table::*;
27use futures::stream::BoxStream;
28pub use internal_table::*;
29use parse_display::Display;
30pub use physical_table::*;
31use risingwave_pb::catalog::table::PbEngine;
32use risingwave_pb::catalog::{
33 CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
34 StreamJobStatus as PbStreamJobStatus,
35};
36use risingwave_pb::plan_common::ColumnDescVersion;
37pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
38use serde::{Deserialize, Serialize};
39
40use crate::array::DataChunk;
41pub use crate::constants::hummock;
42use crate::error::BoxedError;
43
/// Numeric type used for catalog version numbers.
pub type CatalogVersion = u64;

/// Numeric type used for per-table version ids.
pub type TableVersionId = u64;
/// First value of a [`TableVersionId`] (zero).
pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
/// Numeric type used for per-source version ids.
pub type SourceVersionId = u64;
/// First value of a [`SourceVersionId`] (zero).
pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;
55
/// Name of the default database.
pub const DEFAULT_DATABASE_NAME: &str = "dev";
/// Name of the default schema.
pub const DEFAULT_SCHEMA_NAME: &str = "public";
/// Name of the `pg_catalog` system schema.
pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
/// Name of the `information_schema` system schema.
pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
/// Name of the `rw_catalog` system schema.
pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
/// Schema-name prefix treated as reserved.
// NOTE(review): where this prefix is enforced is not visible in this file.
pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
/// Name of the default super user.
pub const DEFAULT_SUPER_USER: &str = "root";
/// Id paired with [`DEFAULT_SUPER_USER`].
pub const DEFAULT_SUPER_USER_ID: u32 = 1;
/// Name of the PostgreSQL-style default super user.
pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
/// Id paired with [`DEFAULT_SUPER_USER_FOR_PG`].
pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;

/// Name of the admin super user; this is the name checked by
/// [`is_reserved_admin_user`].
pub const DEFAULT_SUPER_USER_FOR_ADMIN: &str = "rwadmin";
/// Id paired with [`DEFAULT_SUPER_USER_FOR_ADMIN`].
pub const DEFAULT_SUPER_USER_FOR_ADMIN_ID: u32 = 3;

/// Presumably the first user id that is not reserved for built-in users.
// NOTE(review): typed `i32` while the super-user ids above are `u32`; confirm
// this is intended by the callers.
pub const NON_RESERVED_USER_ID: i32 = 11;
73
/// Size of the id range set aside for system catalogs.
pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
/// Start of the system catalog id range: the top [`MAX_SYS_CATALOG_NUM`] values
/// below `i32::MAX`.
pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;

/// Sentinel raw id (`u32::MAX - 1`) returned by the `placeholder()` constructors
/// of the id types in this module.
pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
78
/// Names of the schemas that [`is_system_schema`] reports as system schemas.
pub const SYSTEM_SCHEMAS: [&str; 3] = [
    PG_CATALOG_SCHEMA_NAME,
    INFORMATION_SCHEMA_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME,
];
84pub fn is_system_schema(schema_name: &str) -> bool {
85 SYSTEM_SCHEMAS.contains(&schema_name)
86}
87
88pub fn is_reserved_admin_user(user_name: &str) -> bool {
89 user_name == DEFAULT_SUPER_USER_FOR_ADMIN
90}
91
/// Column-name prefix treated as reserved.
// NOTE(review): where this prefix is enforced is not visible in this file.
pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";

/// Default name of the key column; returned for every version by
/// [`default_key_column_name_version_mapping`].
pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
98
99pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
100 match version {
101 ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
102 _ => DEFAULT_KEY_COLUMN_NAME,
103 }
104}
105
/// Name of the Kafka timestamp column.
// NOTE(review): presumably a hidden column attached to Kafka sources; confirm.
pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";

/// Name of the Iceberg row id column.
pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";

/// Name of the row id column.
pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
/// Column id of the row id column (`0`).
pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);

/// Raw id that follows [`ROW_ID_COLUMN_ID`], obtained through `ColumnId::next`.
// NOTE(review): presumably the smallest id available to user-defined columns.
pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();

/// Name of the `_rw_timestamp` column.
pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
/// Column id of the `_rw_timestamp` column (`-1`).
pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);
129
/// Name of the Iceberg sequence-number column.
pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
/// Name of the Iceberg file-path column.
pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
/// Name of the Iceberg file-position column.
pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";

/// Name of the CDC offset column.
pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
/// Number of columns of a CDC source.
// NOTE(review): which three columns are counted is not visible here; confirm.
pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
/// Name of the CDC table-name column.
pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
139
/// Reader for system catalog tables.
///
/// Implementors must be `Send + Sync + 'static`, so they can be shared behind
/// [`SysCatalogReaderRef`].
pub trait SysCatalogReader: Sync + Send + 'static {
    /// Reads the table identified by `table_id` as a stream of `DataChunk`s.
    /// Each stream item is a `Result`, so a failure is reported as an item of
    /// the stream rather than by the call itself.
    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
}
145
/// Shared, reference-counted handle to a [`SysCatalogReader`] trait object.
pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;

/// Raw numeric type of a catalog object id.
pub type ObjectId = u32;
149
/// Typed wrapper around the raw `u32` id of a database.
///
/// Its `Display` output is the bare numeric id.
#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
#[display("{database_id}")]
pub struct DatabaseId {
    /// The raw id value.
    pub database_id: u32,
}
155
156impl DatabaseId {
157 pub const fn new(database_id: u32) -> Self {
158 DatabaseId { database_id }
159 }
160
161 pub fn placeholder() -> Self {
162 DatabaseId {
163 database_id: OBJECT_ID_PLACEHOLDER,
164 }
165 }
166}
167
168impl From<u32> for DatabaseId {
169 fn from(id: u32) -> Self {
170 Self::new(id)
171 }
172}
173
174impl From<&u32> for DatabaseId {
175 fn from(id: &u32) -> Self {
176 Self::new(*id)
177 }
178}
179
180impl From<DatabaseId> for u32 {
181 fn from(id: DatabaseId) -> Self {
182 id.database_id
183 }
184}
185
/// Typed wrapper around the raw `u32` id of a schema.
///
/// Its `Display` output is the bare numeric id. This type derives `Clone` but
/// not `Copy`.
#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq)]
#[display("{schema_id}")]
pub struct SchemaId {
    /// The raw id value.
    pub schema_id: u32,
}
191
192impl SchemaId {
193 pub fn new(schema_id: u32) -> Self {
194 SchemaId { schema_id }
195 }
196
197 pub fn placeholder() -> Self {
198 SchemaId {
199 schema_id: OBJECT_ID_PLACEHOLDER,
200 }
201 }
202}
203
204impl From<u32> for SchemaId {
205 fn from(id: u32) -> Self {
206 Self::new(id)
207 }
208}
209
210impl From<&u32> for SchemaId {
211 fn from(id: &u32) -> Self {
212 Self::new(*id)
213 }
214}
215
216impl From<SchemaId> for u32 {
217 fn from(id: SchemaId) -> Self {
218 id.schema_id
219 }
220}
221
/// Typed wrapper around the raw `u32` id of a table.
///
/// Its `Display` output is the bare numeric id. The type is also
/// `Serialize`/`Deserialize` and totally ordered.
#[derive(
    Clone,
    Copy,
    Debug,
    Display,
    Default,
    Hash,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Serialize,
    Deserialize,
)]
#[display("{table_id}")]
pub struct TableId {
    /// The raw id value.
    pub table_id: u32,
}
240
241impl TableId {
242 pub const fn new(table_id: u32) -> Self {
243 TableId { table_id }
244 }
245
246 pub const fn placeholder() -> Self {
248 TableId {
249 table_id: OBJECT_ID_PLACEHOLDER,
250 }
251 }
252
253 pub fn table_id(&self) -> u32 {
254 self.table_id
255 }
256}
257
258impl From<u32> for TableId {
259 fn from(id: u32) -> Self {
260 Self::new(id)
261 }
262}
263
264impl From<&u32> for TableId {
265 fn from(id: &u32) -> Self {
266 Self::new(*id)
267 }
268}
269
270impl From<TableId> for u32 {
271 fn from(id: TableId) -> Self {
272 id.table_id
273 }
274}
275
/// Per-table options, convertible to and from
/// `risingwave_pb::hummock::TableOption`.
#[derive(Clone, Debug, PartialEq, Default, Copy)]
pub struct TableOption {
    /// Retention period in seconds (per the field name); `None` when unset.
    // NOTE(review): what an unset retention means is decided elsewhere.
    pub retention_seconds: Option<u32>,
}
280
281impl From<&risingwave_pb::hummock::TableOption> for TableOption {
282 fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
283 Self {
284 retention_seconds: table_option.retention_seconds,
285 }
286 }
287}
288
289impl From<&TableOption> for risingwave_pb::hummock::TableOption {
290 fn from(table_option: &TableOption) -> Self {
291 Self {
292 retention_seconds: table_option.retention_seconds,
293 }
294 }
295}
296
297impl TableOption {
298 pub fn new(retention_seconds: Option<u32>) -> Self {
299 TableOption { retention_seconds }
301 }
302}
303
/// Typed wrapper around the raw `u32` id of an index.
///
/// Its `Display` output is the bare numeric id.
#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq)]
#[display("{index_id}")]
pub struct IndexId {
    /// The raw id value.
    pub index_id: u32,
}
309
310impl IndexId {
311 pub const fn new(index_id: u32) -> Self {
312 IndexId { index_id }
313 }
314
315 pub const fn placeholder() -> Self {
317 IndexId {
318 index_id: OBJECT_ID_PLACEHOLDER,
319 }
320 }
321
322 pub fn index_id(&self) -> u32 {
323 self.index_id
324 }
325}
326
327impl From<u32> for IndexId {
328 fn from(id: u32) -> Self {
329 Self::new(id)
330 }
331}
332impl From<IndexId> for u32 {
333 fn from(id: IndexId) -> Self {
334 id.index_id
335 }
336}
337
/// Typed wrapper around the raw `u32` id of a function.
#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub struct FunctionId(pub u32);
340
341impl FunctionId {
342 pub const fn new(id: u32) -> Self {
343 FunctionId(id)
344 }
345
346 pub const fn placeholder() -> Self {
347 FunctionId(OBJECT_ID_PLACEHOLDER)
348 }
349
350 pub fn function_id(&self) -> u32 {
351 self.0
352 }
353}
354
355impl From<u32> for FunctionId {
356 fn from(id: u32) -> Self {
357 Self::new(id)
358 }
359}
360
361impl From<&u32> for FunctionId {
362 fn from(id: &u32) -> Self {
363 Self::new(*id)
364 }
365}
366
367impl From<FunctionId> for u32 {
368 fn from(id: FunctionId) -> Self {
369 id.0
370 }
371}
372
/// Typed wrapper around the raw `u32` id of a user.
///
/// Its `Display` output is the bare numeric id.
#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
#[display("{user_id}")]
pub struct UserId {
    /// The raw id value.
    pub user_id: u32,
}
378
379impl UserId {
380 pub const fn new(user_id: u32) -> Self {
381 UserId { user_id }
382 }
383
384 pub const fn placeholder() -> Self {
385 UserId {
386 user_id: OBJECT_ID_PLACEHOLDER,
387 }
388 }
389}
390
391impl From<u32> for UserId {
392 fn from(id: u32) -> Self {
393 Self::new(id)
394 }
395}
396
397impl From<&u32> for UserId {
398 fn from(id: &u32) -> Self {
399 Self::new(*id)
400 }
401}
402
403impl From<UserId> for u32 {
404 fn from(id: UserId) -> Self {
405 id.user_id
406 }
407}
408
/// Typed wrapper around the raw `u32` id of a connection.
#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub struct ConnectionId(pub u32);
411
412impl ConnectionId {
413 pub const fn new(id: u32) -> Self {
414 ConnectionId(id)
415 }
416
417 pub const fn placeholder() -> Self {
418 ConnectionId(OBJECT_ID_PLACEHOLDER)
419 }
420
421 pub fn connection_id(&self) -> u32 {
422 self.0
423 }
424}
425
426impl From<u32> for ConnectionId {
427 fn from(id: u32) -> Self {
428 Self::new(id)
429 }
430}
431
432impl From<&u32> for ConnectionId {
433 fn from(id: &u32) -> Self {
434 Self::new(*id)
435 }
436}
437
438impl From<ConnectionId> for u32 {
439 fn from(id: ConnectionId) -> Self {
440 id.0
441 }
442}
443
/// Typed wrapper around the raw `u32` id of a secret.
#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub struct SecretId(pub u32);
446
447impl SecretId {
448 pub const fn new(id: u32) -> Self {
449 SecretId(id)
450 }
451
452 pub const fn placeholder() -> Self {
453 SecretId(OBJECT_ID_PLACEHOLDER)
454 }
455
456 pub fn secret_id(&self) -> u32 {
457 self.0
458 }
459}
460
461impl From<u32> for SecretId {
462 fn from(id: u32) -> Self {
463 Self::new(id)
464 }
465}
466
467impl From<&u32> for SecretId {
468 fn from(id: &u32) -> Self {
469 Self::new(*id)
470 }
471}
472
473impl From<SecretId> for u32 {
474 fn from(id: SecretId) -> Self {
475 id.0
476 }
477}
478
/// In-memory counterpart of the protobuf `HandleConflictBehavior` enum.
///
/// The default is [`ConflictBehavior::NoCheck`].
// NOTE(review): the runtime meaning of each variant is implemented elsewhere;
// only the mapping to and from protobuf is visible in this file.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConflictBehavior {
    #[default]
    NoCheck,
    Overwrite,
    IgnoreConflict,
    DoUpdateIfNotNull,
}
487
/// Expands to an or-pattern matching every [`ConflictBehavior`] variant except
/// `NoCheck`, i.e. `Overwrite | IgnoreConflict | DoUpdateIfNotNull`.
///
/// The expansion uses the unqualified `ConflictBehavior` path, so that name
/// must be in scope at the call site.
#[macro_export]
macro_rules! _checked_conflict_behaviors {
    () => {
        ConflictBehavior::Overwrite
            | ConflictBehavior::IgnoreConflict
            | ConflictBehavior::DoUpdateIfNotNull
    };
}
// Re-export the macro under its public name inside this module's namespace.
pub use _checked_conflict_behaviors as checked_conflict_behaviors;
497
498impl ConflictBehavior {
499 pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
500 match tb_conflict_behavior {
501 PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
502 PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
503 PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
504 PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
507 ConflictBehavior::NoCheck
508 }
509 }
510 }
511
512 pub fn to_protobuf(self) -> PbHandleConflictBehavior {
513 match self {
514 ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
515 ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
516 ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
517 ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
518 }
519 }
520
521 pub fn debug_to_string(self) -> String {
522 match self {
523 ConflictBehavior::NoCheck => "NoCheck".to_owned(),
524 ConflictBehavior::Overwrite => "Overwrite".to_owned(),
525 ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
526 ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
527 }
528 }
529}
530
/// In-memory counterpart of the protobuf table `Engine` enum.
///
/// The default is [`Engine::Hummock`].
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Engine {
    #[default]
    Hummock,
    Iceberg,
}
537
538impl Engine {
539 pub fn from_protobuf(engine: &PbEngine) -> Self {
540 match engine {
541 PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
542 PbEngine::Iceberg => Engine::Iceberg,
543 }
544 }
545
546 pub fn to_protobuf(self) -> PbEngine {
547 match self {
548 Engine::Hummock => PbEngine::Hummock,
549 Engine::Iceberg => PbEngine::Iceberg,
550 }
551 }
552
553 pub fn debug_to_string(self) -> String {
554 match self {
555 Engine::Hummock => "Hummock".to_owned(),
556 Engine::Iceberg => "Iceberg".to_owned(),
557 }
558 }
559}
560
/// In-memory counterpart of the protobuf `StreamJobStatus` enum.
///
/// The default is [`StreamJobStatus::Creating`]. Note that decoding an
/// `Unspecified` protobuf value through `from_proto` yields `Created`, not this
/// default.
#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub enum StreamJobStatus {
    #[default]
    Creating,
    Created,
}
567
568impl StreamJobStatus {
569 pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
570 match stream_job_status {
571 PbStreamJobStatus::Creating => StreamJobStatus::Creating,
572 PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
573 }
574 }
575
576 pub fn to_proto(self) -> PbStreamJobStatus {
577 match self {
578 StreamJobStatus::Creating => PbStreamJobStatus::Creating,
579 StreamJobStatus::Created => PbStreamJobStatus::Created,
580 }
581 }
582}
583
584#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
585pub enum CreateType {
586 Foreground,
587 Background,
588}
589
590impl Default for CreateType {
591 fn default() -> Self {
592 Self::Foreground
593 }
594}
595
596impl CreateType {
597 pub fn from_proto(pb_create_type: PbCreateType) -> Self {
598 match pb_create_type {
599 PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
600 PbCreateType::Background => CreateType::Background,
601 }
602 }
603
604 pub fn to_proto(self) -> PbCreateType {
605 match self {
606 CreateType::Foreground => PbCreateType::Foreground,
607 CreateType::Background => PbCreateType::Background,
608 }
609 }
610}
611
/// A single database-level parameter change.
// NOTE(review): presumably `None` resets the parameter to a system default;
// confirm against the code that consumes this enum.
#[derive(Clone, Debug)]
pub enum AlterDatabaseParam {
    /// New barrier interval in milliseconds (per the variant name).
    BarrierIntervalMs(Option<u32>),
    /// New checkpoint frequency.
    CheckpointFrequency(Option<u64>),
}
619
// Generates `FragmentTypeFlag`, `FRAGMENT_TYPE_FLAG_LIST`, a `TryFrom<u32>` impl
// and `FragmentTypeFlag::as_str_name` from one ordered list of flag names.
//
// The macro recurses over the list with an accumulator:
//   * arm 1 (no arguments) seeds the recursion with the full flag list, an empty
//     accumulator and the starting index `0`;
//   * arm 3 moves the first remaining name into the accumulator, paired with the
//     current index, and recurses with the index expression plus one;
//   * arm 2 matches once the remaining list is empty and emits the items.
// Arm 2 is listed before arm 3, but arm 3 requires at least one remaining name,
// so the two never match the same input.
macro_rules! for_all_fragment_type_flags {
    () => {
        for_all_fragment_type_flags! {
            {
                // The position in this list fixes the bit index of each flag
                // (`Source` is bit 0, `Mview` is bit 1, ...). Reordering or
                // removing an entry changes the numeric value of every flag
                // after it.
                Source,
                Mview,
                Sink,
                Now,
                StreamScan,
                BarrierRecv,
                Values,
                Dml,
                CdcFilter,
                Skipped1,
                SourceScan,
                SnapshotBackfillStreamScan,
                FsFetch,
                CrossDbSnapshotBackfillStreamScan
            },
            {},
            0
        }
    };
    (
        {},
        {
            $(
                {$flag:ident, $index:expr}
            ),*
        },
        $next_index:expr
    ) => {
        #[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq)]
        #[repr(u32)]
        pub enum FragmentTypeFlag {
            $(
                // Each flag occupies exactly one bit.
                $flag = (1 << $index),
            )*
        }

        // All flags in declaration order; `$next_index` has grown to the number
        // of flags by the time this arm is reached.
        pub const FRAGMENT_TYPE_FLAG_LIST: [FragmentTypeFlag; $next_index] = [
            $(
                FragmentTypeFlag::$flag,
            )*
        ];

        impl TryFrom<u32> for FragmentTypeFlag {
            type Error = String;

            // Accepts only values equal to exactly one flag; zero and
            // combinations of several bits are rejected with an error message.
            fn try_from(value: u32) -> Result<Self, Self::Error> {
                match value {
                    $(
                        value if value == (FragmentTypeFlag::$flag as u32) => Ok(FragmentTypeFlag::$flag),
                    )*
                    _ => Err(format!("Invalid FragmentTypeFlag value: {}", value)),
                }
            }
        }

        impl FragmentTypeFlag {
            // Returns the flag name converted to upper snake case by `paste`,
            // e.g. `StreamScan` becomes "STREAM_SCAN".
            pub fn as_str_name(&self) -> &'static str {
                match self {
                    $(
                        FragmentTypeFlag::$flag => paste::paste!{stringify!( [< $flag:snake:upper >] )},
                    )*
                }
            }
        }
    };
    (
        {$first:ident $(, $rest:ident)*},
        {
            $(
                {$flag:ident, $index:expr}
            ),*
        },
        $next_index:expr
    ) => {
        for_all_fragment_type_flags! {
            {$($rest),*},
            {
                $({$flag, $index},)*
                {$first, $next_index}
            },
            $next_index + 1
        }
    };
}

// Expand the macro once to define the items described above.
for_all_fragment_type_flags!();
710
711impl FragmentTypeFlag {
712 pub fn raw_flag(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> u32 {
713 flags.into_iter().fold(0, |acc, flag| acc | (flag as u32))
714 }
715
716 pub fn backfill_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
718 [FragmentTypeFlag::SourceScan, FragmentTypeFlag::StreamScan].into_iter()
719 }
720
721 pub fn source_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
724 [FragmentTypeFlag::Source, FragmentTypeFlag::FsFetch].into_iter()
725 }
726
727 pub fn sink_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
729 [FragmentTypeFlag::Sink].into_iter()
730 }
731
732 pub fn rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
734 Self::backfill_rate_limit_fragments()
735 .chain(Self::source_rate_limit_fragments())
736 .chain(Self::sink_rate_limit_fragments())
737 }
738
739 pub fn dml_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
740 [FragmentTypeFlag::Dml].into_iter()
741 }
742}
743
/// Bit set of `FragmentTypeFlag` values stored in a single `u32`.
///
/// The default value is the empty mask (`0`).
#[derive(Clone, Copy, Debug, Hash, PartialOrd, PartialEq, Eq, Default)]
pub struct FragmentTypeMask(u32);

/// Formats the raw mask in binary.
///
/// Formatting is delegated to the inner `u32`, so width, fill, alignment and
/// the `#` (alternate) flag requested by the caller are honored, e.g. `{:#b}`
/// or `{:08b}`. A plain `{:b}` prints the bits with no prefix or padding.
impl Binary for FragmentTypeMask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Binary::fmt(&self.0, f)
    }
}
752
753impl From<i32> for FragmentTypeMask {
754 fn from(value: i32) -> Self {
755 Self(value as u32)
756 }
757}
758
759impl From<u32> for FragmentTypeMask {
760 fn from(value: u32) -> Self {
761 Self(value)
762 }
763}
764
765impl From<FragmentTypeMask> for u32 {
766 fn from(value: FragmentTypeMask) -> Self {
767 value.0
768 }
769}
770
771impl From<FragmentTypeMask> for i32 {
772 fn from(value: FragmentTypeMask) -> Self {
773 value.0 as _
774 }
775}
776
777impl FragmentTypeMask {
778 pub fn empty() -> Self {
779 FragmentTypeMask(0)
780 }
781
782 pub fn add(&mut self, flag: FragmentTypeFlag) {
783 self.0 |= flag as u32;
784 }
785
786 pub fn contains_any(&self, flags: impl IntoIterator<Item = FragmentTypeFlag>) -> bool {
787 let flag = FragmentTypeFlag::raw_flag(flags);
788 (self.0 & flag) != 0
789 }
790
791 pub fn contains(&self, flag: FragmentTypeFlag) -> bool {
792 self.contains_any([flag])
793 }
794}
795
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::catalog::FRAGMENT_TYPE_FLAG_LIST;

    use crate::catalog::FragmentTypeFlag;

    // Snapshot test pinning, for every flag in `FRAGMENT_TYPE_FLAG_LIST`, its
    // variant, its numeric value and its `as_str_name` string, followed by a
    // round-trip check through `TryFrom<u32>`.
    #[test]
    fn test_all_fragment_type_flag() {
        expect_test::expect![[r#"
            [
                (
                    Source,
                    1,
                    "SOURCE",
                ),
                (
                    Mview,
                    2,
                    "MVIEW",
                ),
                (
                    Sink,
                    4,
                    "SINK",
                ),
                (
                    Now,
                    8,
                    "NOW",
                ),
                (
                    StreamScan,
                    16,
                    "STREAM_SCAN",
                ),
                (
                    BarrierRecv,
                    32,
                    "BARRIER_RECV",
                ),
                (
                    Values,
                    64,
                    "VALUES",
                ),
                (
                    Dml,
                    128,
                    "DML",
                ),
                (
                    CdcFilter,
                    256,
                    "CDC_FILTER",
                ),
                (
                    Skipped1,
                    512,
                    "SKIPPED1",
                ),
                (
                    SourceScan,
                    1024,
                    "SOURCE_SCAN",
                ),
                (
                    SnapshotBackfillStreamScan,
                    2048,
                    "SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
                (
                    FsFetch,
                    4096,
                    "FS_FETCH",
                ),
                (
                    CrossDbSnapshotBackfillStreamScan,
                    8192,
                    "CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
            ]
        "#]]
        .assert_debug_eq(
            &FRAGMENT_TYPE_FLAG_LIST
                .into_iter()
                .map(|flag| (flag, flag as u32, flag.as_str_name()))
                .collect_vec(),
        );
        // Every flag must convert back from its own numeric value.
        for flag in FRAGMENT_TYPE_FLAG_LIST {
            assert_eq!(FragmentTypeFlag::try_from(flag as u32).unwrap(), flag);
        }
    }
}
889}