risingwave_common/catalog/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::fmt::Binary;
23use std::sync::Arc;
24
25pub use column::*;
26pub use external_table::*;
27use futures::stream::BoxStream;
28pub use internal_table::*;
29use parse_display::Display;
30pub use physical_table::*;
31use risingwave_pb::catalog::table::PbEngine;
32use risingwave_pb::catalog::{
33    CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
34    StreamJobStatus as PbStreamJobStatus,
35};
36use risingwave_pb::plan_common::ColumnDescVersion;
37pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
38
39use crate::array::DataChunk;
40pub use crate::constants::hummock;
41use crate::error::BoxedError;
42pub use crate::id::*;
43
/// The global version of the catalog.
pub type CatalogVersion = u64;

/// The version number of the per-table catalog.
pub type TableVersionId = u64;
/// The default version ID for a new table.
///
/// Typed with its own alias ([`TableVersionId`], a `u64`) so the constant stays in sync with it.
pub const INITIAL_TABLE_VERSION_ID: TableVersionId = 0;
/// The version number of the per-source catalog.
pub type SourceVersionId = u64;
/// The default version ID for a new source.
///
/// Typed with its own alias ([`SourceVersionId`], a `u64`) so the constant stays in sync with it.
pub const INITIAL_SOURCE_VERSION_ID: SourceVersionId = 0;
55
/// Name of the default database.
pub const DEFAULT_DATABASE_NAME: &str = "dev";
/// Name of the default schema.
pub const DEFAULT_SCHEMA_NAME: &str = "public";
/// Name of the `pg_catalog` system schema (one of [`SYSTEM_SCHEMAS`]).
pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
/// Name of the `information_schema` system schema (one of [`SYSTEM_SCHEMAS`]).
pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
/// Name of the RisingWave `rw_catalog` system schema (one of [`SYSTEM_SCHEMAS`]).
pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
/// Reserved schema-name prefix (`pg_`).
pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
/// Name of the default superuser.
pub const DEFAULT_SUPER_USER: &str = "root";
/// User ID of [`DEFAULT_SUPER_USER`].
pub const DEFAULT_SUPER_USER_ID: UserId = UserId::new(1);
/// Superuser kept for compatibility with customized utils for PostgreSQL.
pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
/// User ID of [`DEFAULT_SUPER_USER_FOR_PG`].
// NOTE(review): this is a raw `u32`, unlike the other default user IDs which are `UserId`;
// left unchanged here because changing the type would break callers.
pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;

/// Default superuser for admin, used only for the cloud control plane.
pub const DEFAULT_SUPER_USER_FOR_ADMIN: &str = "rwadmin";
/// User ID of [`DEFAULT_SUPER_USER_FOR_ADMIN`].
pub const DEFAULT_SUPER_USER_FOR_ADMIN_ID: UserId = UserId::new(3);

/// User ID `11`.
// NOTE(review): presumably the first user ID not reserved for built-in users (the defaults
// above use 1-3) — confirm against the user-ID allocator.
pub const NON_RESERVED_USER_ID: UserId = UserId::new(11);

/// Maximum number of system catalogs.
pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
/// Start of the system-catalog ID range: `MAX_SYS_CATALOG_NUM` below `i32::MAX`, i.e. system
/// catalog IDs sit at the very top of the `i32` range.
pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;
76
77pub use risingwave_pb::id::OBJECT_ID_PLACEHOLDER;
78
79pub const SYSTEM_SCHEMAS: [&str; 3] = [
80    PG_CATALOG_SCHEMA_NAME,
81    INFORMATION_SCHEMA_SCHEMA_NAME,
82    RW_CATALOG_SCHEMA_NAME,
83];
84pub fn is_system_schema(schema_name: &str) -> bool {
85    SYSTEM_SCHEMAS.contains(&schema_name)
86}
87
88pub fn is_reserved_admin_user(user_name: &str) -> bool {
89    user_name == DEFAULT_SUPER_USER_FOR_ADMIN
90}
91
92pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";
93
94/// When there is no primary key specified while creating source, will use
95/// the message key as primary key in `BYTEA` type with this name.
96/// Note: the field has version to track, please refer to [`default_key_column_name_version_mapping`]
97pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
98
99pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
100    match version {
101        ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
102        _ => DEFAULT_KEY_COLUMN_NAME,
103    }
104}
105
/// For Kafka sources, we attach a hidden column [`KAFKA_TIMESTAMP_COLUMN_NAME`] so that we can
/// limit the timestamp range when querying the source directly with a batch query. The column
/// type is [`crate::types::DataType::Timestamptz`]. For more details, please refer to
/// [this rfc](https://github.com/risingwavelabs/rfcs/pull/20).
pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";

