risingwave_common/catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::fmt::Binary;
23use std::sync::Arc;
24
25pub use column::*;
26pub use external_table::*;
27use futures::stream::BoxStream;
28pub use internal_table::*;
29use parse_display::Display;
30pub use physical_table::*;
31use risingwave_pb::catalog::table::PbEngine;
32use risingwave_pb::catalog::{
33    CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
34    StreamJobStatus as PbStreamJobStatus,
35};
36use risingwave_pb::plan_common::ColumnDescVersion;
37pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
38
39use crate::array::DataChunk;
40pub use crate::constants::hummock;
41use crate::error::BoxedError;
42pub use crate::id::{
43    ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, TableId,
44};
45
/// The global version of the catalog.
pub type CatalogVersion = u64;

/// The version number of the per-table catalog.
pub type TableVersionId = u64;
/// The default version ID for a new table.
///
/// Typed with the [`TableVersionId`] alias so it stays in sync with the alias.
pub const INITIAL_TABLE_VERSION_ID: TableVersionId = 0;
/// The version number of the per-source catalog.
pub type SourceVersionId = u64;
/// The default version ID for a new source.
///
/// Typed with the [`SourceVersionId`] alias so it stays in sync with the alias.
pub const INITIAL_SOURCE_VERSION_ID: SourceVersionId = 0;
57
/// Name of the default database.
pub const DEFAULT_DATABASE_NAME: &str = "dev";
/// Name of the default schema.
pub const DEFAULT_SCHEMA_NAME: &str = "public";
/// Name of the PostgreSQL-compatible `pg_catalog` system schema.
pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
/// Name of the `information_schema` system schema.
pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
/// Name of the RisingWave-specific `rw_catalog` system schema.
pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
/// Schema-name prefix reserved for PostgreSQL system schemas.
pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
/// Name of the default superuser.
pub const DEFAULT_SUPER_USER: &str = "root";
/// User ID of [`DEFAULT_SUPER_USER`].
pub const DEFAULT_SUPER_USER_ID: u32 = 1;
/// Additional superuser named `postgres`.
///
/// This is for compatibility with customized utils for PostgreSQL.
pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
/// User ID of [`DEFAULT_SUPER_USER_FOR_PG`].
pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;

/// Default superuser for administration, used only by the cloud control plane.
pub const DEFAULT_SUPER_USER_FOR_ADMIN: &str = "rwadmin";
/// User ID of [`DEFAULT_SUPER_USER_FOR_ADMIN`].
pub const DEFAULT_SUPER_USER_FOR_ADMIN_ID: u32 = 3;

/// NOTE(review): presumably the first user ID available to non-reserved users, with
/// smaller IDs (such as the built-in superuser IDs above) reserved. Confirm against the
/// user-ID allocator. Note that it is `i32`, unlike the `u32` superuser IDs.
pub const NON_RESERVED_USER_ID: i32 = 11;

/// Maximum number of system catalogs, i.e. the size of the ID range reserved for them at
/// the top of the `i32` range.
pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
/// First ID of the range reserved for system catalogs: `i32::MAX - MAX_SYS_CATALOG_NUM`.
pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;
78
79pub use risingwave_pb::id::OBJECT_ID_PLACEHOLDER;
80
81pub const SYSTEM_SCHEMAS: [&str; 3] = [
82    PG_CATALOG_SCHEMA_NAME,
83    INFORMATION_SCHEMA_SCHEMA_NAME,
84    RW_CATALOG_SCHEMA_NAME,
85];
86pub fn is_system_schema(schema_name: &str) -> bool {
87    SYSTEM_SCHEMAS.contains(&schema_name)
88}
89
90pub fn is_reserved_admin_user(user_name: &str) -> bool {
91    user_name == DEFAULT_SUPER_USER_FOR_ADMIN
92}
93
/// Prefix shared by RisingWave's reserved column names (e.g. [`DEFAULT_KEY_COLUMN_NAME`]
/// and [`KAFKA_TIMESTAMP_COLUMN_NAME`]).
pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";

/// When no primary key is specified while creating a source, the message key is used as
/// the primary key, exposed as a `BYTEA` column with this name.
/// Note: the column name is versioned; see [`default_key_column_name_version_mapping`].
pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
100
101pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
102    match version {
103        ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
104        _ => DEFAULT_KEY_COLUMN_NAME,
105    }
106}
107
/// For Kafka sources, a hidden column [`KAFKA_TIMESTAMP_COLUMN_NAME`] is attached so that
/// batch queries run directly against the source can limit the timestamp range. The column
/// type is [`crate::types::DataType::Timestamptz`]. For more details, please refer to
/// [this rfc](https://github.com/risingwavelabs/rfcs/pull/20).
pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";

