risingwave_common/catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::fmt::Binary;
23use std::sync::Arc;
24
25pub use column::*;
26pub use external_table::*;
27use futures::stream::BoxStream;
28pub use internal_table::*;
29use parse_display::Display;
30pub use physical_table::*;
31use risingwave_pb::catalog::table::PbEngine;
32use risingwave_pb::catalog::{
33    CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
34    StreamJobStatus as PbStreamJobStatus,
35};
36use risingwave_pb::plan_common::ColumnDescVersion;
37pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
38use serde::{Deserialize, Serialize};
39
40use crate::array::DataChunk;
41pub use crate::constants::hummock;
42use crate::error::BoxedError;
43
44/// The global version of the catalog.
45pub type CatalogVersion = u64;
46
47/// The version number of the per-table catalog.
48pub type TableVersionId = u64;
49/// The default version ID for a new table.
50pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
51/// The version number of the per-source catalog.
52pub type SourceVersionId = u64;
53/// The default version ID for a new source.
54pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;
55
56pub const DEFAULT_DATABASE_NAME: &str = "dev";
57pub const DEFAULT_SCHEMA_NAME: &str = "public";
58pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
59pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
60pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
61pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
62pub const DEFAULT_SUPER_USER: &str = "root";
63pub const DEFAULT_SUPER_USER_ID: u32 = 1;
64// This is for compatibility with customized utils for PostgreSQL.
65pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
66pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;
67
68// This is the default superuser for admin, which is used only for cloud control plane.
69pub const DEFAULT_SUPER_USER_FOR_ADMIN: &str = "rwadmin";
70pub const DEFAULT_SUPER_USER_FOR_ADMIN_ID: u32 = 3;
71
72pub const NON_RESERVED_USER_ID: i32 = 11;
73
74pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
75pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;
76
77pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
78
79pub const SYSTEM_SCHEMAS: [&str; 3] = [
80    PG_CATALOG_SCHEMA_NAME,
81    INFORMATION_SCHEMA_SCHEMA_NAME,
82    RW_CATALOG_SCHEMA_NAME,
83];
84pub fn is_system_schema(schema_name: &str) -> bool {
85    SYSTEM_SCHEMAS.contains(&schema_name)
86}
87
88pub fn is_reserved_admin_user(user_name: &str) -> bool {
89    user_name == DEFAULT_SUPER_USER_FOR_ADMIN
90}
91
92pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";
93
94/// When there is no primary key specified while creating source, will use
95/// the message key as primary key in `BYTEA` type with this name.
96/// Note: the field has version to track, please refer to [`default_key_column_name_version_mapping`]
97pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
98
99pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
100    match version {
101        ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
102        _ => DEFAULT_KEY_COLUMN_NAME,
103    }
104}
105
106/// For kafka source, we attach a hidden column [`KAFKA_TIMESTAMP_COLUMN_NAME`] to it, so that we
107/// can limit the timestamp range when querying it directly with batch query. The column type is
108/// [`crate::types::DataType::Timestamptz`]. For more details, please refer to
109/// [this rfc](https://github.com/risingwavelabs/rfcs/pull/20).
110pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";
111
112/// RisingWave iceberg table engine will create the column `_risingwave_iceberg_row_id` in the iceberg table.
113///
114/// Iceberg V3 spec use `_row_id` as a reserved column name for row lineage, so if the table without primary key,
115/// we can't use `_row_id` directly for iceberg, so use `_risingwave_iceberg_row_id` instead.
116pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";
117
118pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
119/// The column ID preserved for the row ID column.
120pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);
121
122/// The column ID offset for user-defined columns.
123///
124/// All IDs of user-defined columns must be greater or equal to this value.
125pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();
126
127pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
128pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);
129
130pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
131pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
132pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";
133
134pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
135/// The number of columns output by the cdc source job
136/// see [`ColumnCatalog::debezium_cdc_source_cols()`] for details
137pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
138pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
139
140/// The local system catalog reader in the frontend node.
141pub trait SysCatalogReader: Sync + Send + 'static {
142    /// Reads the data of the system catalog table.
143    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
144}
145
146pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
147
148pub type ObjectId = u32;
149
150#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
151#[display("{database_id}")]
152pub struct DatabaseId {
153    pub database_id: u32,
154}
155
156impl DatabaseId {
157    pub const fn new(database_id: u32) -> Self {
158        DatabaseId { database_id }
159    }
160
161    pub fn placeholder() -> Self {
162        DatabaseId {
163            database_id: OBJECT_ID_PLACEHOLDER,
164        }
165    }
166}
167
168impl From<u32> for DatabaseId {
169    fn from(id: u32) -> Self {
170        Self::new(id)
171    }
172}
173
174impl From<&u32> for DatabaseId {
175    fn from(id: &u32) -> Self {
176        Self::new(*id)
177    }
178}
179
180impl From<DatabaseId> for u32 {
181    fn from(id: DatabaseId) -> Self {
182        id.database_id
183    }
184}
185
186#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq)]
187#[display("{schema_id}")]
188pub struct SchemaId {
189    pub schema_id: u32,
190}
191
192impl SchemaId {
193    pub fn new(schema_id: u32) -> Self {
194        SchemaId { schema_id }
195    }
196
197    pub fn placeholder() -> Self {
198        SchemaId {
199            schema_id: OBJECT_ID_PLACEHOLDER,
200        }
201    }
202}
203
204impl From<u32> for SchemaId {
205    fn from(id: u32) -> Self {
206        Self::new(id)
207    }
208}
209
210impl From<&u32> for SchemaId {
211    fn from(id: &u32) -> Self {
212        Self::new(*id)
213    }
214}
215
216impl From<SchemaId> for u32 {
217    fn from(id: SchemaId) -> Self {
218        id.schema_id
219    }
220}
221
222#[derive(
223    Clone,
224    Copy,
225    Debug,
226    Display,
227    Default,
228    Hash,
229    PartialOrd,
230    PartialEq,
231    Eq,
232    Ord,
233    Serialize,
234    Deserialize,
235)]
236#[display("{table_id}")]
237pub struct TableId {
238    pub table_id: u32,
239}
240
241impl TableId {
242    pub const fn new(table_id: u32) -> Self {
243        TableId { table_id }
244    }
245
246    /// Sometimes the id field is filled later, we use this value for better debugging.
247    pub const fn placeholder() -> Self {
248        TableId {
249            table_id: OBJECT_ID_PLACEHOLDER,
250        }
251    }
252
253    pub fn table_id(&self) -> u32 {
254        self.table_id
255    }
256}
257
258impl From<u32> for TableId {
259    fn from(id: u32) -> Self {
260        Self::new(id)
261    }
262}
263
264impl From<&u32> for TableId {
265    fn from(id: &u32) -> Self {
266        Self::new(*id)
267    }
268}
269
270impl From<TableId> for u32 {
271    fn from(id: TableId) -> Self {
272        id.table_id
273    }
274}
275
276#[derive(Clone, Debug, PartialEq, Default, Copy)]
277pub struct TableOption {
278    pub retention_seconds: Option<u32>, // second
279}
280
281impl From<&risingwave_pb::hummock::TableOption> for TableOption {
282    fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
283        Self {
284            retention_seconds: table_option.retention_seconds,
285        }
286    }
287}
288
289impl From<&TableOption> for risingwave_pb::hummock::TableOption {
290    fn from(table_option: &TableOption) -> Self {
291        Self {
292            retention_seconds: table_option.retention_seconds,
293        }
294    }
295}
296
297impl TableOption {
298    pub fn new(retention_seconds: Option<u32>) -> Self {
299        // now we only support ttl for TableOption
300        TableOption { retention_seconds }
301    }
302}
303
304#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq)]
305#[display("{index_id}")]
306pub struct IndexId {
307    pub index_id: u32,
308}
309
310impl IndexId {
311    pub const fn new(index_id: u32) -> Self {
312        IndexId { index_id }
313    }
314
315    /// Sometimes the id field is filled later, we use this value for better debugging.
316    pub const fn placeholder() -> Self {
317        IndexId {
318            index_id: OBJECT_ID_PLACEHOLDER,
319        }
320    }
321
322    pub fn index_id(&self) -> u32 {
323        self.index_id
324    }
325}
326
327impl From<u32> for IndexId {
328    fn from(id: u32) -> Self {
329        Self::new(id)
330    }
331}
332impl From<IndexId> for u32 {
333    fn from(id: IndexId) -> Self {
334        id.index_id
335    }
336}
337
338#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
339pub struct FunctionId(pub u32);
340
341impl FunctionId {
342    pub const fn new(id: u32) -> Self {
343        FunctionId(id)
344    }
345
346    pub const fn placeholder() -> Self {
347        FunctionId(OBJECT_ID_PLACEHOLDER)
348    }
349
350    pub fn function_id(&self) -> u32 {
351        self.0
352    }
353}
354
355impl From<u32> for FunctionId {
356    fn from(id: u32) -> Self {
357        Self::new(id)
358    }
359}
360
361impl From<&u32> for FunctionId {
362    fn from(id: &u32) -> Self {
363        Self::new(*id)
364    }
365}
366
367impl From<FunctionId> for u32 {
368    fn from(id: FunctionId) -> Self {
369        id.0
370    }
371}
372
373#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
374#[display("{user_id}")]
375pub struct UserId {
376    pub user_id: u32,
377}
378
379impl UserId {
380    pub const fn new(user_id: u32) -> Self {
381        UserId { user_id }
382    }
383
384    pub const fn placeholder() -> Self {
385        UserId {
386            user_id: OBJECT_ID_PLACEHOLDER,
387        }
388    }
389}
390
391impl From<u32> for UserId {
392    fn from(id: u32) -> Self {
393        Self::new(id)
394    }
395}
396
397impl From<&u32> for UserId {
398    fn from(id: &u32) -> Self {
399        Self::new(*id)
400    }
401}
402
403impl From<UserId> for u32 {
404    fn from(id: UserId) -> Self {
405        id.user_id
406    }
407}
408
409#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
410pub struct ConnectionId(pub u32);
411
412impl ConnectionId {
413    pub const fn new(id: u32) -> Self {
414        ConnectionId(id)
415    }
416
417    pub const fn placeholder() -> Self {
418        ConnectionId(OBJECT_ID_PLACEHOLDER)
419    }
420
421    pub fn connection_id(&self) -> u32 {
422        self.0
423    }
424}
425
426impl From<u32> for ConnectionId {
427    fn from(id: u32) -> Self {
428        Self::new(id)
429    }
430}
431
432impl From<&u32> for ConnectionId {
433    fn from(id: &u32) -> Self {
434        Self::new(*id)
435    }
436}
437
438impl From<ConnectionId> for u32 {
439    fn from(id: ConnectionId) -> Self {
440        id.0
441    }
442}
443
444#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
445pub struct SecretId(pub u32);
446
447impl SecretId {
448    pub const fn new(id: u32) -> Self {
449        SecretId(id)
450    }
451
452    pub const fn placeholder() -> Self {
453        SecretId(OBJECT_ID_PLACEHOLDER)
454    }
455
456    pub fn secret_id(&self) -> u32 {
457        self.0
458    }
459}
460
461impl From<u32> for SecretId {
462    fn from(id: u32) -> Self {
463        Self::new(id)
464    }
465}
466
467impl From<&u32> for SecretId {
468    fn from(id: &u32) -> Self {
469        Self::new(*id)
470    }
471}
472
473impl From<SecretId> for u32 {
474    fn from(id: SecretId) -> Self {
475        id.0
476    }
477}
478
479#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
480pub enum ConflictBehavior {
481    #[default]
482    NoCheck,
483    Overwrite,
484    IgnoreConflict,
485    DoUpdateIfNotNull,
486}
487
488#[macro_export]
489macro_rules! _checked_conflict_behaviors {
490    () => {
491        ConflictBehavior::Overwrite
492            | ConflictBehavior::IgnoreConflict
493            | ConflictBehavior::DoUpdateIfNotNull
494    };
495}
496pub use _checked_conflict_behaviors as checked_conflict_behaviors;
497
498impl ConflictBehavior {
499    pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
500        match tb_conflict_behavior {
501            PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
502            PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
503            PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
504            // This is for backward compatibility, in the previous version
505            // `HandleConflictBehavior::Unspecified` represented `NoCheck`, so just treat it as `NoCheck`.
506            PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
507                ConflictBehavior::NoCheck
508            }
509        }
510    }
511
512    pub fn to_protobuf(self) -> PbHandleConflictBehavior {
513        match self {
514            ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
515            ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
516            ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
517            ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
518        }
519    }
520
521    pub fn debug_to_string(self) -> String {
522        match self {
523            ConflictBehavior::NoCheck => "NoCheck".to_owned(),
524            ConflictBehavior::Overwrite => "Overwrite".to_owned(),
525            ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
526            ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
527        }
528    }
529}
530
531#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
532pub enum Engine {
533    #[default]
534    Hummock,
535    Iceberg,
536}
537
538impl Engine {
539    pub fn from_protobuf(engine: &PbEngine) -> Self {
540        match engine {
541            PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
542            PbEngine::Iceberg => Engine::Iceberg,
543        }
544    }
545
546    pub fn to_protobuf(self) -> PbEngine {
547        match self {
548            Engine::Hummock => PbEngine::Hummock,
549            Engine::Iceberg => PbEngine::Iceberg,
550        }
551    }
552
553    pub fn debug_to_string(self) -> String {
554        match self {
555            Engine::Hummock => "Hummock".to_owned(),
556            Engine::Iceberg => "Iceberg".to_owned(),
557        }
558    }
559}
560
561#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
562pub enum StreamJobStatus {
563    #[default]
564    Creating,
565    Created,
566}
567
568impl StreamJobStatus {
569    pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
570        match stream_job_status {
571            PbStreamJobStatus::Creating => StreamJobStatus::Creating,
572            PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
573        }
574    }
575
576    pub fn to_proto(self) -> PbStreamJobStatus {
577        match self {
578            StreamJobStatus::Creating => PbStreamJobStatus::Creating,
579            StreamJobStatus::Created => PbStreamJobStatus::Created,
580        }
581    }
582}
583
584#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
585pub enum CreateType {
586    Foreground,
587    Background,
588}
589
590impl Default for CreateType {
591    fn default() -> Self {
592        Self::Foreground
593    }
594}
595
596impl CreateType {
597    pub fn from_proto(pb_create_type: PbCreateType) -> Self {
598        match pb_create_type {
599            PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
600            PbCreateType::Background => CreateType::Background,
601        }
602    }
603
604    pub fn to_proto(self) -> PbCreateType {
605        match self {
606            CreateType::Foreground => PbCreateType::Foreground,
607            CreateType::Background => PbCreateType::Background,
608        }
609    }
610}
611
612#[derive(Clone, Debug)]
613pub enum AlterDatabaseParam {
614    // Barrier related parameters, per database.
615    // None represents the default value, which means it follows `SystemParams`.
616    BarrierIntervalMs(Option<u32>),
617    CheckpointFrequency(Option<u64>),
618}
619
620macro_rules! for_all_fragment_type_flags {
621    () => {
622        for_all_fragment_type_flags! {
623            {
624                Source,
625                Mview,
626                Sink,
627                Now,
628                StreamScan,
629                BarrierRecv,
630                Values,
631                Dml,
632                CdcFilter,
633                Skipped1,
634                SourceScan,
635                SnapshotBackfillStreamScan,
636                FsFetch,
637                CrossDbSnapshotBackfillStreamScan,
638                StreamCdcScan
639            },
640            {},
641            0
642        }
643    };
644    (
645        {},
646        {
647            $(
648                {$flag:ident, $index:expr}
649            ),*
650        },
651        $next_index:expr
652    ) => {
653        #[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq)]
654        #[repr(u32)]
655        pub enum FragmentTypeFlag {
656            $(
657                $flag = (1 << $index),
658            )*
659        }
660
661        pub const FRAGMENT_TYPE_FLAG_LIST: [FragmentTypeFlag; $next_index] = [
662            $(
663                FragmentTypeFlag::$flag,
664            )*
665        ];
666
667        impl TryFrom<u32> for FragmentTypeFlag {
668            type Error = String;
669
670            fn try_from(value: u32) -> Result<Self, Self::Error> {
671                match value {
672                    $(
673                        value if value == (FragmentTypeFlag::$flag as u32) => Ok(FragmentTypeFlag::$flag),
674                    )*
675                    _ => Err(format!("Invalid FragmentTypeFlag value: {}", value)),
676                }
677            }
678        }
679
680        impl FragmentTypeFlag {
681            pub fn as_str_name(&self) -> &'static str {
682                match self {
683                    $(
684                        FragmentTypeFlag::$flag => paste::paste!{stringify!( [< $flag:snake:upper >] )},
685                    )*
686                }
687            }
688        }
689    };
690    (
691        {$first:ident $(, $rest:ident)*},
692        {
693            $(
694                {$flag:ident, $index:expr}
695            ),*
696        },
697        $next_index:expr
698    ) => {
699        for_all_fragment_type_flags! {
700            {$($rest),*},
701            {
702                $({$flag, $index},)*
703                {$first, $next_index}
704            },
705            $next_index + 1
706        }
707    };
708}
709
710for_all_fragment_type_flags!();
711
712impl FragmentTypeFlag {
713    pub fn raw_flag(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> u32 {
714        flags.into_iter().fold(0, |acc, flag| acc | (flag as u32))
715    }
716
717    /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
718    pub fn backfill_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
719        [FragmentTypeFlag::SourceScan, FragmentTypeFlag::StreamScan].into_iter()
720    }
721
722    /// Fragments that may be affected by `SOURCE_RATE_LIMIT`.
723    /// Note: for `FsFetch`, old fragments don't have this flag set, so don't use this to check.
724    pub fn source_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
725        [FragmentTypeFlag::Source, FragmentTypeFlag::FsFetch].into_iter()
726    }
727
728    /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
729    pub fn sink_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
730        [FragmentTypeFlag::Sink].into_iter()
731    }
732
733    /// Note: this doesn't include `FsFetch` created in old versions.
734    pub fn rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
735        Self::backfill_rate_limit_fragments()
736            .chain(Self::source_rate_limit_fragments())
737            .chain(Self::sink_rate_limit_fragments())
738    }
739
740    pub fn dml_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
741        [FragmentTypeFlag::Dml].into_iter()
742    }
743}
744
745#[derive(Clone, Copy, Debug, Hash, PartialOrd, PartialEq, Eq, Default)]
746pub struct FragmentTypeMask(u32);
747
748impl Binary for FragmentTypeMask {
749    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
750        write!(f, "{:b}", self.0)
751    }
752}
753
754impl From<i32> for FragmentTypeMask {
755    fn from(value: i32) -> Self {
756        Self(value as u32)
757    }
758}
759
760impl From<u32> for FragmentTypeMask {
761    fn from(value: u32) -> Self {
762        Self(value)
763    }
764}
765
766impl From<FragmentTypeMask> for u32 {
767    fn from(value: FragmentTypeMask) -> Self {
768        value.0
769    }
770}
771
772impl From<FragmentTypeMask> for i32 {
773    fn from(value: FragmentTypeMask) -> Self {
774        value.0 as _
775    }
776}
777
778impl FragmentTypeMask {
779    pub fn empty() -> Self {
780        FragmentTypeMask(0)
781    }
782
783    pub fn add(&mut self, flag: FragmentTypeFlag) {
784        self.0 |= flag as u32;
785    }
786
787    pub fn contains_any(&self, flags: impl IntoIterator<Item = FragmentTypeFlag>) -> bool {
788        let flag = FragmentTypeFlag::raw_flag(flags);
789        (self.0 & flag) != 0
790    }
791
792    pub fn contains(&self, flag: FragmentTypeFlag) -> bool {
793        self.contains_any([flag])
794    }
795}
796
797#[cfg(test)]
798mod tests {
799    use itertools::Itertools;
800    use risingwave_common::catalog::FRAGMENT_TYPE_FLAG_LIST;
801
802    use crate::catalog::FragmentTypeFlag;
803
804    #[test]
805    fn test_all_fragment_type_flag() {
806        expect_test::expect![[r#"
807            [
808                (
809                    Source,
810                    1,
811                    "SOURCE",
812                ),
813                (
814                    Mview,
815                    2,
816                    "MVIEW",
817                ),
818                (
819                    Sink,
820                    4,
821                    "SINK",
822                ),
823                (
824                    Now,
825                    8,
826                    "NOW",
827                ),
828                (
829                    StreamScan,
830                    16,
831                    "STREAM_SCAN",
832                ),
833                (
834                    BarrierRecv,
835                    32,
836                    "BARRIER_RECV",
837                ),
838                (
839                    Values,
840                    64,
841                    "VALUES",
842                ),
843                (
844                    Dml,
845                    128,
846                    "DML",
847                ),
848                (
849                    CdcFilter,
850                    256,
851                    "CDC_FILTER",
852                ),
853                (
854                    Skipped1,
855                    512,
856                    "SKIPPED1",
857                ),
858                (
859                    SourceScan,
860                    1024,
861                    "SOURCE_SCAN",
862                ),
863                (
864                    SnapshotBackfillStreamScan,
865                    2048,
866                    "SNAPSHOT_BACKFILL_STREAM_SCAN",
867                ),
868                (
869                    FsFetch,
870                    4096,
871                    "FS_FETCH",
872                ),
873                (
874                    CrossDbSnapshotBackfillStreamScan,
875                    8192,
876                    "CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN",
877                ),
878                (
879                    StreamCdcScan,
880                    16384,
881                    "STREAM_CDC_SCAN",
882                ),
883            ]
884        "#]]
885        .assert_debug_eq(
886            &FRAGMENT_TYPE_FLAG_LIST
887                .into_iter()
888                .map(|flag| (flag, flag as u32, flag.as_str_name()))
889                .collect_vec(),
890        );
891        for flag in FRAGMENT_TYPE_FLAG_LIST {
892            assert_eq!(FragmentTypeFlag::try_from(flag as u32).unwrap(), flag);
893        }
894    }
895}