risingwave_common/catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::fmt::Binary;
23use std::sync::Arc;
24
25pub use column::*;
26pub use external_table::*;
27use futures::stream::BoxStream;
28pub use internal_table::*;
29use parse_display::Display;
30pub use physical_table::*;
31use risingwave_pb::catalog::table::PbEngine;
32use risingwave_pb::catalog::{
33    CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
34    StreamJobStatus as PbStreamJobStatus,
35};
36use risingwave_pb::plan_common::ColumnDescVersion;
37pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
38
39use crate::array::DataChunk;
40pub use crate::constants::hummock;
41use crate::error::BoxedError;
42pub use crate::id::TableId;
43
44/// The global version of the catalog.
45pub type CatalogVersion = u64;
46
47/// The version number of the per-table catalog.
48pub type TableVersionId = u64;
49/// The default version ID for a new table.
50pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
51/// The version number of the per-source catalog.
52pub type SourceVersionId = u64;
53/// The default version ID for a new source.
54pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;
55
56pub const DEFAULT_DATABASE_NAME: &str = "dev";
57pub const DEFAULT_SCHEMA_NAME: &str = "public";
58pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
59pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
60pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
61pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
62pub const DEFAULT_SUPER_USER: &str = "root";
63pub const DEFAULT_SUPER_USER_ID: u32 = 1;
64// This is for compatibility with customized utils for PostgreSQL.
65pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
66pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;
67
68// This is the default superuser for admin, which is used only for cloud control plane.
69pub const DEFAULT_SUPER_USER_FOR_ADMIN: &str = "rwadmin";
70pub const DEFAULT_SUPER_USER_FOR_ADMIN_ID: u32 = 3;
71
72pub const NON_RESERVED_USER_ID: i32 = 11;
73
74pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
75pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;
76
77pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
78
79pub const SYSTEM_SCHEMAS: [&str; 3] = [
80    PG_CATALOG_SCHEMA_NAME,
81    INFORMATION_SCHEMA_SCHEMA_NAME,
82    RW_CATALOG_SCHEMA_NAME,
83];
84pub fn is_system_schema(schema_name: &str) -> bool {
85    SYSTEM_SCHEMAS.contains(&schema_name)
86}
87
88pub fn is_reserved_admin_user(user_name: &str) -> bool {
89    user_name == DEFAULT_SUPER_USER_FOR_ADMIN
90}
91
92pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";
93
94/// When there is no primary key specified while creating source, will use
95/// the message key as primary key in `BYTEA` type with this name.
96/// Note: the field has version to track, please refer to [`default_key_column_name_version_mapping`]
97pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
98
99pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
100    match version {
101        ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
102        _ => DEFAULT_KEY_COLUMN_NAME,
103    }
104}
105
106/// For kafka source, we attach a hidden column [`KAFKA_TIMESTAMP_COLUMN_NAME`] to it, so that we
107/// can limit the timestamp range when querying it directly with batch query. The column type is
108/// [`crate::types::DataType::Timestamptz`]. For more details, please refer to
109/// [this rfc](https://github.com/risingwavelabs/rfcs/pull/20).
110pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";
111
112/// RisingWave iceberg table engine will create the column `_risingwave_iceberg_row_id` in the iceberg table.
113///
114/// Iceberg V3 spec use `_row_id` as a reserved column name for row lineage, so if the table without primary key,
115/// we can't use `_row_id` directly for iceberg, so use `_risingwave_iceberg_row_id` instead.
116pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";
117
118pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
119/// The column ID preserved for the row ID column.
120pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);
121
122/// The column ID offset for user-defined columns.
123///
124/// All IDs of user-defined columns must be greater or equal to this value.
125pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();
126
127pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
128pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);
129
130pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
131pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
132pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";
133
134pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
135/// The number of columns output by the cdc source job
136/// see [`ColumnCatalog::debezium_cdc_source_cols()`] for details
137pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
138pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
139
140/// The local system catalog reader in the frontend node.
141pub trait SysCatalogReader: Sync + Send + 'static {
142    /// Reads the data of the system catalog table.
143    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
144}
145
146pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
147
148pub type ObjectId = u32;
149
150#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
151#[display("{database_id}")]
152pub struct DatabaseId {
153    pub database_id: u32,
154}
155
156impl DatabaseId {
157    pub const fn new(database_id: u32) -> Self {
158        DatabaseId { database_id }
159    }
160
161    pub fn placeholder() -> Self {
162        DatabaseId {
163            database_id: OBJECT_ID_PLACEHOLDER,
164        }
165    }
166}
167
168impl From<u32> for DatabaseId {
169    fn from(id: u32) -> Self {
170        Self::new(id)
171    }
172}
173
174impl From<&u32> for DatabaseId {
175    fn from(id: &u32) -> Self {
176        Self::new(*id)
177    }
178}
179
180impl From<DatabaseId> for u32 {
181    fn from(id: DatabaseId) -> Self {
182        id.database_id
183    }
184}
185
186#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq)]
187#[display("{schema_id}")]
188pub struct SchemaId {
189    pub schema_id: u32,
190}
191
192impl SchemaId {
193    pub fn new(schema_id: u32) -> Self {
194        SchemaId { schema_id }
195    }
196
197    pub fn placeholder() -> Self {
198        SchemaId {
199            schema_id: OBJECT_ID_PLACEHOLDER,
200        }
201    }
202}
203
204impl From<u32> for SchemaId {
205    fn from(id: u32) -> Self {
206        Self::new(id)
207    }
208}
209
210impl From<&u32> for SchemaId {
211    fn from(id: &u32) -> Self {
212        Self::new(*id)
213    }
214}
215
216impl From<SchemaId> for u32 {
217    fn from(id: SchemaId) -> Self {
218        id.schema_id
219    }
220}
221
222impl TableId {
223    /// Sometimes the id field is filled later, we use this value for better debugging.
224    pub const fn placeholder() -> Self {
225        TableId {
226            inner: OBJECT_ID_PLACEHOLDER,
227        }
228    }
229
230    pub fn is_placeholder(&self) -> bool {
231        self.inner == OBJECT_ID_PLACEHOLDER
232    }
233}
234
235#[derive(Clone, Debug, PartialEq, Default, Copy)]
236pub struct TableOption {
237    pub retention_seconds: Option<u32>, // second
238}
239
240impl From<&risingwave_pb::hummock::TableOption> for TableOption {
241    fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
242        Self {
243            retention_seconds: table_option.retention_seconds,
244        }
245    }
246}
247
248impl From<&TableOption> for risingwave_pb::hummock::TableOption {
249    fn from(table_option: &TableOption) -> Self {
250        Self {
251            retention_seconds: table_option.retention_seconds,
252        }
253    }
254}
255
256impl TableOption {
257    pub fn new(retention_seconds: Option<u32>) -> Self {
258        // now we only support ttl for TableOption
259        TableOption { retention_seconds }
260    }
261}
262
263#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq)]
264#[display("{index_id}")]
265pub struct IndexId {
266    pub index_id: u32,
267}
268
269impl IndexId {
270    pub const fn new(index_id: u32) -> Self {
271        IndexId { index_id }
272    }
273
274    /// Sometimes the id field is filled later, we use this value for better debugging.
275    pub const fn placeholder() -> Self {
276        IndexId {
277            index_id: OBJECT_ID_PLACEHOLDER,
278        }
279    }
280
281    pub fn index_id(&self) -> u32 {
282        self.index_id
283    }
284}
285
286impl From<u32> for IndexId {
287    fn from(id: u32) -> Self {
288        Self::new(id)
289    }
290}
291impl From<IndexId> for u32 {
292    fn from(id: IndexId) -> Self {
293        id.index_id
294    }
295}
296
297#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
298pub struct FunctionId(pub u32);
299
300impl FunctionId {
301    pub const fn new(id: u32) -> Self {
302        FunctionId(id)
303    }
304
305    pub const fn placeholder() -> Self {
306        FunctionId(OBJECT_ID_PLACEHOLDER)
307    }
308
309    pub fn function_id(&self) -> u32 {
310        self.0
311    }
312}
313
314impl From<u32> for FunctionId {
315    fn from(id: u32) -> Self {
316        Self::new(id)
317    }
318}
319
320impl From<&u32> for FunctionId {
321    fn from(id: &u32) -> Self {
322        Self::new(*id)
323    }
324}
325
326impl From<FunctionId> for u32 {
327    fn from(id: FunctionId) -> Self {
328        id.0
329    }
330}
331
332#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
333#[display("{user_id}")]
334pub struct UserId {
335    pub user_id: u32,
336}
337
338impl UserId {
339    pub const fn new(user_id: u32) -> Self {
340        UserId { user_id }
341    }
342
343    pub const fn placeholder() -> Self {
344        UserId {
345            user_id: OBJECT_ID_PLACEHOLDER,
346        }
347    }
348}
349
350impl From<u32> for UserId {
351    fn from(id: u32) -> Self {
352        Self::new(id)
353    }
354}
355
356impl From<&u32> for UserId {
357    fn from(id: &u32) -> Self {
358        Self::new(*id)
359    }
360}
361
362impl From<UserId> for u32 {
363    fn from(id: UserId) -> Self {
364        id.user_id
365    }
366}
367
368#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
369pub struct ConnectionId(pub u32);
370
371impl ConnectionId {
372    pub const fn new(id: u32) -> Self {
373        ConnectionId(id)
374    }
375
376    pub const fn placeholder() -> Self {
377        ConnectionId(OBJECT_ID_PLACEHOLDER)
378    }
379
380    pub fn connection_id(&self) -> u32 {
381        self.0
382    }
383}
384
385impl From<u32> for ConnectionId {
386    fn from(id: u32) -> Self {
387        Self::new(id)
388    }
389}
390
391impl From<&u32> for ConnectionId {
392    fn from(id: &u32) -> Self {
393        Self::new(*id)
394    }
395}
396
397impl From<ConnectionId> for u32 {
398    fn from(id: ConnectionId) -> Self {
399        id.0
400    }
401}
402
403#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
404pub struct SecretId(pub u32);
405
406impl SecretId {
407    pub const fn new(id: u32) -> Self {
408        SecretId(id)
409    }
410
411    pub const fn placeholder() -> Self {
412        SecretId(OBJECT_ID_PLACEHOLDER)
413    }
414
415    pub fn secret_id(&self) -> u32 {
416        self.0
417    }
418}
419
420impl From<u32> for SecretId {
421    fn from(id: u32) -> Self {
422        Self::new(id)
423    }
424}
425
426impl From<&u32> for SecretId {
427    fn from(id: &u32) -> Self {
428        Self::new(*id)
429    }
430}
431
432impl From<SecretId> for u32 {
433    fn from(id: SecretId) -> Self {
434        id.0
435    }
436}
437
438#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
439pub enum ConflictBehavior {
440    #[default]
441    NoCheck,
442    Overwrite,
443    IgnoreConflict,
444    DoUpdateIfNotNull,
445}
446
447#[macro_export]
448macro_rules! _checked_conflict_behaviors {
449    () => {
450        ConflictBehavior::Overwrite
451            | ConflictBehavior::IgnoreConflict
452            | ConflictBehavior::DoUpdateIfNotNull
453    };
454}
455pub use _checked_conflict_behaviors as checked_conflict_behaviors;
456
457impl ConflictBehavior {
458    pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
459        match tb_conflict_behavior {
460            PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
461            PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
462            PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
463            // This is for backward compatibility, in the previous version
464            // `HandleConflictBehavior::Unspecified` represented `NoCheck`, so just treat it as `NoCheck`.
465            PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
466                ConflictBehavior::NoCheck
467            }
468        }
469    }
470
471    pub fn to_protobuf(self) -> PbHandleConflictBehavior {
472        match self {
473            ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
474            ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
475            ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
476            ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
477        }
478    }
479
480    pub fn debug_to_string(self) -> String {
481        match self {
482            ConflictBehavior::NoCheck => "NoCheck".to_owned(),
483            ConflictBehavior::Overwrite => "Overwrite".to_owned(),
484            ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
485            ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
486        }
487    }
488}
489
490#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
491pub enum Engine {
492    #[default]
493    Hummock,
494    Iceberg,
495}
496
497impl Engine {
498    pub fn from_protobuf(engine: &PbEngine) -> Self {
499        match engine {
500            PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
501            PbEngine::Iceberg => Engine::Iceberg,
502        }
503    }
504
505    pub fn to_protobuf(self) -> PbEngine {
506        match self {
507            Engine::Hummock => PbEngine::Hummock,
508            Engine::Iceberg => PbEngine::Iceberg,
509        }
510    }
511
512    pub fn debug_to_string(self) -> String {
513        match self {
514            Engine::Hummock => "Hummock".to_owned(),
515            Engine::Iceberg => "Iceberg".to_owned(),
516        }
517    }
518}
519
520#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
521pub enum StreamJobStatus {
522    #[default]
523    Creating,
524    Created,
525}
526
527impl StreamJobStatus {
528    pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
529        match stream_job_status {
530            PbStreamJobStatus::Creating => StreamJobStatus::Creating,
531            PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
532        }
533    }
534
535    pub fn to_proto(self) -> PbStreamJobStatus {
536        match self {
537            StreamJobStatus::Creating => PbStreamJobStatus::Creating,
538            StreamJobStatus::Created => PbStreamJobStatus::Created,
539        }
540    }
541}
542
543#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord, Default)]
544pub enum CreateType {
545    #[default]
546    Foreground,
547    Background,
548}
549
550impl CreateType {
551    pub fn from_proto(pb_create_type: PbCreateType) -> Self {
552        match pb_create_type {
553            PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
554            PbCreateType::Background => CreateType::Background,
555        }
556    }
557
558    pub fn to_proto(self) -> PbCreateType {
559        match self {
560            CreateType::Foreground => PbCreateType::Foreground,
561            CreateType::Background => PbCreateType::Background,
562        }
563    }
564}
565
566#[derive(Clone, Debug)]
567pub enum AlterDatabaseParam {
568    // Barrier related parameters, per database.
569    // None represents the default value, which means it follows `SystemParams`.
570    BarrierIntervalMs(Option<u32>),
571    CheckpointFrequency(Option<u64>),
572}
573
574macro_rules! for_all_fragment_type_flags {
575    () => {
576        for_all_fragment_type_flags! {
577            {
578                Source,
579                Mview,
580                Sink,
581                Now,
582                StreamScan,
583                BarrierRecv,
584                Values,
585                Dml,
586                CdcFilter,
587                Skipped1,
588                SourceScan,
589                SnapshotBackfillStreamScan,
590                FsFetch,
591                CrossDbSnapshotBackfillStreamScan,
592                StreamCdcScan,
593                VectorIndexWrite,
594                UpstreamSinkUnion,
595                LocalityProvider
596            },
597            {},
598            0
599        }
600    };
601    (
602        {},
603        {
604            $(
605                {$flag:ident, $index:expr}
606            ),*
607        },
608        $next_index:expr
609    ) => {
610        #[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq)]
611        #[repr(u32)]
612        pub enum FragmentTypeFlag {
613            $(
614                $flag = (1 << $index),
615            )*
616        }
617
618        pub const FRAGMENT_TYPE_FLAG_LIST: [FragmentTypeFlag; $next_index] = [
619            $(
620                FragmentTypeFlag::$flag,
621            )*
622        ];
623
624        impl TryFrom<u32> for FragmentTypeFlag {
625            type Error = String;
626
627            fn try_from(value: u32) -> Result<Self, Self::Error> {
628                match value {
629                    $(
630                        value if value == (FragmentTypeFlag::$flag as u32) => Ok(FragmentTypeFlag::$flag),
631                    )*
632                    _ => Err(format!("Invalid FragmentTypeFlag value: {}", value)),
633                }
634            }
635        }
636
637        impl FragmentTypeFlag {
638            pub fn as_str_name(&self) -> &'static str {
639                match self {
640                    $(
641                        FragmentTypeFlag::$flag => paste::paste!{stringify!( [< $flag:snake:upper >] )},
642                    )*
643                }
644            }
645        }
646    };
647    (
648        {$first:ident $(, $rest:ident)*},
649        {
650            $(
651                {$flag:ident, $index:expr}
652            ),*
653        },
654        $next_index:expr
655    ) => {
656        for_all_fragment_type_flags! {
657            {$($rest),*},
658            {
659                $({$flag, $index},)*
660                {$first, $next_index}
661            },
662            $next_index + 1
663        }
664    };
665}
666
667for_all_fragment_type_flags!();
668
669impl FragmentTypeFlag {
670    pub fn raw_flag(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> u32 {
671        flags.into_iter().fold(0, |acc, flag| acc | (flag as u32))
672    }
673
674    /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
675    pub fn backfill_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
676        [FragmentTypeFlag::SourceScan, FragmentTypeFlag::StreamScan].into_iter()
677    }
678
679    /// Fragments that may be affected by `SOURCE_RATE_LIMIT`.
680    /// Note: for `FsFetch`, old fragments don't have this flag set, so don't use this to check.
681    pub fn source_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
682        [FragmentTypeFlag::Source, FragmentTypeFlag::FsFetch].into_iter()
683    }
684
685    /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
686    pub fn sink_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
687        [FragmentTypeFlag::Sink].into_iter()
688    }
689
690    /// Note: this doesn't include `FsFetch` created in old versions.
691    pub fn rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
692        Self::backfill_rate_limit_fragments()
693            .chain(Self::source_rate_limit_fragments())
694            .chain(Self::sink_rate_limit_fragments())
695    }
696
697    pub fn dml_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
698        [FragmentTypeFlag::Dml].into_iter()
699    }
700}
701
702#[derive(Clone, Copy, Debug, Hash, PartialOrd, PartialEq, Eq, Default)]
703pub struct FragmentTypeMask(u32);
704
705impl Binary for FragmentTypeMask {
706    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707        write!(f, "{:b}", self.0)
708    }
709}
710
711impl From<i32> for FragmentTypeMask {
712    fn from(value: i32) -> Self {
713        Self(value as u32)
714    }
715}
716
717impl From<u32> for FragmentTypeMask {
718    fn from(value: u32) -> Self {
719        Self(value)
720    }
721}
722
723impl From<FragmentTypeMask> for u32 {
724    fn from(value: FragmentTypeMask) -> Self {
725        value.0
726    }
727}
728
729impl From<FragmentTypeMask> for i32 {
730    fn from(value: FragmentTypeMask) -> Self {
731        value.0 as _
732    }
733}
734
735impl FragmentTypeMask {
736    pub fn empty() -> Self {
737        FragmentTypeMask(0)
738    }
739
740    pub fn add(&mut self, flag: FragmentTypeFlag) {
741        self.0 |= flag as u32;
742    }
743
744    pub fn contains_any(&self, flags: impl IntoIterator<Item = FragmentTypeFlag>) -> bool {
745        let flag = FragmentTypeFlag::raw_flag(flags);
746        (self.0 & flag) != 0
747    }
748
749    pub fn contains(&self, flag: FragmentTypeFlag) -> bool {
750        self.contains_any([flag])
751    }
752}
753
754#[cfg(test)]
755mod tests {
756    use itertools::Itertools;
757    use risingwave_common::catalog::FRAGMENT_TYPE_FLAG_LIST;
758
759    use crate::catalog::FragmentTypeFlag;
760
761    #[test]
762    fn test_all_fragment_type_flag() {
763        expect_test::expect![[r#"
764            [
765                (
766                    Source,
767                    1,
768                    "SOURCE",
769                ),
770                (
771                    Mview,
772                    2,
773                    "MVIEW",
774                ),
775                (
776                    Sink,
777                    4,
778                    "SINK",
779                ),
780                (
781                    Now,
782                    8,
783                    "NOW",
784                ),
785                (
786                    StreamScan,
787                    16,
788                    "STREAM_SCAN",
789                ),
790                (
791                    BarrierRecv,
792                    32,
793                    "BARRIER_RECV",
794                ),
795                (
796                    Values,
797                    64,
798                    "VALUES",
799                ),
800                (
801                    Dml,
802                    128,
803                    "DML",
804                ),
805                (
806                    CdcFilter,
807                    256,
808                    "CDC_FILTER",
809                ),
810                (
811                    Skipped1,
812                    512,
813                    "SKIPPED1",
814                ),
815                (
816                    SourceScan,
817                    1024,
818                    "SOURCE_SCAN",
819                ),
820                (
821                    SnapshotBackfillStreamScan,
822                    2048,
823                    "SNAPSHOT_BACKFILL_STREAM_SCAN",
824                ),
825                (
826                    FsFetch,
827                    4096,
828                    "FS_FETCH",
829                ),
830                (
831                    CrossDbSnapshotBackfillStreamScan,
832                    8192,
833                    "CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN",
834                ),
835                (
836                    StreamCdcScan,
837                    16384,
838                    "STREAM_CDC_SCAN",
839                ),
840                (
841                    VectorIndexWrite,
842                    32768,
843                    "VECTOR_INDEX_WRITE",
844                ),
845                (
846                    UpstreamSinkUnion,
847                    65536,
848                    "UPSTREAM_SINK_UNION",
849                ),
850                (
851                    LocalityProvider,
852                    131072,
853                    "LOCALITY_PROVIDER",
854                ),
855            ]
856        "#]]
857        .assert_debug_eq(
858            &FRAGMENT_TYPE_FLAG_LIST
859                .into_iter()
860                .map(|flag| (flag, flag as u32, flag.as_str_name()))
861                .collect_vec(),
862        );
863        for flag in FRAGMENT_TYPE_FLAG_LIST {
864            assert_eq!(FragmentTypeFlag::try_from(flag as u32).unwrap(), flag);
865        }
866    }
867}