/// The RisingWave Iceberg table engine creates the column `_risingwave_iceberg_row_id` in
/// the Iceberg table.
///
/// The Iceberg V3 spec reserves `_row_id` as a column name for row lineage. For a table
/// without a primary key, RisingWave therefore cannot use its usual row-ID column name
/// ([`ROW_ID_COLUMN_NAME`], `_row_id`) in Iceberg, and uses `_risingwave_iceberg_row_id`
/// instead.
pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";

/// Name of the row-ID column used for tables without a primary key.
pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
/// The column ID reserved for the row ID column.
pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);

/// The column ID offset for user-defined columns.
///
/// All IDs of user-defined columns must be greater than or equal to this value, which is
/// the ID immediately after [`ROW_ID_COLUMN_ID`].
pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();

/// Name of the hidden `_rw_timestamp` column.
pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
/// Column ID of [`RW_TIMESTAMP_COLUMN_NAME`]. It is negative, so it lies outside both
/// [`ROW_ID_COLUMN_ID`] and the user-defined range starting at [`USER_COLUMN_ID_OFFSET`].
pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);

/// Name of the Iceberg sequence-number column.
pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
/// Name of the Iceberg file-path column.
pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
/// Name of the Iceberg file-position column.
pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";

/// Name of the CDC offset column.
pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
/// The number of columns output by the CDC source job;
/// see [`ColumnCatalog::debezium_cdc_source_cols()`] for details.
pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
/// Name of the CDC table-name column.
pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";

