1mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::fmt::Binary;
23use std::sync::Arc;
24
25pub use column::*;
26pub use external_table::*;
27use futures::stream::BoxStream;
28pub use internal_table::*;
29use parse_display::Display;
30pub use physical_table::*;
31use risingwave_pb::catalog::table::PbEngine;
32use risingwave_pb::catalog::{
33 CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
34 StreamJobStatus as PbStreamJobStatus,
35};
36use risingwave_pb::plan_common::ColumnDescVersion;
37pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
38use serde::{Deserialize, Serialize};
39
40use crate::array::DataChunk;
41pub use crate::constants::hummock;
42use crate::error::BoxedError;
43
44pub type CatalogVersion = u64;
46
47pub type TableVersionId = u64;
49pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
51pub type SourceVersionId = u64;
53pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;
55
56pub const DEFAULT_DATABASE_NAME: &str = "dev";
57pub const DEFAULT_SCHEMA_NAME: &str = "public";
58pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
59pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
60pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
61pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
62pub const DEFAULT_SUPER_USER: &str = "root";
63pub const DEFAULT_SUPER_USER_ID: u32 = 1;
64pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
66pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;
67
68pub const DEFAULT_SUPER_USER_FOR_ADMIN: &str = "rwadmin";
70pub const DEFAULT_SUPER_USER_FOR_ADMIN_ID: u32 = 3;
71
72pub const NON_RESERVED_USER_ID: i32 = 11;
73
74pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
75pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;
76
77pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
78
79pub const SYSTEM_SCHEMAS: [&str; 3] = [
80 PG_CATALOG_SCHEMA_NAME,
81 INFORMATION_SCHEMA_SCHEMA_NAME,
82 RW_CATALOG_SCHEMA_NAME,
83];
84pub fn is_system_schema(schema_name: &str) -> bool {
85 SYSTEM_SCHEMAS.contains(&schema_name)
86}
87
88pub fn is_reserved_admin_user(user_name: &str) -> bool {
89 user_name == DEFAULT_SUPER_USER_FOR_ADMIN
90}
91
92pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";
93
94pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
98
99pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
100 match version {
101 ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
102 _ => DEFAULT_KEY_COLUMN_NAME,
103 }
104}
105
106pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";
111
112pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";
117
118pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
119pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);
121
122pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();
126
127pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
128pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);
129
130pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
131pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
132pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";
133
134pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
135pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
138pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
139
140pub trait SysCatalogReader: Sync + Send + 'static {
142 fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
144}
145
146pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
147
148pub type ObjectId = u32;
149
150#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
151#[display("{database_id}")]
152pub struct DatabaseId {
153 pub database_id: u32,
154}
155
156impl DatabaseId {
157 pub const fn new(database_id: u32) -> Self {
158 DatabaseId { database_id }
159 }
160
161 pub fn placeholder() -> Self {
162 DatabaseId {
163 database_id: OBJECT_ID_PLACEHOLDER,
164 }
165 }
166}
167
168impl From<u32> for DatabaseId {
169 fn from(id: u32) -> Self {
170 Self::new(id)
171 }
172}
173
174impl From<&u32> for DatabaseId {
175 fn from(id: &u32) -> Self {
176 Self::new(*id)
177 }
178}
179
180impl From<DatabaseId> for u32 {
181 fn from(id: DatabaseId) -> Self {
182 id.database_id
183 }
184}
185
186#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq)]
187#[display("{schema_id}")]
188pub struct SchemaId {
189 pub schema_id: u32,
190}
191
192impl SchemaId {
193 pub fn new(schema_id: u32) -> Self {
194 SchemaId { schema_id }
195 }
196
197 pub fn placeholder() -> Self {
198 SchemaId {
199 schema_id: OBJECT_ID_PLACEHOLDER,
200 }
201 }
202}
203
204impl From<u32> for SchemaId {
205 fn from(id: u32) -> Self {
206 Self::new(id)
207 }
208}
209
210impl From<&u32> for SchemaId {
211 fn from(id: &u32) -> Self {
212 Self::new(*id)
213 }
214}
215
216impl From<SchemaId> for u32 {
217 fn from(id: SchemaId) -> Self {
218 id.schema_id
219 }
220}
221
222#[derive(
223 Clone,
224 Copy,
225 Debug,
226 Display,
227 Default,
228 Hash,
229 PartialOrd,
230 PartialEq,
231 Eq,
232 Ord,
233 Serialize,
234 Deserialize,
235)]
236#[display("{table_id}")]
237pub struct TableId {
238 pub table_id: u32,
239}
240
241impl TableId {
242 pub const fn new(table_id: u32) -> Self {
243 TableId { table_id }
244 }
245
246 pub const fn placeholder() -> Self {
248 TableId {
249 table_id: OBJECT_ID_PLACEHOLDER,
250 }
251 }
252
253 pub fn table_id(&self) -> u32 {
254 self.table_id
255 }
256}
257
258impl From<u32> for TableId {
259 fn from(id: u32) -> Self {
260 Self::new(id)
261 }
262}
263
264impl From<&u32> for TableId {
265 fn from(id: &u32) -> Self {
266 Self::new(*id)
267 }
268}
269
270impl From<TableId> for u32 {
271 fn from(id: TableId) -> Self {
272 id.table_id
273 }
274}
275
276#[derive(Clone, Debug, PartialEq, Default, Copy)]
277pub struct TableOption {
278 pub retention_seconds: Option<u32>, }
280
281impl From<&risingwave_pb::hummock::TableOption> for TableOption {
282 fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
283 Self {
284 retention_seconds: table_option.retention_seconds,
285 }
286 }
287}
288
289impl From<&TableOption> for risingwave_pb::hummock::TableOption {
290 fn from(table_option: &TableOption) -> Self {
291 Self {
292 retention_seconds: table_option.retention_seconds,
293 }
294 }
295}
296
297impl TableOption {
298 pub fn new(retention_seconds: Option<u32>) -> Self {
299 TableOption { retention_seconds }
301 }
302}
303
304#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq)]
305#[display("{index_id}")]
306pub struct IndexId {
307 pub index_id: u32,
308}
309
310impl IndexId {
311 pub const fn new(index_id: u32) -> Self {
312 IndexId { index_id }
313 }
314
315 pub const fn placeholder() -> Self {
317 IndexId {
318 index_id: OBJECT_ID_PLACEHOLDER,
319 }
320 }
321
322 pub fn index_id(&self) -> u32 {
323 self.index_id
324 }
325}
326
327impl From<u32> for IndexId {
328 fn from(id: u32) -> Self {
329 Self::new(id)
330 }
331}
332impl From<IndexId> for u32 {
333 fn from(id: IndexId) -> Self {
334 id.index_id
335 }
336}
337
338#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
339pub struct FunctionId(pub u32);
340
341impl FunctionId {
342 pub const fn new(id: u32) -> Self {
343 FunctionId(id)
344 }
345
346 pub const fn placeholder() -> Self {
347 FunctionId(OBJECT_ID_PLACEHOLDER)
348 }
349
350 pub fn function_id(&self) -> u32 {
351 self.0
352 }
353}
354
355impl From<u32> for FunctionId {
356 fn from(id: u32) -> Self {
357 Self::new(id)
358 }
359}
360
361impl From<&u32> for FunctionId {
362 fn from(id: &u32) -> Self {
363 Self::new(*id)
364 }
365}
366
367impl From<FunctionId> for u32 {
368 fn from(id: FunctionId) -> Self {
369 id.0
370 }
371}
372
373#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
374#[display("{user_id}")]
375pub struct UserId {
376 pub user_id: u32,
377}
378
379impl UserId {
380 pub const fn new(user_id: u32) -> Self {
381 UserId { user_id }
382 }
383
384 pub const fn placeholder() -> Self {
385 UserId {
386 user_id: OBJECT_ID_PLACEHOLDER,
387 }
388 }
389}
390
391impl From<u32> for UserId {
392 fn from(id: u32) -> Self {
393 Self::new(id)
394 }
395}
396
397impl From<&u32> for UserId {
398 fn from(id: &u32) -> Self {
399 Self::new(*id)
400 }
401}
402
403impl From<UserId> for u32 {
404 fn from(id: UserId) -> Self {
405 id.user_id
406 }
407}
408
409#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
410pub struct ConnectionId(pub u32);
411
412impl ConnectionId {
413 pub const fn new(id: u32) -> Self {
414 ConnectionId(id)
415 }
416
417 pub const fn placeholder() -> Self {
418 ConnectionId(OBJECT_ID_PLACEHOLDER)
419 }
420
421 pub fn connection_id(&self) -> u32 {
422 self.0
423 }
424}
425
426impl From<u32> for ConnectionId {
427 fn from(id: u32) -> Self {
428 Self::new(id)
429 }
430}
431
432impl From<&u32> for ConnectionId {
433 fn from(id: &u32) -> Self {
434 Self::new(*id)
435 }
436}
437
438impl From<ConnectionId> for u32 {
439 fn from(id: ConnectionId) -> Self {
440 id.0
441 }
442}
443
444#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
445pub struct SecretId(pub u32);
446
447impl SecretId {
448 pub const fn new(id: u32) -> Self {
449 SecretId(id)
450 }
451
452 pub const fn placeholder() -> Self {
453 SecretId(OBJECT_ID_PLACEHOLDER)
454 }
455
456 pub fn secret_id(&self) -> u32 {
457 self.0
458 }
459}
460
461impl From<u32> for SecretId {
462 fn from(id: u32) -> Self {
463 Self::new(id)
464 }
465}
466
467impl From<&u32> for SecretId {
468 fn from(id: &u32) -> Self {
469 Self::new(*id)
470 }
471}
472
473impl From<SecretId> for u32 {
474 fn from(id: SecretId) -> Self {
475 id.0
476 }
477}
478
479#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
480pub enum ConflictBehavior {
481 #[default]
482 NoCheck,
483 Overwrite,
484 IgnoreConflict,
485 DoUpdateIfNotNull,
486}
487
488#[macro_export]
489macro_rules! _checked_conflict_behaviors {
490 () => {
491 ConflictBehavior::Overwrite
492 | ConflictBehavior::IgnoreConflict
493 | ConflictBehavior::DoUpdateIfNotNull
494 };
495}
496pub use _checked_conflict_behaviors as checked_conflict_behaviors;
497
498impl ConflictBehavior {
499 pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
500 match tb_conflict_behavior {
501 PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
502 PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
503 PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
504 PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
507 ConflictBehavior::NoCheck
508 }
509 }
510 }
511
512 pub fn to_protobuf(self) -> PbHandleConflictBehavior {
513 match self {
514 ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
515 ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
516 ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
517 ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
518 }
519 }
520
521 pub fn debug_to_string(self) -> String {
522 match self {
523 ConflictBehavior::NoCheck => "NoCheck".to_owned(),
524 ConflictBehavior::Overwrite => "Overwrite".to_owned(),
525 ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
526 ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
527 }
528 }
529}
530
531#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
532pub enum Engine {
533 #[default]
534 Hummock,
535 Iceberg,
536}
537
538impl Engine {
539 pub fn from_protobuf(engine: &PbEngine) -> Self {
540 match engine {
541 PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
542 PbEngine::Iceberg => Engine::Iceberg,
543 }
544 }
545
546 pub fn to_protobuf(self) -> PbEngine {
547 match self {
548 Engine::Hummock => PbEngine::Hummock,
549 Engine::Iceberg => PbEngine::Iceberg,
550 }
551 }
552
553 pub fn debug_to_string(self) -> String {
554 match self {
555 Engine::Hummock => "Hummock".to_owned(),
556 Engine::Iceberg => "Iceberg".to_owned(),
557 }
558 }
559}
560
561#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
562pub enum StreamJobStatus {
563 #[default]
564 Creating,
565 Created,
566}
567
568impl StreamJobStatus {
569 pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
570 match stream_job_status {
571 PbStreamJobStatus::Creating => StreamJobStatus::Creating,
572 PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
573 }
574 }
575
576 pub fn to_proto(self) -> PbStreamJobStatus {
577 match self {
578 StreamJobStatus::Creating => PbStreamJobStatus::Creating,
579 StreamJobStatus::Created => PbStreamJobStatus::Created,
580 }
581 }
582}
583
584#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
585pub enum CreateType {
586 Foreground,
587 Background,
588}
589
590impl Default for CreateType {
591 fn default() -> Self {
592 Self::Foreground
593 }
594}
595
596impl CreateType {
597 pub fn from_proto(pb_create_type: PbCreateType) -> Self {
598 match pb_create_type {
599 PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
600 PbCreateType::Background => CreateType::Background,
601 }
602 }
603
604 pub fn to_proto(self) -> PbCreateType {
605 match self {
606 CreateType::Foreground => PbCreateType::Foreground,
607 CreateType::Background => PbCreateType::Background,
608 }
609 }
610}
611
612#[derive(Clone, Debug)]
613pub enum AlterDatabaseParam {
614 BarrierIntervalMs(Option<u32>),
617 CheckpointFrequency(Option<u64>),
618}
619
620macro_rules! for_all_fragment_type_flags {
621 () => {
622 for_all_fragment_type_flags! {
623 {
624 Source,
625 Mview,
626 Sink,
627 Now,
628 StreamScan,
629 BarrierRecv,
630 Values,
631 Dml,
632 CdcFilter,
633 Skipped1,
634 SourceScan,
635 SnapshotBackfillStreamScan,
636 FsFetch,
637 CrossDbSnapshotBackfillStreamScan,
638 StreamCdcScan
639 },
640 {},
641 0
642 }
643 };
644 (
645 {},
646 {
647 $(
648 {$flag:ident, $index:expr}
649 ),*
650 },
651 $next_index:expr
652 ) => {
653 #[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq)]
654 #[repr(u32)]
655 pub enum FragmentTypeFlag {
656 $(
657 $flag = (1 << $index),
658 )*
659 }
660
661 pub const FRAGMENT_TYPE_FLAG_LIST: [FragmentTypeFlag; $next_index] = [
662 $(
663 FragmentTypeFlag::$flag,
664 )*
665 ];
666
667 impl TryFrom<u32> for FragmentTypeFlag {
668 type Error = String;
669
670 fn try_from(value: u32) -> Result<Self, Self::Error> {
671 match value {
672 $(
673 value if value == (FragmentTypeFlag::$flag as u32) => Ok(FragmentTypeFlag::$flag),
674 )*
675 _ => Err(format!("Invalid FragmentTypeFlag value: {}", value)),
676 }
677 }
678 }
679
680 impl FragmentTypeFlag {
681 pub fn as_str_name(&self) -> &'static str {
682 match self {
683 $(
684 FragmentTypeFlag::$flag => paste::paste!{stringify!( [< $flag:snake:upper >] )},
685 )*
686 }
687 }
688 }
689 };
690 (
691 {$first:ident $(, $rest:ident)*},
692 {
693 $(
694 {$flag:ident, $index:expr}
695 ),*
696 },
697 $next_index:expr
698 ) => {
699 for_all_fragment_type_flags! {
700 {$($rest),*},
701 {
702 $({$flag, $index},)*
703 {$first, $next_index}
704 },
705 $next_index + 1
706 }
707 };
708}
709
710for_all_fragment_type_flags!();
711
712impl FragmentTypeFlag {
713 pub fn raw_flag(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> u32 {
714 flags.into_iter().fold(0, |acc, flag| acc | (flag as u32))
715 }
716
717 pub fn backfill_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
719 [FragmentTypeFlag::SourceScan, FragmentTypeFlag::StreamScan].into_iter()
720 }
721
722 pub fn source_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
725 [FragmentTypeFlag::Source, FragmentTypeFlag::FsFetch].into_iter()
726 }
727
728 pub fn sink_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
730 [FragmentTypeFlag::Sink].into_iter()
731 }
732
733 pub fn rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
735 Self::backfill_rate_limit_fragments()
736 .chain(Self::source_rate_limit_fragments())
737 .chain(Self::sink_rate_limit_fragments())
738 }
739
740 pub fn dml_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
741 [FragmentTypeFlag::Dml].into_iter()
742 }
743}
744
745#[derive(Clone, Copy, Debug, Hash, PartialOrd, PartialEq, Eq, Default)]
746pub struct FragmentTypeMask(u32);
747
748impl Binary for FragmentTypeMask {
749 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
750 write!(f, "{:b}", self.0)
751 }
752}
753
754impl From<i32> for FragmentTypeMask {
755 fn from(value: i32) -> Self {
756 Self(value as u32)
757 }
758}
759
760impl From<u32> for FragmentTypeMask {
761 fn from(value: u32) -> Self {
762 Self(value)
763 }
764}
765
766impl From<FragmentTypeMask> for u32 {
767 fn from(value: FragmentTypeMask) -> Self {
768 value.0
769 }
770}
771
772impl From<FragmentTypeMask> for i32 {
773 fn from(value: FragmentTypeMask) -> Self {
774 value.0 as _
775 }
776}
777
778impl FragmentTypeMask {
779 pub fn empty() -> Self {
780 FragmentTypeMask(0)
781 }
782
783 pub fn add(&mut self, flag: FragmentTypeFlag) {
784 self.0 |= flag as u32;
785 }
786
787 pub fn contains_any(&self, flags: impl IntoIterator<Item = FragmentTypeFlag>) -> bool {
788 let flag = FragmentTypeFlag::raw_flag(flags);
789 (self.0 & flag) != 0
790 }
791
792 pub fn contains(&self, flag: FragmentTypeFlag) -> bool {
793 self.contains_any([flag])
794 }
795}
796
797#[cfg(test)]
798mod tests {
799 use itertools::Itertools;
800 use risingwave_common::catalog::FRAGMENT_TYPE_FLAG_LIST;
801
802 use crate::catalog::FragmentTypeFlag;
803
804 #[test]
805 fn test_all_fragment_type_flag() {
806 expect_test::expect![[r#"
807 [
808 (
809 Source,
810 1,
811 "SOURCE",
812 ),
813 (
814 Mview,
815 2,
816 "MVIEW",
817 ),
818 (
819 Sink,
820 4,
821 "SINK",
822 ),
823 (
824 Now,
825 8,
826 "NOW",
827 ),
828 (
829 StreamScan,
830 16,
831 "STREAM_SCAN",
832 ),
833 (
834 BarrierRecv,
835 32,
836 "BARRIER_RECV",
837 ),
838 (
839 Values,
840 64,
841 "VALUES",
842 ),
843 (
844 Dml,
845 128,
846 "DML",
847 ),
848 (
849 CdcFilter,
850 256,
851 "CDC_FILTER",
852 ),
853 (
854 Skipped1,
855 512,
856 "SKIPPED1",
857 ),
858 (
859 SourceScan,
860 1024,
861 "SOURCE_SCAN",
862 ),
863 (
864 SnapshotBackfillStreamScan,
865 2048,
866 "SNAPSHOT_BACKFILL_STREAM_SCAN",
867 ),
868 (
869 FsFetch,
870 4096,
871 "FS_FETCH",
872 ),
873 (
874 CrossDbSnapshotBackfillStreamScan,
875 8192,
876 "CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN",
877 ),
878 (
879 StreamCdcScan,
880 16384,
881 "STREAM_CDC_SCAN",
882 ),
883 ]
884 "#]]
885 .assert_debug_eq(
886 &FRAGMENT_TYPE_FLAG_LIST
887 .into_iter()
888 .map(|flag| (flag, flag as u32, flag.as_str_name()))
889 .collect_vec(),
890 );
891 for flag in FRAGMENT_TYPE_FLAG_LIST {
892 assert_eq!(FragmentTypeFlag::try_from(flag as u32).unwrap(), flag);
893 }
894 }
895}