risingwave_common/catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::fmt::Binary;
23use std::sync::Arc;
24
25pub use column::*;
26pub use external_table::*;
27use futures::stream::BoxStream;
28pub use internal_table::*;
29use parse_display::Display;
30pub use physical_table::*;
31use risingwave_pb::catalog::table::PbEngine;
32use risingwave_pb::catalog::{
33    CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
34    StreamJobStatus as PbStreamJobStatus,
35};
36use risingwave_pb::plan_common::ColumnDescVersion;
37pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
38use serde::{Deserialize, Serialize};
39
40use crate::array::DataChunk;
41pub use crate::constants::hummock;
42use crate::error::BoxedError;
43
/// The global version of the catalog.
pub type CatalogVersion = u64;

/// The version number of the per-table catalog.
pub type TableVersionId = u64;
/// The default version ID for a new table.
pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
/// The version number of the per-source catalog.
pub type SourceVersionId = u64;
/// The default version ID for a new source.
pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;

/// Name of the default database.
pub const DEFAULT_DATABASE_NAME: &str = "dev";
/// Name of the default schema.
pub const DEFAULT_SCHEMA_NAME: &str = "public";
/// Name of the `pg_catalog` schema; one of the [`SYSTEM_SCHEMAS`].
pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
/// Name of the `information_schema` schema; one of the [`SYSTEM_SCHEMAS`].
pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
/// Name of the `rw_catalog` schema; one of the [`SYSTEM_SCHEMAS`].
pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
/// The `pg_` schema-name prefix.
///
/// NOTE(review): presumably user-created schemas are rejected when their name starts with this
/// prefix — confirm against the callers.
pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
/// Name of the default superuser.
pub const DEFAULT_SUPER_USER: &str = "root";
/// User ID of [`DEFAULT_SUPER_USER`].
pub const DEFAULT_SUPER_USER_ID: u32 = 1;
/// Name of the PostgreSQL-style superuser.
///
/// This is for compatibility with customized utils for PostgreSQL.
pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
/// User ID of [`DEFAULT_SUPER_USER_FOR_PG`].
pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;

/// Name of the default superuser for admin, which is used only by the cloud control plane.
pub const DEFAULT_SUPER_USER_FOR_ADMIN: &str = "rwadmin";
/// User ID of [`DEFAULT_SUPER_USER_FOR_ADMIN`].
pub const DEFAULT_SUPER_USER_FOR_ADMIN_ID: u32 = 3;

/// NOTE(review): presumably the first user ID handed out to non-built-in users (the built-in
/// superusers above use IDs 1 to 3) — confirm against the ID allocator.
///
/// Note that this is an `i32`, whereas the superuser IDs above are `u32`.
pub const NON_RESERVED_USER_ID: i32 = 11;

/// Number of IDs set aside for system catalogs; determines [`SYS_CATALOG_START_ID`].
pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
/// First ID of the system catalog range, which is the top [`MAX_SYS_CATALOG_NUM`] values below
/// `i32::MAX`.
pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;

/// Sentinel object ID returned by the `placeholder()` constructors of the ID types in this
/// module (for example [`TableId::placeholder`]).
pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
78
79pub const SYSTEM_SCHEMAS: [&str; 3] = [
80    PG_CATALOG_SCHEMA_NAME,
81    INFORMATION_SCHEMA_SCHEMA_NAME,
82    RW_CATALOG_SCHEMA_NAME,
83];
84pub fn is_system_schema(schema_name: &str) -> bool {
85    SYSTEM_SCHEMAS.contains(&schema_name)
86}
87
88pub fn is_reserved_admin_user(user_name: &str) -> bool {
89    user_name == DEFAULT_SUPER_USER_FOR_ADMIN
90}
91
/// Prefix shared by the RisingWave-reserved column names defined in this module, such as
/// [`DEFAULT_KEY_COLUMN_NAME`] and [`KAFKA_TIMESTAMP_COLUMN_NAME`].
pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";