/// Name prefix for Iceberg sources (presumably internally created ones — TODO confirm).
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
/// Name prefix for Iceberg sinks (presumably internally created ones — TODO confirm).
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
144
145/// The local system catalog reader in the frontend node.
146pub trait SysCatalogReader: Sync + Send + 'static {
147    /// Reads the data of the system catalog table.
148    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
149}
150
151pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
152
153#[derive(Clone, Debug, PartialEq, Default, Copy)]
154pub struct TableOption {
155    pub retention_seconds: Option<u32>, // second
156}
157
158impl From<&risingwave_pb::hummock::TableOption> for TableOption {
159    fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
160        Self {
161            retention_seconds: table_option.retention_seconds,
162        }
163    }
164}
165
166impl From<&TableOption> for risingwave_pb::hummock::TableOption {
167    fn from(table_option: &TableOption) -> Self {
168        Self {
169            retention_seconds: table_option.retention_seconds,
170        }
171    }
172}
173
174impl TableOption {
175    pub fn new(retention_seconds: Option<u32>) -> Self {
176        // now we only support ttl for TableOption
177        TableOption { retention_seconds }
178    }
179}
180
181#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
182#[display("{user_id}")]
183pub struct UserId {
184    pub user_id: u32,
185}
186
187impl UserId {
188    pub const fn new(user_id: u32) -> Self {
189        UserId { user_id }
190    }
191
192    pub const fn placeholder() -> Self {
193        UserId {
194            user_id: OBJECT_ID_PLACEHOLDER,
195        }
196    }
197}
198
199impl From<u32> for UserId {
200    fn from(id: u32) -> Self {
201        Self::new(id)
202    }
203}
204
205impl From<&u32> for UserId {
206    fn from(id: &u32) -> Self {
207        Self::new(*id)
208    }
209}
210
211impl From<UserId> for u32 {
212    fn from(id: UserId) -> Self {
213        id.user_id
214    }
215}
216
/// Behavior when a conflicting row is encountered. This is the Rust-side counterpart of the
/// protobuf `HandleConflictBehavior` (see `ConflictBehavior::from_protobuf` /
/// `ConflictBehavior::to_protobuf`).
///
/// `NoCheck` is the default. The other variants are the ones matched by the
/// `checked_conflict_behaviors!` macro.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConflictBehavior {
    #[default]
    NoCheck,
    Overwrite,
    /// Encoded as `HandleConflictBehavior::Ignore` in protobuf.
    IgnoreConflict,
    DoUpdateIfNotNull,
}
225
/// Expands to an or-pattern matching every `ConflictBehavior` variant except `NoCheck`,
/// i.e. the behaviors that check for conflicts.
///
/// The variants are spelled via `$crate::catalog::ConflictBehavior`, so the expansion
/// always refers to this crate's type. Call sites in other crates therefore no longer need
/// `ConflictBehavior` imported, and are unaffected by any other type of that name in scope.
///
/// ```ignore
/// match behavior {
///     checked_conflict_behaviors!() => { /* conflict checking */ }
///     ConflictBehavior::NoCheck => {}
/// }
/// ```
#[macro_export]
macro_rules! _checked_conflict_behaviors {
    () => {
        $crate::catalog::ConflictBehavior::Overwrite
            | $crate::catalog::ConflictBehavior::IgnoreConflict
            | $crate::catalog::ConflictBehavior::DoUpdateIfNotNull
    };
}
pub use _checked_conflict_behaviors as checked_conflict_behaviors;
235
236impl ConflictBehavior {
237    pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
238        match tb_conflict_behavior {
239            PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
240            PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
241            PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
242            // This is for backward compatibility, in the previous version
243            // `HandleConflictBehavior::Unspecified` represented `NoCheck`, so just treat it as `NoCheck`.
244            PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
245                ConflictBehavior::NoCheck
246            }
247        }
248    }
249
250    pub fn to_protobuf(self) -> PbHandleConflictBehavior {
251        match self {
252            ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
253            ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
254            ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
255            ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
256        }
257    }
258
259    pub fn debug_to_string(self) -> String {
260        match self {
261            ConflictBehavior::NoCheck => "NoCheck".to_owned(),
262            ConflictBehavior::Overwrite => "Overwrite".to_owned(),
263            ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
264            ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
265        }
266    }
267}
268
269#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
270pub enum Engine {
271    #[default]
272    Hummock,
273    Iceberg,
274}
275
276impl Engine {
277    pub fn from_protobuf(engine: &PbEngine) -> Self {
278        match engine {
279            PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
280            PbEngine::Iceberg => Engine::Iceberg,
281        }
282    }
283
284    pub fn to_protobuf(self) -> PbEngine {
285        match self {
286            Engine::Hummock => PbEngine::Hummock,
287            Engine::Iceberg => PbEngine::Iceberg,
288        }
289    }
290
291    pub fn debug_to_string(self) -> String {
292        match self {
293            Engine::Hummock => "Hummock".to_owned(),
294            Engine::Iceberg => "Iceberg".to_owned(),
295        }
296    }
297}
298
299#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
300pub enum StreamJobStatus {
301    #[default]
302    Creating,
303    Created,
304}
305
306impl StreamJobStatus {
307    pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
308        match stream_job_status {
309            PbStreamJobStatus::Creating => StreamJobStatus::Creating,
310            PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
311        }
312    }
313
314    pub fn to_proto(self) -> PbStreamJobStatus {
315        match self {
316            StreamJobStatus::Creating => PbStreamJobStatus::Creating,
317            StreamJobStatus::Created => PbStreamJobStatus::Created,
318        }
319    }
320}
321
322#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord, Default)]
323pub enum CreateType {
324    #[default]
325    Foreground,
326    Background,
327}
328
329impl CreateType {
330    pub fn from_proto(pb_create_type: PbCreateType) -> Self {
331        match pb_create_type {
332            PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
333            PbCreateType::Background => CreateType::Background,
334        }
335    }
336
337    pub fn to_proto(self) -> PbCreateType {
338        match self {
339            CreateType::Foreground => PbCreateType::Foreground,
340            CreateType::Background => PbCreateType::Background,
341        }
342    }
343}
344
/// A per-database parameter to be altered (presumably via `ALTER DATABASE` — TODO confirm).
#[derive(Clone, Debug)]
pub enum AlterDatabaseParam {
    // Barrier related parameters, per database.
    // None represents the default value, which means it follows `SystemParams`.
    /// Barrier interval in milliseconds; `None` means follow the value in `SystemParams`.
    BarrierIntervalMs(Option<u32>),
    /// Checkpoint frequency; `None` means follow the value in `SystemParams`.
    CheckpointFrequency(Option<u64>),
}
352
/// Generates `FragmentTypeFlag`, `FRAGMENT_TYPE_FLAG_LIST`, `TryFrom<u32>` and
/// `FragmentTypeFlag::as_str_name` from a single ordered list of flag names.
///
/// Each flag's discriminant is `1 << index`, where `index` is its zero-based position in
/// the list. The list order is therefore significant: appending a new flag keeps existing
/// bit values, while inserting, removing or reordering entries changes them. This is
/// presumably why the `Skipped1` placeholder still occupies its slot.
///
/// Implementation: arm 1 seeds the recursion. Arm 3 moves one name at a time from the
/// pending list into an accumulator of `{name, index}` pairs, incrementing the index.
/// Arm 2 fires once the pending list is empty and emits the items.
macro_rules! for_all_fragment_type_flags {
    // Entry point: the full ordered flag list, an empty accumulator, starting index 0.
    () => {
        for_all_fragment_type_flags! {
            {
                Source,
                Mview,
                Sink,
                Now,
                StreamScan,
                BarrierRecv,
                Values,
                Dml,
                CdcFilter,
                Skipped1,
                SourceScan,
                SnapshotBackfillStreamScan,
                FsFetch,
                CrossDbSnapshotBackfillStreamScan,
                StreamCdcScan,
                VectorIndexWrite,
                UpstreamSinkUnion,
                LocalityProvider
            },
            {},
            0
        }
    };
    // Terminal arm: nothing left to assign; `$next_index` now equals the number of flags.
    (
        {},
        {
            $(
                {$flag:ident, $index:expr}
            ),*
        },
        $next_index:expr
    ) => {
        /// A single fragment type bit; each discriminant has exactly one bit set.
        #[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq)]
        #[repr(u32)]
        pub enum FragmentTypeFlag {
            $(
                $flag = (1 << $index),
            )*
        }

        /// Every [`FragmentTypeFlag`], in declaration (bit) order.
        pub const FRAGMENT_TYPE_FLAG_LIST: [FragmentTypeFlag; $next_index] = [
            $(
                FragmentTypeFlag::$flag,
            )*
        ];

        // Accepts only values equal to exactly one flag's bit; any other value (including
        // 0 and combinations of several flags) is rejected with an error message.
        impl TryFrom<u32> for FragmentTypeFlag {
            type Error = String;

            fn try_from(value: u32) -> Result<Self, Self::Error> {
                match value {
                    $(
                        value if value == (FragmentTypeFlag::$flag as u32) => Ok(FragmentTypeFlag::$flag),
                    )*
                    _ => Err(format!("Invalid FragmentTypeFlag value: {}", value)),
                }
            }
        }

        impl FragmentTypeFlag {
            /// Returns the flag name in SCREAMING_SNAKE_CASE (e.g. `StreamScan` ->
            /// `"STREAM_SCAN"`).
            pub fn as_str_name(&self) -> &'static str {
                match self {
                    $(
                        FragmentTypeFlag::$flag => paste::paste!{stringify!( [< $flag:snake:upper >] )},
                    )*
                }
            }
        }
    };
    // Recursive arm: assign `$next_index` to the first pending name, append it to the
    // accumulator, and recurse on the remaining names with `$next_index + 1`.
    (
        {$first:ident $(, $rest:ident)*},
        {
            $(
                {$flag:ident, $index:expr}
            ),*
        },
        $next_index:expr
    ) => {
        for_all_fragment_type_flags! {
            {$($rest),*},
            {
                $({$flag, $index},)*
                {$first, $next_index}
            },
            $next_index + 1
        }
    };
}

