risingwave_frontend/catalog/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Definitions of catalog structs.
16//!
//! The main struct is [`root_catalog::Catalog`], which is the root containing other catalog
//! structs. It is accessed via [`catalog_service::CatalogReader`] and
//! [`catalog_service::CatalogWriter`], which are held by [`crate::session::FrontendEnv`].
20
21use risingwave_common::catalog::{
22    ROW_ID_COLUMN_NAME, RW_RESERVED_COLUMN_NAME_PREFIX, is_system_schema,
23};
24use risingwave_common::error::code::PostgresErrorCode;
25use risingwave_connector::sink::catalog::SinkCatalog;
26use risingwave_pb::user::grant_privilege::Object as PbGrantObject;
27use thiserror::Error;
28
29use crate::error::{ErrorCode, Result, RwError};
30pub(crate) mod catalog_service;
31pub mod purify;
32
33pub(crate) mod connection_catalog;
34pub(crate) mod database_catalog;
35pub(crate) mod function_catalog;
36pub(crate) mod index_catalog;
37pub(crate) mod root_catalog;
38pub(crate) mod schema_catalog;
39pub(crate) mod source_catalog;
40pub(crate) mod subscription_catalog;
41pub(crate) mod system_catalog;
42pub(crate) mod table_catalog;
43pub(crate) mod view_catalog;
44
45pub(crate) mod secret_catalog;
46
47pub(crate) use catalog_service::CatalogReader;
48pub use index_catalog::IndexCatalog;
49pub use table_catalog::TableCatalog;
50
51use crate::user::UserId;
52
// Crate-local aliases for the catalog identifier types defined in `risingwave_common`
// (some live under `risingwave_common::id`, the others under `risingwave_common::catalog`).
pub(crate) type ConnectionId = risingwave_common::id::ConnectionId;
pub(crate) type SourceId = risingwave_common::id::SourceId;
pub(crate) type SinkId = risingwave_common::id::SinkId;
pub(crate) type SubscriptionId = risingwave_common::id::SubscriptionId;
pub(crate) type ViewId = risingwave_common::id::ViewId;
pub(crate) type DatabaseId = risingwave_common::catalog::DatabaseId;
pub(crate) type SchemaId = risingwave_common::catalog::SchemaId;
pub(crate) type TableId = risingwave_common::catalog::TableId;
pub(crate) type ColumnId = risingwave_common::catalog::ColumnId;
pub(crate) type FragmentId = risingwave_common::id::FragmentId;
pub(crate) type SecretId = risingwave_common::catalog::SecretId;
64
65/// Check if the column name does not conflict with the internally reserved column name.
66pub fn check_column_name_not_reserved(column_name: &str) -> Result<()> {
67    if column_name.starts_with(ROW_ID_COLUMN_NAME) {
68        return Err(ErrorCode::InternalError(format!(
69            "column name prefixed with {:?} are reserved word.",
70            ROW_ID_COLUMN_NAME
71        ))
72        .into());
73    }
74
75    if column_name.starts_with(RW_RESERVED_COLUMN_NAME_PREFIX) {
76        return Err(ErrorCode::InternalError(format!(
77            "column name prefixed with {:?} are reserved word.",
78            RW_RESERVED_COLUMN_NAME_PREFIX
79        ))
80        .into());
81    }
82
83    if ["tableoid", "xmin", "cmin", "xmax", "cmax", "ctid"].contains(&column_name) {
84        return Err(ErrorCode::InvalidInputSyntax(format!(
85            "column name \"{column_name}\" conflicts with a system column name"
86        ))
87        .into());
88    }
89
90    Ok(())
91}
92
93/// Check if modifications happen to system catalog.
94pub fn check_schema_writable(schema: &str) -> Result<()> {
95    if is_system_schema(schema) {
96        Err(ErrorCode::ProtocolError(format!(
97            "permission denied to write on \"{}\", System catalog modifications are currently disallowed.",
98            schema
99        )).into())
100    } else {
101        Ok(())
102    }
103}
104
/// Shorthand for a `Result` whose error type is [`CatalogError`].
pub type CatalogResult<T> = std::result::Result<T, CatalogError>;
106
// TODO(error-handling): provide more concrete error code for different object types.
/// The variants of a catalog error.
///
/// The `thiserror_ext` attribute below declares a newtype named `CatalogError` for this enum
/// and registers `provide_postgres_error_code` as an extra `provide` hook.
#[derive(Error, Debug, thiserror_ext::Box)]
#[thiserror_ext(newtype(name = CatalogError, extra_provide = Self::provide_postgres_error_code))]
pub enum CatalogErrorInner {
    /// No object of kind `object_type` with the given `name` was found.
    ///
    /// Displayed as `"{object_type} not found: {name}"`.
    #[error("{object_type} not found: {name}")]
    NotFound {
        /// Kind of the object that was looked up (e.g. `"database"`).
        object_type: &'static str,
        /// Name of the object that was looked up.
        name: String,
    },