/// When there is no primary key specified while creating source, will use
/// the message key as primary key in `BYTEA` type with this name.
/// Note: the field has version to track, please refer to [`default_key_column_name_version_mapping`]
pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";

/// Returns the default key column name to use for the given column descriptor version.
///
/// Every version currently maps to [`DEFAULT_KEY_COLUMN_NAME`].
pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
    // Both arms yield the same name today.
    // NOTE(review): the separate arms look like a deliberate extension point for versions that
    // need a different name — confirm before collapsing them.
    match version {
        ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
        _ => DEFAULT_KEY_COLUMN_NAME,
    }
}
105
/// For kafka source, we attach a hidden column [`KAFKA_TIMESTAMP_COLUMN_NAME`] to it, so that we
/// can limit the timestamp range when querying it directly with batch query. The column type is
/// [`crate::types::DataType::Timestamptz`]. For more details, please refer to
/// [this rfc](https://github.com/risingwavelabs/rfcs/pull/20).
pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";

/// The RisingWave iceberg table engine creates the column `_risingwave_iceberg_row_id` in the
/// iceberg table.
///
/// The Iceberg V3 spec uses `_row_id` as a reserved column name for row lineage, so for a table
/// without a primary key we can't use `_row_id` directly in iceberg and use
/// `_risingwave_iceberg_row_id` instead.
pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";

/// Name of the row ID column.
pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
/// The column ID reserved for the row ID column.
pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);

/// The column ID offset for user-defined columns.
///
/// All IDs of user-defined columns must be greater or equal to this value.
pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();

/// Name of the `_rw_timestamp` column.
pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
/// The column ID used for the [`RW_TIMESTAMP_COLUMN_NAME`] column. It is negative, so it lies
/// outside the range starting at [`ROW_ID_COLUMN_ID`].
pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);

// Column names for iceberg metadata.
// NOTE(review): presumably hidden columns exposing the iceberg sequence number, data file path
// and position within the file — confirm against the iceberg source/scan code.
pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";

/// Name of the CDC offset column.
pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
/// The number of columns output by the cdc source job;
/// see [`ColumnCatalog::debezium_cdc_source_cols()`] for details.
pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
/// Name of the CDC table name column.
pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
139
/// The local system catalog reader in the frontend node.
///
/// Implementors must be `Send + Sync + 'static` so that they can be shared through
/// [`SysCatalogReaderRef`].
pub trait SysCatalogReader: Sync + Send + 'static {
    /// Reads the data of the system catalog table.
    ///
    /// Returns a boxed stream borrowing `self`; each item is either a [`DataChunk`] or a
    /// [`BoxedError`].
    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
}

/// A shared, reference-counted [`SysCatalogReader`] trait object.
pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;

