risingwave_common/catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::sync::Arc;
23
24pub use column::*;
25pub use external_table::*;
26use futures::stream::BoxStream;
27pub use internal_table::*;
28use parse_display::Display;
29pub use physical_table::*;
30use risingwave_pb::catalog::table::PbEngine;
31use risingwave_pb::catalog::{
32    CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
33    StreamJobStatus as PbStreamJobStatus,
34};
35use risingwave_pb::plan_common::ColumnDescVersion;
36pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
37use serde::{Deserialize, Serialize};
38
39use crate::array::DataChunk;
40pub use crate::constants::hummock;
41use crate::error::BoxedError;
42
43/// The global version of the catalog.
44pub type CatalogVersion = u64;
45
46/// The version number of the per-table catalog.
47pub type TableVersionId = u64;
48/// The default version ID for a new table.
49pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
50/// The version number of the per-source catalog.
51pub type SourceVersionId = u64;
52/// The default version ID for a new source.
53pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;
54
55pub const DEFAULT_DATABASE_NAME: &str = "dev";
56pub const DEFAULT_SCHEMA_NAME: &str = "public";
57pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
58pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
59pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
60pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
61pub const DEFAULT_SUPER_USER: &str = "root";
62pub const DEFAULT_SUPER_USER_ID: u32 = 1;
63// This is for compatibility with customized utils for PostgreSQL.
64pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
65pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;
66
67pub const NON_RESERVED_USER_ID: i32 = 11;
68
69pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
70pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;
71
72pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
73
74pub const SYSTEM_SCHEMAS: [&str; 3] = [
75    PG_CATALOG_SCHEMA_NAME,
76    INFORMATION_SCHEMA_SCHEMA_NAME,
77    RW_CATALOG_SCHEMA_NAME,
78];
79pub fn is_system_schema(schema_name: &str) -> bool {
80    SYSTEM_SCHEMAS.contains(&schema_name)
81}
82
83pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";
84
/// When no primary key is specified while creating a source, the message key is used as the
/// primary key, stored in a `BYTEA` column with this name.
/// Note: this field is versioned; please refer to [`default_key_column_name_version_mapping`].
88pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
89
90pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
91    match version {
92        ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
93        _ => DEFAULT_KEY_COLUMN_NAME,
94    }
95}
96
97/// For kafka source, we attach a hidden column [`KAFKA_TIMESTAMP_COLUMN_NAME`] to it, so that we
98/// can limit the timestamp range when querying it directly with batch query. The column type is
99/// [`crate::types::DataType::Timestamptz`]. For more details, please refer to
100/// [this rfc](https://github.com/risingwavelabs/rfcs/pull/20).
101pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";
102
/// The RisingWave Iceberg table engine creates the column `_risingwave_iceberg_row_id` in the
/// Iceberg table.
///
/// The Iceberg V3 spec reserves `_row_id` as a column name for row lineage, so for a table without
/// a primary key we cannot use `_row_id` directly in Iceberg and use `_risingwave_iceberg_row_id`
/// instead.
107pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";
108
109pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
/// The column ID reserved for the row ID column.
111pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);
112
113/// The column ID offset for user-defined columns.
114///
/// All IDs of user-defined columns must be greater than or equal to this value.
116pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();
117
118pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
119pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);
120
121pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
122pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
123pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";
124
125pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
/// The number of columns output by the CDC source job.
/// See [`ColumnCatalog::debezium_cdc_source_cols()`] for details.
128pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
129pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
130
131/// The local system catalog reader in the frontend node.
132pub trait SysCatalogReader: Sync + Send + 'static {
133    /// Reads the data of the system catalog table.
134    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
135}
136
137pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
138
139pub type ObjectId = u32;
140
141#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
142#[display("{database_id}")]
143pub struct DatabaseId {
144    pub database_id: u32,
145}
146
147impl DatabaseId {
148    pub const fn new(database_id: u32) -> Self {
149        DatabaseId { database_id }
150    }
151
152    pub fn placeholder() -> Self {
153        DatabaseId {
154            database_id: OBJECT_ID_PLACEHOLDER,
155        }
156    }
157}
158
159impl From<u32> for DatabaseId {
160    fn from(id: u32) -> Self {
161        Self::new(id)
162    }
163}
164
165impl From<&u32> for DatabaseId {
166    fn from(id: &u32) -> Self {
167        Self::new(*id)
168    }
169}
170
171impl From<DatabaseId> for u32 {
172    fn from(id: DatabaseId) -> Self {
173        id.database_id
174    }
175}
176
177#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq)]
178#[display("{schema_id}")]
179pub struct SchemaId {
180    pub schema_id: u32,
181}
182
183impl SchemaId {
184    pub fn new(schema_id: u32) -> Self {
185        SchemaId { schema_id }
186    }
187
188    pub fn placeholder() -> Self {
189        SchemaId {
190            schema_id: OBJECT_ID_PLACEHOLDER,
191        }
192    }
193}
194
195impl From<u32> for SchemaId {
196    fn from(id: u32) -> Self {
197        Self::new(id)
198    }
199}
200
201impl From<&u32> for SchemaId {
202    fn from(id: &u32) -> Self {
203        Self::new(*id)
204    }
205}
206
207impl From<SchemaId> for u32 {
208    fn from(id: SchemaId) -> Self {
209        id.schema_id
210    }
211}
212
213#[derive(
214    Clone,
215    Copy,
216    Debug,
217    Display,
218    Default,
219    Hash,
220    PartialOrd,
221    PartialEq,
222    Eq,
223    Ord,
224    Serialize,
225    Deserialize,
226)]
227#[display("{table_id}")]
228pub struct TableId {
229    pub table_id: u32,
230}
231
232impl TableId {
233    pub const fn new(table_id: u32) -> Self {
234        TableId { table_id }
235    }
236
237    /// Sometimes the id field is filled later, we use this value for better debugging.
238    pub const fn placeholder() -> Self {
239        TableId {
240            table_id: OBJECT_ID_PLACEHOLDER,
241        }
242    }
243
244    pub fn table_id(&self) -> u32 {
245        self.table_id
246    }
247}
248
249impl From<u32> for TableId {
250    fn from(id: u32) -> Self {
251        Self::new(id)
252    }
253}
254
255impl From<&u32> for TableId {
256    fn from(id: &u32) -> Self {
257        Self::new(*id)
258    }
259}
260
261impl From<TableId> for u32 {
262    fn from(id: TableId) -> Self {
263        id.table_id
264    }
265}
266
/// Per-table options; it has a protobuf counterpart in `risingwave_pb::hummock::TableOption`.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct TableOption {
    /// Retention time in seconds; `None` when no retention is configured.
    pub retention_seconds: Option<u32>,
}
271
272impl From<&risingwave_pb::hummock::TableOption> for TableOption {
273    fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
274        Self {
275            retention_seconds: table_option.retention_seconds,
276        }
277    }
278}
279
280impl From<&TableOption> for risingwave_pb::hummock::TableOption {
281    fn from(table_option: &TableOption) -> Self {
282        Self {
283            retention_seconds: table_option.retention_seconds,
284        }
285    }
286}
287
288impl TableOption {
289    pub fn new(retention_seconds: Option<u32>) -> Self {
290        // now we only support ttl for TableOption
291        TableOption { retention_seconds }
292    }
293}
294
295#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq)]
296#[display("{index_id}")]
297pub struct IndexId {
298    pub index_id: u32,
299}
300
301impl IndexId {
302    pub const fn new(index_id: u32) -> Self {
303        IndexId { index_id }
304    }
305
306    /// Sometimes the id field is filled later, we use this value for better debugging.
307    pub const fn placeholder() -> Self {
308        IndexId {
309            index_id: OBJECT_ID_PLACEHOLDER,
310        }
311    }
312
313    pub fn index_id(&self) -> u32 {
314        self.index_id
315    }
316}
317
318impl From<u32> for IndexId {
319    fn from(id: u32) -> Self {
320        Self::new(id)
321    }
322}
323impl From<IndexId> for u32 {
324    fn from(id: IndexId) -> Self {
325        id.index_id
326    }
327}
328
329#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
330pub struct FunctionId(pub u32);
331
332impl FunctionId {
333    pub const fn new(id: u32) -> Self {
334        FunctionId(id)
335    }
336
337    pub const fn placeholder() -> Self {
338        FunctionId(OBJECT_ID_PLACEHOLDER)
339    }
340
341    pub fn function_id(&self) -> u32 {
342        self.0
343    }
344}
345
346impl From<u32> for FunctionId {
347    fn from(id: u32) -> Self {
348        Self::new(id)
349    }
350}
351
352impl From<&u32> for FunctionId {
353    fn from(id: &u32) -> Self {
354        Self::new(*id)
355    }
356}
357
358impl From<FunctionId> for u32 {
359    fn from(id: FunctionId) -> Self {
360        id.0
361    }
362}
363
364#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
365#[display("{user_id}")]
366pub struct UserId {
367    pub user_id: u32,
368}
369
370impl UserId {
371    pub const fn new(user_id: u32) -> Self {
372        UserId { user_id }
373    }
374
375    pub const fn placeholder() -> Self {
376        UserId {
377            user_id: OBJECT_ID_PLACEHOLDER,
378        }
379    }
380}
381
382impl From<u32> for UserId {
383    fn from(id: u32) -> Self {
384        Self::new(id)
385    }
386}
387
388impl From<&u32> for UserId {
389    fn from(id: &u32) -> Self {
390        Self::new(*id)
391    }
392}
393
394impl From<UserId> for u32 {
395    fn from(id: UserId) -> Self {
396        id.user_id
397    }
398}
399
400#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
401pub struct ConnectionId(pub u32);
402
403impl ConnectionId {
404    pub const fn new(id: u32) -> Self {
405        ConnectionId(id)
406    }
407
408    pub const fn placeholder() -> Self {
409        ConnectionId(OBJECT_ID_PLACEHOLDER)
410    }
411
412    pub fn connection_id(&self) -> u32 {
413        self.0
414    }
415}
416
417impl From<u32> for ConnectionId {
418    fn from(id: u32) -> Self {
419        Self::new(id)
420    }
421}
422
423impl From<&u32> for ConnectionId {
424    fn from(id: &u32) -> Self {
425        Self::new(*id)
426    }
427}
428
429impl From<ConnectionId> for u32 {
430    fn from(id: ConnectionId) -> Self {
431        id.0
432    }
433}
434
435#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
436pub struct SecretId(pub u32);
437
438impl SecretId {
439    pub const fn new(id: u32) -> Self {
440        SecretId(id)
441    }
442
443    pub const fn placeholder() -> Self {
444        SecretId(OBJECT_ID_PLACEHOLDER)
445    }
446
447    pub fn secret_id(&self) -> u32 {
448        self.0
449    }
450}
451
452impl From<u32> for SecretId {
453    fn from(id: u32) -> Self {
454        Self::new(id)
455    }
456}
457
458impl From<&u32> for SecretId {
459    fn from(id: &u32) -> Self {
460        Self::new(*id)
461    }
462}
463
464impl From<SecretId> for u32 {
465    fn from(id: SecretId) -> Self {
466        id.0
467    }
468}
469
/// Catalog-side counterpart of the protobuf `HandleConflictBehavior` enum.
///
/// Defaults to [`ConflictBehavior::NoCheck`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum ConflictBehavior {
    #[default]
    NoCheck,
    Overwrite,
    IgnoreConflict,
    DoUpdateIfNotNull,
}
478
479impl ConflictBehavior {
480    pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
481        match tb_conflict_behavior {
482            PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
483            PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
484            PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
485            // This is for backward compatibility, in the previous version
486            // `HandleConflictBehavior::Unspecified` represented `NoCheck`, so just treat it as `NoCheck`.
487            PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
488                ConflictBehavior::NoCheck
489            }
490        }
491    }
492
493    pub fn to_protobuf(self) -> PbHandleConflictBehavior {
494        match self {
495            ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
496            ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
497            ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
498            ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
499        }
500    }
501
502    pub fn debug_to_string(self) -> String {
503        match self {
504            ConflictBehavior::NoCheck => "NoCheck".to_owned(),
505            ConflictBehavior::Overwrite => "Overwrite".to_owned(),
506            ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
507            ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
508        }
509    }
510}
511
/// Catalog-side counterpart of the protobuf table `Engine` enum.
///
/// Defaults to [`Engine::Hummock`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum Engine {
    #[default]
    Hummock,
    Iceberg,
}
518
519impl Engine {
520    pub fn from_protobuf(engine: &PbEngine) -> Self {
521        match engine {
522            PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
523            PbEngine::Iceberg => Engine::Iceberg,
524        }
525    }
526
527    pub fn to_protobuf(self) -> PbEngine {
528        match self {
529            Engine::Hummock => PbEngine::Hummock,
530            Engine::Iceberg => PbEngine::Iceberg,
531        }
532    }
533
534    pub fn debug_to_string(self) -> String {
535        match self {
536            Engine::Hummock => "Hummock".to_owned(),
537            Engine::Iceberg => "Iceberg".to_owned(),
538        }
539    }
540}
541
542#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
543pub enum StreamJobStatus {
544    #[default]
545    Creating,
546    Created,
547}
548
549impl StreamJobStatus {
550    pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
551        match stream_job_status {
552            PbStreamJobStatus::Creating => StreamJobStatus::Creating,
553            PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
554        }
555    }
556
557    pub fn to_proto(self) -> PbStreamJobStatus {
558        match self {
559            StreamJobStatus::Creating => PbStreamJobStatus::Creating,
560            StreamJobStatus::Created => PbStreamJobStatus::Created,
561        }
562    }
563}
564
565#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
566pub enum CreateType {
567    Foreground,
568    Background,
569}
570
571impl Default for CreateType {
572    fn default() -> Self {
573        Self::Foreground
574    }
575}
576
577impl CreateType {
578    pub fn from_proto(pb_create_type: PbCreateType) -> Self {
579        match pb_create_type {
580            PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
581            PbCreateType::Background => CreateType::Background,
582        }
583    }
584
585    pub fn to_proto(self) -> PbCreateType {
586        match self {
587            CreateType::Foreground => PbCreateType::Foreground,
588            CreateType::Background => PbCreateType::Background,
589        }
590    }
591}