/// The RisingWave Iceberg table engine creates the column `_risingwave_iceberg_row_id` in the
/// Iceberg table.
///
/// The Iceberg V3 spec uses `_row_id` as a reserved column name for row lineage, so for a table
/// without a primary key we cannot use `_row_id` directly in Iceberg and use
/// `_risingwave_iceberg_row_id` instead.
pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";

/// Name of the row ID column; its column ID is [`ROW_ID_COLUMN_ID`].
pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
/// The column ID reserved for the row ID column.
pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);

/// The column ID offset for user-defined columns: the ID right after [`ROW_ID_COLUMN_ID`].
///
/// All IDs of user-defined columns must be greater than or equal to this value.
pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();

/// Name of the `_rw_timestamp` column; its column ID is [`RW_TIMESTAMP_COLUMN_ID`].
pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
/// Column ID of [`RW_TIMESTAMP_COLUMN_NAME`]. It is negative, so it is distinct from
/// [`ROW_ID_COLUMN_ID`] (0).
// NOTE(review): presumably also below every user column ID (see `USER_COLUMN_ID_OFFSET`),
// assuming `ColumnId::next` increments — confirm.
pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);

/// The column name for the projected row ID in `ProjectSet`.
/// This is a hidden column used to track row indices when expanding set-returning functions.
pub const PROJECTED_ROW_ID_COLUMN_NAME: &str = "_rw_projected_row_id";

// Names of the `_iceberg_*` columns carrying a row's sequence number, file path and position
// within the file.
// NOTE(review): confirm where these columns are populated and whether they are hidden.
pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";

/// CDC offset column name (`_rw_offset`).
// NOTE(review): presumably one of the columns counted by `CDC_SOURCE_COLUMN_NUM` — confirm.
pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
/// The number of columns output by the CDC source job.
/// See [`ColumnCatalog::debezium_cdc_source_cols()`] for details.
pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
/// CDC table-name column name (`_rw_table_name`).
pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";

// Name prefixes `__iceberg_source_` and `__iceberg_sink_`.
// NOTE(review): presumably prepended to the names of objects created internally for Iceberg —
// confirm against callers.
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
146
/// The local system catalog reader in the frontend node.
///
/// Implementors must be `Sync + Send + 'static` (supertrait bounds), which allows them to be
/// held behind a [`SysCatalogReaderRef`].
pub trait SysCatalogReader: Sync + Send + 'static {
    /// Reads the data of the system catalog table identified by `table_id`.
    ///
    /// Returns a boxed stream borrowing `self` (`'_`); each item is either a [`DataChunk`] or a
    /// [`BoxedError`].
    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
}

/// Shared, type-erased handle to a [`SysCatalogReader`].
pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
154
155#[derive(Clone, Debug, PartialEq, Default, Copy)]
156pub struct TableOption {
157    pub retention_seconds: Option<u32>, // second
158}
159
160impl From<&risingwave_pb::hummock::TableOption> for TableOption {
161    fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
162        Self {
163            retention_seconds: table_option.retention_seconds,
164        }
165    }
166}
167
168impl From<&TableOption> for risingwave_pb::hummock::TableOption {
169    fn from(table_option: &TableOption) -> Self {
170        Self {
171            retention_seconds: table_option.retention_seconds,
172        }
173    }
174}
175
176impl TableOption {
177    pub fn new(retention_seconds: Option<u32>) -> Self {
178        // now we only support ttl for TableOption
179        TableOption { retention_seconds }
180    }
181}
182
/// How write conflicts are handled; mirrors the protobuf `HandleConflictBehavior`
/// (see `ConflictBehavior::from_protobuf` / `ConflictBehavior::to_protobuf`).
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConflictBehavior {
    /// The default behavior.
    #[default]
    NoCheck,
    Overwrite,
    IgnoreConflict,
    DoUpdateIfNotNull,
}