/// Raw numeric ID type, the same `u32` representation wrapped by the ID types in this module.
pub type ObjectId = u32;
149
150#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
151#[display("{database_id}")]
152pub struct DatabaseId {
153    pub database_id: u32,
154}
155
156impl DatabaseId {
157    pub const fn new(database_id: u32) -> Self {
158        DatabaseId { database_id }
159    }
160
161    pub fn placeholder() -> Self {
162        DatabaseId {
163            database_id: OBJECT_ID_PLACEHOLDER,
164        }
165    }
166}
167
168impl From<u32> for DatabaseId {
169    fn from(id: u32) -> Self {
170        Self::new(id)
171    }
172}
173
174impl From<&u32> for DatabaseId {
175    fn from(id: &u32) -> Self {
176        Self::new(*id)
177    }
178}
179
180impl From<DatabaseId> for u32 {
181    fn from(id: DatabaseId) -> Self {
182        id.database_id
183    }
184}
185
186#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq)]
187#[display("{schema_id}")]
188pub struct SchemaId {
189    pub schema_id: u32,
190}
191
192impl SchemaId {
193    pub fn new(schema_id: u32) -> Self {
194        SchemaId { schema_id }
195    }
196
197    pub fn placeholder() -> Self {
198        SchemaId {
199            schema_id: OBJECT_ID_PLACEHOLDER,
200        }
201    }
202}
203
204impl From<u32> for SchemaId {
205    fn from(id: u32) -> Self {
206        Self::new(id)
207    }
208}
209
210impl From<&u32> for SchemaId {
211    fn from(id: &u32) -> Self {
212        Self::new(*id)
213    }
214}
215
216impl From<SchemaId> for u32 {
217    fn from(id: SchemaId) -> Self {
218        id.schema_id
219    }
220}
221
222#[derive(
223    Clone,
224    Copy,
225    Debug,
226    Display,
227    Default,
228    Hash,
229    PartialOrd,
230    PartialEq,
231    Eq,
232    Ord,
233    Serialize,
234    Deserialize,
235)]
236#[display("{table_id}")]
237pub struct TableId {
238    pub table_id: u32,
239}
240
241impl TableId {
242    pub const fn new(table_id: u32) -> Self {
243        TableId { table_id }
244    }
245
246    /// Sometimes the id field is filled later, we use this value for better debugging.
247    pub const fn placeholder() -> Self {
248        TableId {
249            table_id: OBJECT_ID_PLACEHOLDER,
250        }
251    }
252
253    pub fn table_id(&self) -> u32 {
254        self.table_id
255    }
256}
257
258impl From<u32> for TableId {
259    fn from(id: u32) -> Self {
260        Self::new(id)
261    }
262}
263
264impl From<&u32> for TableId {
265    fn from(id: &u32) -> Self {
266        Self::new(*id)
267    }
268}
269
270impl From<TableId> for u32 {
271    fn from(id: TableId) -> Self {
272        id.table_id
273    }
274}
275
276#[derive(Clone, Debug, PartialEq, Default, Copy)]
277pub struct TableOption {
278    pub retention_seconds: Option<u32>, // second
279}
280
281impl From<&risingwave_pb::hummock::TableOption> for TableOption {
282    fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
283        Self {
284            retention_seconds: table_option.retention_seconds,
285        }
286    }
287}
288
289impl From<&TableOption> for risingwave_pb::hummock::TableOption {
290    fn from(table_option: &TableOption) -> Self {
291        Self {
292            retention_seconds: table_option.retention_seconds,
293        }
294    }
295}
296
297impl TableOption {
298    pub fn new(retention_seconds: Option<u32>) -> Self {
299        // now we only support ttl for TableOption
300        TableOption { retention_seconds }
301    }
302}
303
304#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq)]
305#[display("{index_id}")]
306pub struct IndexId {
307    pub index_id: u32,
308}
309
310impl IndexId {
311    pub const fn new(index_id: u32) -> Self {
312        IndexId { index_id }
313    }
314
315    /// Sometimes the id field is filled later, we use this value for better debugging.
316    pub const fn placeholder() -> Self {
317        IndexId {
318            index_id: OBJECT_ID_PLACEHOLDER,
319        }
320    }
321
322    pub fn index_id(&self) -> u32 {
323        self.index_id
324    }
325}
326
327impl From<u32> for IndexId {
328    fn from(id: u32) -> Self {
329        Self::new(id)
330    }
331}
332impl From<IndexId> for u32 {
333    fn from(id: IndexId) -> Self {
334        id.index_id
335    }
336}
337
338#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
339pub struct FunctionId(pub u32);
340
341impl FunctionId {
342    pub const fn new(id: u32) -> Self {
343        FunctionId(id)
344    }
345
346    pub const fn placeholder() -> Self {
347        FunctionId(OBJECT_ID_PLACEHOLDER)
348    }
349
350    pub fn function_id(&self) -> u32 {
351        self.0
352    }
353}
354
355impl From<u32> for FunctionId {
356    fn from(id: u32) -> Self {
357        Self::new(id)
358    }
359}
360
361impl From<&u32> for FunctionId {
362    fn from(id: &u32) -> Self {
363        Self::new(*id)
364    }
365}
366
367impl From<FunctionId> for u32 {
368    fn from(id: FunctionId) -> Self {
369        id.0
370    }
371}
372
373#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
374#[display("{user_id}")]
375pub struct UserId {
376    pub user_id: u32,
377}
378
379impl UserId {
380    pub const fn new(user_id: u32) -> Self {
381        UserId { user_id }
382    }
383
384    pub const fn placeholder() -> Self {
385        UserId {
386            user_id: OBJECT_ID_PLACEHOLDER,
387        }
388    }
389}
390
391impl From<u32> for UserId {
392    fn from(id: u32) -> Self {
393        Self::new(id)
394    }
395}
396
397impl From<&u32> for UserId {
398    fn from(id: &u32) -> Self {
399        Self::new(*id)
400    }
401}
402
403impl From<UserId> for u32 {
404    fn from(id: UserId) -> Self {
405        id.user_id
406    }
407}
408
409#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
410pub struct ConnectionId(pub u32);
411
412impl ConnectionId {
413    pub const fn new(id: u32) -> Self {
414        ConnectionId(id)
415    }
416
417    pub const fn placeholder() -> Self {
418        ConnectionId(OBJECT_ID_PLACEHOLDER)
419    }
420
421    pub fn connection_id(&self) -> u32 {
422        self.0
423    }
424}
425
426impl From<u32> for ConnectionId {
427    fn from(id: u32) -> Self {
428        Self::new(id)
429    }
430}
431
432impl From<&u32> for ConnectionId {
433    fn from(id: &u32) -> Self {
434        Self::new(*id)
435    }
436}
437
438impl From<ConnectionId> for u32 {
439    fn from(id: ConnectionId) -> Self {
440        id.0
441    }
442}
443
444#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
445pub struct SecretId(pub u32);
446
447impl SecretId {
448    pub const fn new(id: u32) -> Self {
449        SecretId(id)
450    }
451
452    pub const fn placeholder() -> Self {
453        SecretId(OBJECT_ID_PLACEHOLDER)
454    }
455
456    pub fn secret_id(&self) -> u32 {
457        self.0
458    }
459}
460
461impl From<u32> for SecretId {
462    fn from(id: u32) -> Self {
463        Self::new(id)
464    }
465}
466
467impl From<&u32> for SecretId {
468    fn from(id: &u32) -> Self {
469        Self::new(*id)
470    }
471}
472
473impl From<SecretId> for u32 {
474    fn from(id: SecretId) -> Self {
475        id.0
476    }
477}
478
/// Conflict-handling mode, convertible to and from `PbHandleConflictBehavior`.
///
/// NOTE(review): presumably this selects what happens when an incoming row conflicts with an
/// existing row on the primary key — confirm against the materialize executor.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConflictBehavior {
    /// The default mode. It is the only variant not covered by `checked_conflict_behaviors!`.
    #[default]
    NoCheck,
    Overwrite,
    /// Corresponds to `PbHandleConflictBehavior::Ignore`.
    IgnoreConflict,
    DoUpdateIfNotNull,
}
487
/// Expands to an or-pattern matching every [`ConflictBehavior`] variant except `NoCheck`.
///
/// The expansion names `ConflictBehavior` unqualified, so that type must be in scope wherever
/// the macro is used.
#[macro_export]
macro_rules! _checked_conflict_behaviors {
    () => {
        ConflictBehavior::Overwrite
            | ConflictBehavior::IgnoreConflict
            | ConflictBehavior::DoUpdateIfNotNull
    };
}
// Re-export the macro under its public name from this module.
pub use _checked_conflict_behaviors as checked_conflict_behaviors;
497
498impl ConflictBehavior {
499    pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
500        match tb_conflict_behavior {
501            PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
502            PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
503            PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
504            // This is for backward compatibility, in the previous version
505            // `HandleConflictBehavior::Unspecified` represented `NoCheck`, so just treat it as `NoCheck`.
506            PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
507                ConflictBehavior::NoCheck
508            }
509        }
510    }
511
512    pub fn to_protobuf(self) -> PbHandleConflictBehavior {
513        match self {
514            ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
515            ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
516            ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
517            ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
518        }
519    }
520
521    pub fn debug_to_string(self) -> String {
522        match self {
523            ConflictBehavior::NoCheck => "NoCheck".to_owned(),
524            ConflictBehavior::Overwrite => "Overwrite".to_owned(),
525            ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
526            ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
527        }
528    }
529}
530
531#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
532pub enum Engine {
533    #[default]
534    Hummock,
535    Iceberg,
536}
537
538impl Engine {
539    pub fn from_protobuf(engine: &PbEngine) -> Self {
540        match engine {
541            PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
542            PbEngine::Iceberg => Engine::Iceberg,
543        }
544    }
545
546    pub fn to_protobuf(self) -> PbEngine {
547        match self {
548            Engine::Hummock => PbEngine::Hummock,
549            Engine::Iceberg => PbEngine::Iceberg,
550        }
551    }
552
553    pub fn debug_to_string(self) -> String {
554        match self {
555            Engine::Hummock => "Hummock".to_owned(),
556            Engine::Iceberg => "Iceberg".to_owned(),
557        }
558    }
559}
560
561#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
562pub enum StreamJobStatus {
563    #[default]
564    Creating,
565    Created,
566}
567
568impl StreamJobStatus {
569    pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
570        match stream_job_status {
571            PbStreamJobStatus::Creating => StreamJobStatus::Creating,
572            PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
573        }
574    }
575
576    pub fn to_proto(self) -> PbStreamJobStatus {
577        match self {
578            StreamJobStatus::Creating => PbStreamJobStatus::Creating,
579            StreamJobStatus::Created => PbStreamJobStatus::Created,
580        }
581    }
582}
583
584#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
585pub enum CreateType {
586    Foreground,
587    Background,
588}
589
590impl Default for CreateType {
591    fn default() -> Self {
592        Self::Foreground
593    }
594}
595
596impl CreateType {
597    pub fn from_proto(pb_create_type: PbCreateType) -> Self {
598        match pb_create_type {
599            PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
600            PbCreateType::Background => CreateType::Background,
601        }
602    }
603
604    pub fn to_proto(self) -> PbCreateType {
605        match self {
606            CreateType::Foreground => PbCreateType::Foreground,
607            CreateType::Background => PbCreateType::Background,
608        }
609    }
610}
611
/// A per-database parameter together with its new value.
///
/// NOTE(review): presumably carried by `ALTER DATABASE` handling — confirm against the callers.
#[derive(Clone, Debug)]
pub enum AlterDatabaseParam {
    // Barrier related parameters, per database.
    // None represents the default value, which means it follows `SystemParams`.
    /// Barrier interval in milliseconds; `None` means follow `SystemParams`.
    BarrierIntervalMs(Option<u32>),
    /// Checkpoint frequency; `None` means follow `SystemParams`.
    CheckpointFrequency(Option<u64>),
}
619
// Generates, from one ordered list of flag names:
//   - the `FragmentTypeFlag` enum, where the i-th name gets the discriminant `1 << i`;
//   - `FRAGMENT_TYPE_FLAG_LIST`, an array of all flags in declaration order;
//   - `TryFrom<u32> for FragmentTypeFlag`, accepting exactly one flag's discriminant;
//   - `FragmentTypeFlag::as_str_name`, returning the SCREAMING_SNAKE_CASE name.
//
// The macro walks the name list recursively, carrying an accumulator of `{name, index}` pairs
// and the next free index, and emits the items once the list is exhausted.
macro_rules! for_all_fragment_type_flags {
    // Entry point: start the recursion with the full name list, an empty accumulator and
    // index 0. A flag's bit position is its position in this list.
    // NOTE(review): presumably these bit values must stay in sync with the protobuf definition
    // and `Skipped1` holds the position of a retired bit — confirm before reordering or removing.
    () => {
        for_all_fragment_type_flags! {
            {
                Source,
                Mview,
                Sink,
                Now,
                StreamScan,
                BarrierRecv,
                Values,
                Dml,
                CdcFilter,
                Skipped1,
                SourceScan,
                SnapshotBackfillStreamScan,
                FsFetch,
                CrossDbSnapshotBackfillStreamScan
            },
            {},
            0
        }
    };
    // Terminal step: the name list is empty, so every flag now has an index in the accumulator
    // and `$next_index` equals the number of flags.
    (
        {},
        {
            $(
                {$flag:ident, $index:expr}
            ),*
        },
        $next_index:expr
    ) => {
        #[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq)]
        #[repr(u32)]
        pub enum FragmentTypeFlag {
            $(
                // One bit per flag.
                $flag = (1 << $index),
            )*
        }

        // All flags in declaration order; the array length is the final index counter.
        pub const FRAGMENT_TYPE_FLAG_LIST: [FragmentTypeFlag; $next_index] = [
            $(
                FragmentTypeFlag::$flag,
            )*
        ];

        impl TryFrom<u32> for FragmentTypeFlag {
            type Error = String;

            // Succeeds only when `value` is exactly one flag's discriminant; zero, combined
            // masks and unknown bits are rejected with an error message.
            fn try_from(value: u32) -> Result<Self, Self::Error> {
                match value {
                    $(
                        value if value == (FragmentTypeFlag::$flag as u32) => Ok(FragmentTypeFlag::$flag),
                    )*
                    _ => Err(format!("Invalid FragmentTypeFlag value: {}", value)),
                }
            }
        }

        impl FragmentTypeFlag {
            // Returns the variant name converted to SCREAMING_SNAKE_CASE
            // (e.g. `StreamScan` becomes "STREAM_SCAN").
            pub fn as_str_name(&self) -> &'static str {
                match self {
                    $(
                        FragmentTypeFlag::$flag => paste::paste!{stringify!( [< $flag:snake:upper >] )},
                    )*
                }
            }
        }
    };
    // Recursive step: pop the first remaining name, append it to the accumulator with the
    // current index, and recurse with the index expression extended by `+ 1`.
    (
        {$first:ident $(, $rest:ident)*},
        {
            $(
                {$flag:ident, $index:expr}
            ),*
        },
        $next_index:expr
    ) => {
        for_all_fragment_type_flags! {
            {$($rest),*},
            {
                $({$flag, $index},)*
                {$first, $next_index}
            },
            $next_index + 1
        }
    };
}