// Instantiate `FragmentTypeFlag` and its companions in this module.
for_all_fragment_type_flags!();
447
448impl FragmentTypeFlag {
449    pub fn raw_flag(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> u32 {
450        flags.into_iter().fold(0, |acc, flag| acc | (flag as u32))
451    }
452
453    /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
454    pub fn backfill_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
455        [FragmentTypeFlag::SourceScan, FragmentTypeFlag::StreamScan].into_iter()
456    }
457
458    /// Fragments that may be affected by `SOURCE_RATE_LIMIT`.
459    /// Note: for `FsFetch`, old fragments don't have this flag set, so don't use this to check.
460    pub fn source_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
461        [FragmentTypeFlag::Source, FragmentTypeFlag::FsFetch].into_iter()
462    }
463
464    /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
465    pub fn sink_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
466        [FragmentTypeFlag::Sink].into_iter()
467    }
468
469    /// Note: this doesn't include `FsFetch` created in old versions.
470    pub fn rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
471        Self::backfill_rate_limit_fragments()
472            .chain(Self::source_rate_limit_fragments())
473            .chain(Self::sink_rate_limit_fragments())
474    }
475
476    pub fn dml_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
477        [FragmentTypeFlag::Dml].into_iter()
478    }
479}
480
/// A bitmask combining zero or more [`FragmentTypeFlag`] bits.
///
/// Converts losslessly to and from `u32`. Conversions to and from `i32` reinterpret the
/// same 32 bits, so a mask with the top bit set corresponds to a negative `i32`.
#[derive(Clone, Copy, Debug, Hash, PartialOrd, PartialEq, Eq, Default)]
pub struct FragmentTypeMask(u32);