/// Expands to an or-pattern matching every [`ConflictBehavior`] variant except `NoCheck`.
///
/// The variants are spelled through `$crate::catalog::ConflictBehavior`, so the pattern resolves
/// at any call site, even one that has not imported `ConflictBehavior`.
#[macro_export]
macro_rules! _checked_conflict_behaviors {
    () => {
        $crate::catalog::ConflictBehavior::Overwrite
            | $crate::catalog::ConflictBehavior::IgnoreConflict
            | $crate::catalog::ConflictBehavior::DoUpdateIfNotNull
    };
}
pub use _checked_conflict_behaviors as checked_conflict_behaviors;
201
202impl ConflictBehavior {
203    pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
204        match tb_conflict_behavior {
205            PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
206            PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
207            PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
208            // This is for backward compatibility, in the previous version
209            // `HandleConflictBehavior::Unspecified` represented `NoCheck`, so just treat it as `NoCheck`.
210            PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
211                ConflictBehavior::NoCheck
212            }
213        }
214    }
215
216    pub fn to_protobuf(self) -> PbHandleConflictBehavior {
217        match self {
218            ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
219            ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
220            ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
221            ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
222        }
223    }
224
225    pub fn debug_to_string(self) -> String {
226        match self {
227            ConflictBehavior::NoCheck => "NoCheck".to_owned(),
228            ConflictBehavior::Overwrite => "Overwrite".to_owned(),
229            ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
230            ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
231        }
232    }
233}
234
235#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
236pub enum Engine {
237    #[default]
238    Hummock,
239    Iceberg,
240}
241
242impl Engine {
243    pub fn from_protobuf(engine: &PbEngine) -> Self {
244        match engine {
245            PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
246            PbEngine::Iceberg => Engine::Iceberg,
247        }
248    }
249
250    pub fn to_protobuf(self) -> PbEngine {
251        match self {
252            Engine::Hummock => PbEngine::Hummock,
253            Engine::Iceberg => PbEngine::Iceberg,
254        }
255    }
256
257    pub fn debug_to_string(self) -> String {
258        match self {
259            Engine::Hummock => "Hummock".to_owned(),
260            Engine::Iceberg => "Iceberg".to_owned(),
261        }
262    }
263}
264
/// Status of a stream job; mirrors the protobuf `StreamJobStatus`.
///
/// The derived ordering follows declaration order: `Creating < Created`.
#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub enum StreamJobStatus {
    /// The job is being created. This is the `Default` status.
    #[default]
    Creating,
    /// The job has been created.
    Created,
}
271
272impl StreamJobStatus {
273    pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
274        match stream_job_status {
275            PbStreamJobStatus::Creating => StreamJobStatus::Creating,
276            PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
277        }
278    }
279
280    pub fn to_proto(self) -> PbStreamJobStatus {
281        match self {
282            StreamJobStatus::Creating => PbStreamJobStatus::Creating,
283            StreamJobStatus::Created => PbStreamJobStatus::Created,
284        }
285    }
286}
287
/// Creation type (foreground or background); mirrors the protobuf `CreateType`.
#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord, Default)]
pub enum CreateType {
    /// The `Default` creation type.
    #[default]
    Foreground,
    Background,
}
294
295impl CreateType {
296    pub fn from_proto(pb_create_type: PbCreateType) -> Self {
297        match pb_create_type {
298            PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
299            PbCreateType::Background => CreateType::Background,
300        }
301    }
302
303    pub fn to_proto(self) -> PbCreateType {
304        match self {
305            CreateType::Foreground => PbCreateType::Foreground,
306            CreateType::Background => PbCreateType::Background,
307        }
308    }
309}
310
/// A per-database parameter that can be altered.
#[derive(Clone, Debug)]
pub enum AlterDatabaseParam {
    // Barrier-related parameters, per database.
    /// Barrier interval in milliseconds. `None` represents the default value, which means it
    /// follows `SystemParams`.
    BarrierIntervalMs(Option<u32>),
    /// Checkpoint frequency. `None` represents the default value, which means it follows
    /// `SystemParams`.
    CheckpointFrequency(Option<u64>),
}
318
// Generates `FragmentTypeFlag`, `FRAGMENT_TYPE_FLAG_LIST`, `TryFrom<u32> for FragmentTypeFlag`
// and `FragmentTypeFlag::as_str_name` from the single list of flag names in the first arm.
//
// It is a recursive macro with three arms:
// - `()`: entry point; seeds the recursion with the full name list, an empty accumulator and
//   the starting index `0`.
// - `{$first, $rest...}, {accumulated}, $next_index`: moves `$first` into the accumulator
//   paired with `$next_index`, then recurses on `$rest` with `$next_index + 1`.
// - `{}, {accumulated}, $next_index`: all names consumed; emits the items. At this point
//   `$next_index` is the (unevaluated) expression `0 + 1 + ...`, equal to the number of flags,
//   and is used as the array length.
//
// Each flag's value is `1 << index`, where index is its position in the list, so every flag is
// a distinct single bit. Reordering or removing names changes the values of later flags:
// append new flags at the end.
// NOTE(review): `Skipped1` presumably keeps a retired bit position occupied — confirm.
macro_rules! for_all_fragment_type_flags {
    () => {
        for_all_fragment_type_flags! {
            {
                Source,
                Mview,
                Sink,
                Now,
                StreamScan,
                BarrierRecv,
                Values,
                Dml,
                CdcFilter,
                Skipped1,
                SourceScan,
                SnapshotBackfillStreamScan,
                FsFetch,
                CrossDbSnapshotBackfillStreamScan,
                StreamCdcScan,
                VectorIndexWrite,
                UpstreamSinkUnion,
                LocalityProvider
            },
            {},
            0
        }
    };
    (
        {},
        {
            $(
                {$flag:ident, $index:expr}
            ),*
        },
        $next_index:expr
    ) => {
        /// Fragment type flags. Each variant's value is a distinct single bit
        /// (`1 << index`, by declaration position), so flags can be OR-ed into a
        /// [`FragmentTypeMask`].
        #[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq)]
        #[repr(u32)]
        pub enum FragmentTypeFlag {
            $(
                // `$index` is an `expr` fragment (e.g. `0 + 1 + 1`) and is kept as a single
                // operand, so the shift uses the whole sum.
                $flag = (1 << $index),
            )*
        }

        /// Every [`FragmentTypeFlag`], in declaration (bit) order.
        pub const FRAGMENT_TYPE_FLAG_LIST: [FragmentTypeFlag; $next_index] = [
            $(
                FragmentTypeFlag::$flag,
            )*
        ];

        impl TryFrom<u32> for FragmentTypeFlag {
            type Error = String;

            /// Accepts only the exact value of a single flag; any other value (including `0`
            /// or a combination of several flags) is rejected with an error message.
            fn try_from(value: u32) -> Result<Self, Self::Error> {
                match value {
                    $(
                        value if value == (FragmentTypeFlag::$flag as u32) => Ok(FragmentTypeFlag::$flag),
                    )*
                    _ => Err(format!("Invalid FragmentTypeFlag value: {}", value)),
                }
            }
        }

        impl FragmentTypeFlag {
            /// Returns the flag name in upper snake case, e.g. `StreamScan` -> `"STREAM_SCAN"`.
            pub fn as_str_name(&self) -> &'static str {
                match self {
                    $(
                        FragmentTypeFlag::$flag => paste::paste!{stringify!( [< $flag:snake:upper >] )},
                    )*
                }
            }
        }
    };
    (
        {$first:ident $(, $rest:ident)*},
        {
            $(
                {$flag:ident, $index:expr}
            ),*
        },
        $next_index:expr
    ) => {
        for_all_fragment_type_flags! {
            {$($rest),*},
            {
                $({$flag, $index},)*
                {$first, $next_index}
            },
            $next_index + 1
        }
    };
}