// Instantiate `FragmentTypeFlag` and its companion items.
for_all_fragment_type_flags!();
710
711impl FragmentTypeFlag {
712    pub fn raw_flag(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> u32 {
713        flags.into_iter().fold(0, |acc, flag| acc | (flag as u32))
714    }
715
716    /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
717    pub fn backfill_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
718        [FragmentTypeFlag::SourceScan, FragmentTypeFlag::StreamScan].into_iter()
719    }
720
721    /// Fragments that may be affected by `SOURCE_RATE_LIMIT`.
722    /// Note: for `FsFetch`, old fragments don't have this flag set, so don't use this to check.
723    pub fn source_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
724        [FragmentTypeFlag::Source, FragmentTypeFlag::FsFetch].into_iter()
725    }
726
727    /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
728    pub fn sink_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
729        [FragmentTypeFlag::Sink].into_iter()
730    }
731
732    /// Note: this doesn't include `FsFetch` created in old versions.
733    pub fn rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
734        Self::backfill_rate_limit_fragments()
735            .chain(Self::source_rate_limit_fragments())
736            .chain(Self::sink_rate_limit_fragments())
737    }
738
739    pub fn dml_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
740        [FragmentTypeFlag::Dml].into_iter()
741    }
742}
743
/// A set of `FragmentTypeFlag`s stored as a raw `u32` bitmask.
#[derive(Clone, Copy, Debug, Hash, PartialOrd, PartialEq, Eq, Default)]
pub struct FragmentTypeMask(u32);