impl Binary for FragmentTypeMask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate to `u32`'s implementation so that formatter flags such as `#`, width
        // and zero-padding (e.g. `{:#010b}`) are honoured instead of being dropped.
        Binary::fmt(&self.0, f)
    }
}

impl From<i32> for FragmentTypeMask {
    fn from(value: i32) -> Self {
        // Bit-for-bit reinterpretation, not a range check.
        Self(value as u32)
    }
}

impl From<u32> for FragmentTypeMask {
    fn from(value: u32) -> Self {
        Self(value)
    }
}

impl From<FragmentTypeMask> for u32 {
    fn from(value: FragmentTypeMask) -> Self {
        value.0
    }
}

impl From<FragmentTypeMask> for i32 {
    fn from(value: FragmentTypeMask) -> Self {
        // Bit-for-bit reinterpretation; inverse of `From<i32>`.
        value.0 as _
    }
}
513
514impl FragmentTypeMask {
515    pub fn empty() -> Self {
516        FragmentTypeMask(0)
517    }
518
519    pub fn add(&mut self, flag: FragmentTypeFlag) {
520        self.0 |= flag as u32;
521    }
522
523    pub fn contains_any(&self, flags: impl IntoIterator<Item = FragmentTypeFlag>) -> bool {
524        let flag = FragmentTypeFlag::raw_flag(flags);
525        (self.0 & flag) != 0
526    }
527
528    pub fn contains(&self, flag: FragmentTypeFlag) -> bool {
529        self.contains_any([flag])
530    }
531}
532
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::catalog::FRAGMENT_TYPE_FLAG_LIST;

    use crate::catalog::FragmentTypeFlag;

    /// Pins the name, bit value and `as_str_name()` of every `FragmentTypeFlag`, in
    /// declaration order, and checks that `TryFrom<u32>` round-trips each flag.
    ///
    /// Each bit value follows from the flag's position in the generating macro's list.
    /// An unexpected diff here therefore most likely means a flag was inserted or
    /// reordered rather than appended.
    #[test]
    fn test_all_fragment_type_flag() {
        // Snapshot of `(flag, flag as u32, flag.as_str_name())` for every flag.
        expect_test::expect![[r#"
            [
                (
                    Source,
                    1,
                    "SOURCE",
                ),
                (
                    Mview,
                    2,
                    "MVIEW",
                ),
                (
                    Sink,
                    4,
                    "SINK",
                ),
                (
                    Now,
                    8,
                    "NOW",
                ),
                (
                    StreamScan,
                    16,
                    "STREAM_SCAN",
                ),
                (
                    BarrierRecv,
                    32,
                    "BARRIER_RECV",
                ),
                (
                    Values,
                    64,
                    "VALUES",
                ),
                (
                    Dml,
                    128,
                    "DML",
                ),
                (
                    CdcFilter,
                    256,
                    "CDC_FILTER",
                ),
                (
                    Skipped1,
                    512,
                    "SKIPPED1",
                ),
                (
                    SourceScan,
                    1024,
                    "SOURCE_SCAN",
                ),
                (
                    SnapshotBackfillStreamScan,
                    2048,
                    "SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
                (
                    FsFetch,
                    4096,
                    "FS_FETCH",
                ),
                (
                    CrossDbSnapshotBackfillStreamScan,
                    8192,
                    "CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
                (
                    StreamCdcScan,
                    16384,
                    "STREAM_CDC_SCAN",
                ),
                (
                    VectorIndexWrite,
                    32768,
                    "VECTOR_INDEX_WRITE",
                ),
                (
                    UpstreamSinkUnion,
                    65536,
                    "UPSTREAM_SINK_UNION",
                ),
                (
                    LocalityProvider,
                    131072,
                    "LOCALITY_PROVIDER",
                ),
            ]
        "#]]
        .assert_debug_eq(
            &FRAGMENT_TYPE_FLAG_LIST
                .into_iter()
                .map(|flag| (flag, flag as u32, flag.as_str_name()))
                .collect_vec(),
        );
        // Every single-flag value must decode back to the same flag.
        for flag in FRAGMENT_TYPE_FLAG_LIST {
            assert_eq!(FragmentTypeFlag::try_from(flag as u32).unwrap(), flag);
        }
    }
}