for_all_fragment_type_flags!();
413
414impl FragmentTypeFlag {
415    pub fn raw_flag(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> u32 {
416        flags.into_iter().fold(0, |acc, flag| acc | (flag as u32))
417    }
418
419    /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
420    pub fn backfill_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
421        [FragmentTypeFlag::SourceScan, FragmentTypeFlag::StreamScan].into_iter()
422    }
423
424    /// Fragments that may be affected by `SOURCE_RATE_LIMIT`.
425    /// Note: for `FsFetch`, old fragments don't have this flag set, so don't use this to check.
426    pub fn source_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
427        [FragmentTypeFlag::Source, FragmentTypeFlag::FsFetch].into_iter()
428    }
429
430    /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
431    pub fn sink_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
432        [FragmentTypeFlag::Sink].into_iter()
433    }
434
435    /// Note: this doesn't include `FsFetch` created in old versions.
436    pub fn rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
437        Self::backfill_rate_limit_fragments()
438            .chain(Self::source_rate_limit_fragments())
439            .chain(Self::sink_rate_limit_fragments())
440    }
441
442    pub fn dml_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
443        [FragmentTypeFlag::Dml].into_iter()
444    }
445}
446
447#[derive(Clone, Copy, Debug, Hash, PartialOrd, PartialEq, Eq, Default)]
448pub struct FragmentTypeMask(u32);
449
450impl Binary for FragmentTypeMask {
451    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452        write!(f, "{:b}", self.0)
453    }
454}
455
456impl From<i32> for FragmentTypeMask {
457    fn from(value: i32) -> Self {
458        Self(value as u32)
459    }
460}
461
462impl From<u32> for FragmentTypeMask {
463    fn from(value: u32) -> Self {
464        Self(value)
465    }
466}
467
468impl From<FragmentTypeMask> for u32 {
469    fn from(value: FragmentTypeMask) -> Self {
470        value.0
471    }
472}
473
474impl From<FragmentTypeMask> for i32 {
475    fn from(value: FragmentTypeMask) -> Self {
476        value.0 as _
477    }
478}
479
480impl FragmentTypeMask {
481    pub fn empty() -> Self {
482        FragmentTypeMask(0)
483    }
484
485    pub fn add(&mut self, flag: FragmentTypeFlag) {
486        self.0 |= flag as u32;
487    }
488
489    pub fn contains_any(&self, flags: impl IntoIterator<Item = FragmentTypeFlag>) -> bool {
490        let flag = FragmentTypeFlag::raw_flag(flags);
491        (self.0 & flag) != 0
492    }
493
494    pub fn contains(&self, flag: FragmentTypeFlag) -> bool {
495        self.contains_any([flag])
496    }
497}
498
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::catalog::{FRAGMENT_TYPE_FLAG_LIST, FragmentTypeFlag};

    /// Snapshots every flag's variant name, bit value and `as_str_name`. Also checks that
    /// `TryFrom<u32>` round-trips every flag and rejects values that are not exactly one flag.
    #[test]
    fn test_all_fragment_type_flag() {
        expect_test::expect![[r#"
            [
                (
                    Source,
                    1,
                    "SOURCE",
                ),
                (
                    Mview,
                    2,
                    "MVIEW",
                ),
                (
                    Sink,
                    4,
                    "SINK",
                ),
                (
                    Now,
                    8,
                    "NOW",
                ),
                (
                    StreamScan,
                    16,
                    "STREAM_SCAN",
                ),
                (
                    BarrierRecv,
                    32,
                    "BARRIER_RECV",
                ),
                (
                    Values,
                    64,
                    "VALUES",
                ),
                (
                    Dml,
                    128,
                    "DML",
                ),
                (
                    CdcFilter,
                    256,
                    "CDC_FILTER",
                ),
                (
                    Skipped1,
                    512,
                    "SKIPPED1",
                ),
                (
                    SourceScan,
                    1024,
                    "SOURCE_SCAN",
                ),
                (
                    SnapshotBackfillStreamScan,
                    2048,
                    "SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
                (
                    FsFetch,
                    4096,
                    "FS_FETCH",
                ),
                (
                    CrossDbSnapshotBackfillStreamScan,
                    8192,
                    "CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
                (
                    StreamCdcScan,
                    16384,
                    "STREAM_CDC_SCAN",
                ),
                (
                    VectorIndexWrite,
                    32768,
                    "VECTOR_INDEX_WRITE",
                ),
                (
                    UpstreamSinkUnion,
                    65536,
                    "UPSTREAM_SINK_UNION",
                ),
                (
                    LocalityProvider,
                    131072,
                    "LOCALITY_PROVIDER",
                ),
            ]
        "#]]
        .assert_debug_eq(
            &FRAGMENT_TYPE_FLAG_LIST
                .into_iter()
                .map(|flag| (flag, flag as u32, flag.as_str_name()))
                .collect_vec(),
        );
        // Every flag round-trips through its raw `u32` value.
        for flag in FRAGMENT_TYPE_FLAG_LIST {
            assert_eq!(FragmentTypeFlag::try_from(flag as u32).unwrap(), flag);
        }
        // Values that are not exactly one defined flag are rejected: no bits, a combination of
        // two flags (`SOURCE | MVIEW`), and all bits set.
        for invalid in [0u32, 3, u32::MAX] {
            assert_eq!(
                FragmentTypeFlag::try_from(invalid).unwrap_err(),
                format!("Invalid FragmentTypeFlag value: {}", invalid)
            );
        }
    }
}
610        }
611    }
612}