impl Binary for FragmentTypeMask {
    /// Formats the raw mask in binary.
    ///
    /// Delegates to `u32`'s `Binary` implementation so that formatter options such as the
    /// `#` prefix, width and zero-padding (e.g. `{:#034b}`) are honored instead of dropped.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Binary::fmt(&self.0, f)
    }
}

impl From<i32> for FragmentTypeMask {
    /// Reinterprets the bits of `value` as a mask; the bit pattern is preserved.
    fn from(value: i32) -> Self {
        // Intentional bit-for-bit cast: a negative value only means the highest bit is set.
        Self(value as u32)
    }
}

impl From<u32> for FragmentTypeMask {
    /// Wraps a raw mask.
    fn from(value: u32) -> Self {
        Self(value)
    }
}

impl From<FragmentTypeMask> for u32 {
    /// Returns the raw mask.
    fn from(value: FragmentTypeMask) -> Self {
        value.0
    }
}

impl From<FragmentTypeMask> for i32 {
    /// Reinterprets the raw mask bits as an `i32`; the bit pattern is preserved.
    fn from(value: FragmentTypeMask) -> Self {
        value.0 as _
    }
}
776
777impl FragmentTypeMask {
778    pub fn empty() -> Self {
779        FragmentTypeMask(0)
780    }
781
782    pub fn add(&mut self, flag: FragmentTypeFlag) {
783        self.0 |= flag as u32;
784    }
785
786    pub fn contains_any(&self, flags: impl IntoIterator<Item = FragmentTypeFlag>) -> bool {
787        let flag = FragmentTypeFlag::raw_flag(flags);
788        (self.0 & flag) != 0
789    }
790
791    pub fn contains(&self, flag: FragmentTypeFlag) -> bool {
792        self.contains_any([flag])
793    }
794}
795
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    // Both items are imported through `crate::`, so the test does not depend on the crate
    // being reachable under its external name.
    use crate::catalog::{FRAGMENT_TYPE_FLAG_LIST, FragmentTypeFlag};

    /// Pins the name, discriminant and string name of every generated flag, and checks that
    /// `TryFrom<u32>` round-trips each discriminant.
    #[test]
    fn test_all_fragment_type_flag() {
        expect_test::expect![[r#"
            [
                (
                    Source,
                    1,
                    "SOURCE",
                ),
                (
                    Mview,
                    2,
                    "MVIEW",
                ),
                (
                    Sink,
                    4,
                    "SINK",
                ),
                (
                    Now,
                    8,
                    "NOW",
                ),
                (
                    StreamScan,
                    16,
                    "STREAM_SCAN",
                ),
                (
                    BarrierRecv,
                    32,
                    "BARRIER_RECV",
                ),
                (
                    Values,
                    64,
                    "VALUES",
                ),
                (
                    Dml,
                    128,
                    "DML",
                ),
                (
                    CdcFilter,
                    256,
                    "CDC_FILTER",
                ),
                (
                    Skipped1,
                    512,
                    "SKIPPED1",
                ),
                (
                    SourceScan,
                    1024,
                    "SOURCE_SCAN",
                ),
                (
                    SnapshotBackfillStreamScan,
                    2048,
                    "SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
                (
                    FsFetch,
                    4096,
                    "FS_FETCH",
                ),
                (
                    CrossDbSnapshotBackfillStreamScan,
                    8192,
                    "CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
            ]
        "#]]
        .assert_debug_eq(
            &FRAGMENT_TYPE_FLAG_LIST
                .into_iter()
                .map(|flag| (flag, flag as u32, flag.as_str_name()))
                .collect_vec(),
        );
        // Every discriminant must convert back to the flag it came from.
        for flag in FRAGMENT_TYPE_FLAG_LIST {
            assert_eq!(FragmentTypeFlag::try_from(flag as u32).unwrap(), flag);
        }
    }
}