    /// An object of kind `object_type` with the given `name` already exists.
    ///
    /// Displayed as `"{object_type} with name {name} exists"`, with the suffix
    /// `" but under creation"` appended when `under_creation` is `true`.
    #[error(
        "{object_type} with name {name} exists{}",
        if *.under_creation { " but under creation" } else { "" },
    )]
    Duplicated {
        /// Kind of the conflicting object.
        object_type: &'static str,
        /// Name of the conflicting object.
        name: String,
        /// Whether the existing object is still being created.
        under_creation: bool, // only used for StreamingJob type and Subscription for now
    },
}
127
128impl CatalogError {
129    /// Provide the Postgres error code for the error.
130    fn provide_postgres_error_code(&self, request: &mut std::error::Request<'_>) {
131        match self.inner() {
132            CatalogErrorInner::NotFound { object_type, .. } => {
133                // `database` not found should map to SQLSTATE 3D000 (Invalid Catalog Name),
134                // which is used by Postgres for non-existing database in startup.
135                if *object_type == "database" {
136                    request.provide_value(PostgresErrorCode::InvalidCatalogName);
137                } else {
138                    request.provide_value(PostgresErrorCode::UndefinedObject);
139                }
140            }
141            CatalogErrorInner::Duplicated { .. } => {
142                request.provide_value(PostgresErrorCode::DuplicateObject);
143            }
144        };
145    }
146
147    /// Construct a `not found` error.
148    pub fn not_found(object_type: &'static str, name: impl Into<String>) -> Self {
149        CatalogErrorInner::NotFound {
150            object_type,
151            name: name.into(),
152        }
153        .into()
154    }
155
156    /// Construct a `duplicated` error.
157    pub fn duplicated(object_type: &'static str, name: impl Into<String>) -> Self {
158        Self::duplicated_under_creation(object_type, name, false)
159    }
160
161    /// Construct a `duplicated` error with `under_creation` flag.
162    pub fn duplicated_under_creation(
163        object_type: &'static str,
164        name: impl Into<String>,
165        under_creation: bool,
166    ) -> Self {
167        CatalogErrorInner::Duplicated {
168            object_type,
169            name: name.into(),
170            under_creation,
171        }
172        .into()
173    }
174
175    /// Whether the error is a `duplicated` error for the given object type.
176    pub fn is_duplicated(&self, object_type: &'static str) -> bool {
177        matches!(
178            self.inner(),
179            CatalogErrorInner::Duplicated { object_type: t, .. } if *t == object_type
180        )
181    }
182
183    /// Whether the error is a `not found` error for the given object type.
184    pub fn is_not_found(&self, object_type: &'static str) -> bool {
185        matches!(
186            self.inner(),
187            CatalogErrorInner::NotFound { object_type: t, .. } if *t == object_type
188        )
189    }
190}
191
192impl From<CatalogError> for RwError {
193    fn from(e: CatalogError) -> Self {
194        ErrorCode::CatalogError(Box::new(e)).into()
195    }
196}
197
/// A trait for the catalog with owners, including relations (table, index, sink, etc.) and
/// function, connection.
///
/// This trait can be used to reduce code duplication and can be extended if needed in the future.
/// Implementors only need to provide [`OwnedByUserCatalog::owner`].
pub trait OwnedByUserCatalog {
    /// Returns the [`UserId`] of the user that owns this catalog.
    fn owner(&self) -> UserId;
}
206
/// Exposes the owner of a [`SinkCatalog`] through [`OwnedByUserCatalog`].
impl OwnedByUserCatalog for SinkCatalog {
    /// Returns the `user_id` stored in the sink catalog's `owner` field.
    fn owner(&self) -> UserId {
        self.owner.user_id
    }
}
212
/// A grant object paired with the ID of a user recorded as its owner.
// NOTE(review): presumably `owner` is the owner of `object`; confirm against the call sites.
pub struct OwnedGrantObject {
    /// ID of the owning user.
    pub owner: UserId,
    /// The protobuf grant object (`risingwave_pb::user::grant_privilege::Object`).
    pub object: PbGrantObject,
}