risingwave_frontend/catalog/
mod.rs1use risingwave_common::catalog::{
22 ROW_ID_COLUMN_NAME, RW_RESERVED_COLUMN_NAME_PREFIX, is_system_schema,
23};
24use risingwave_common::error::code::PostgresErrorCode;
25use risingwave_connector::sink::catalog::SinkCatalog;
26use risingwave_pb::user::grant_privilege::Object as PbGrantObject;
27use thiserror::Error;
28
29use crate::error::{ErrorCode, Result, RwError};
30pub(crate) mod catalog_service;
31pub mod purify;
32
33pub(crate) mod connection_catalog;
34pub(crate) mod database_catalog;
35pub(crate) mod function_catalog;
36pub(crate) mod index_catalog;
37pub(crate) mod root_catalog;
38pub(crate) mod schema_catalog;
39pub(crate) mod source_catalog;
40pub(crate) mod subscription_catalog;
41pub(crate) mod system_catalog;
42pub(crate) mod table_catalog;
43pub(crate) mod view_catalog;
44
45pub(crate) mod secret_catalog;
46
47pub(crate) use catalog_service::CatalogReader;
48pub use index_catalog::IndexCatalog;
49pub use table_catalog::TableCatalog;
50
51use crate::user::UserId;
52
53pub(crate) type ConnectionId = risingwave_common::id::ConnectionId;
54pub(crate) type SourceId = risingwave_common::id::SourceId;
55pub(crate) type SinkId = risingwave_common::id::SinkId;
56pub(crate) type SubscriptionId = risingwave_common::id::SubscriptionId;
57pub(crate) type ViewId = risingwave_common::id::ViewId;
58pub(crate) type DatabaseId = risingwave_common::catalog::DatabaseId;
59pub(crate) type SchemaId = risingwave_common::catalog::SchemaId;
60pub(crate) type TableId = risingwave_common::catalog::TableId;
61pub(crate) type ColumnId = risingwave_common::catalog::ColumnId;
62pub(crate) type FragmentId = risingwave_common::id::FragmentId;
63pub(crate) type SecretId = risingwave_common::catalog::SecretId;
64
65pub fn check_column_name_not_reserved(column_name: &str) -> Result<()> {
67 if column_name.starts_with(ROW_ID_COLUMN_NAME) {
68 return Err(ErrorCode::InternalError(format!(
69 "column name prefixed with {:?} are reserved word.",
70 ROW_ID_COLUMN_NAME
71 ))
72 .into());
73 }
74
75 if column_name.starts_with(RW_RESERVED_COLUMN_NAME_PREFIX) {
76 return Err(ErrorCode::InternalError(format!(
77 "column name prefixed with {:?} are reserved word.",
78 RW_RESERVED_COLUMN_NAME_PREFIX
79 ))
80 .into());
81 }
82
83 if ["tableoid", "xmin", "cmin", "xmax", "cmax", "ctid"].contains(&column_name) {
84 return Err(ErrorCode::InvalidInputSyntax(format!(
85 "column name \"{column_name}\" conflicts with a system column name"
86 ))
87 .into());
88 }
89
90 Ok(())
91}
92
93pub fn check_schema_writable(schema: &str) -> Result<()> {
95 if is_system_schema(schema) {
96 Err(ErrorCode::ProtocolError(format!(
97 "permission denied to write on \"{}\", System catalog modifications are currently disallowed.",
98 schema
99 )).into())
100 } else {
101 Ok(())
102 }
103}
104
105pub type CatalogResult<T> = std::result::Result<T, CatalogError>;
106
107#[derive(Error, Debug, thiserror_ext::Box)]
109#[thiserror_ext(newtype(name = CatalogError, extra_provide = Self::provide_postgres_error_code))]
110pub enum CatalogErrorInner {
111 #[error("{object_type} not found: {name}")]
112 NotFound {
113 object_type: &'static str,
114 name: String,
115 },
116
117 #[error(
118 "{object_type} with name {name} exists{}",
119 if *.under_creation { " but under creation" } else { "" },
120 )]
121 Duplicated {
122 object_type: &'static str,
123 name: String,
124 under_creation: bool, },
126}
127
128impl CatalogError {
129 fn provide_postgres_error_code(&self, request: &mut std::error::Request<'_>) {
131 match self.inner() {
132 CatalogErrorInner::NotFound { .. } => {
133 request.provide_value(PostgresErrorCode::UndefinedObject);
134 }
135 CatalogErrorInner::Duplicated { .. } => {
136 request.provide_value(PostgresErrorCode::DuplicateObject);
137 }
138 };
139 }
140
141 pub fn not_found(object_type: &'static str, name: impl Into<String>) -> Self {
143 CatalogErrorInner::NotFound {
144 object_type,
145 name: name.into(),
146 }
147 .into()
148 }
149
150 pub fn duplicated(object_type: &'static str, name: impl Into<String>) -> Self {
152 Self::duplicated_under_creation(object_type, name, false)
153 }
154
155 pub fn duplicated_under_creation(
157 object_type: &'static str,
158 name: impl Into<String>,
159 under_creation: bool,
160 ) -> Self {
161 CatalogErrorInner::Duplicated {
162 object_type,
163 name: name.into(),
164 under_creation,
165 }
166 .into()
167 }
168
169 pub fn is_duplicated(&self, object_type: &'static str) -> bool {
171 matches!(
172 self.inner(),
173 CatalogErrorInner::Duplicated { object_type: t, .. } if *t == object_type
174 )
175 }
176
177 pub fn is_not_found(&self, object_type: &'static str) -> bool {
179 matches!(
180 self.inner(),
181 CatalogErrorInner::NotFound { object_type: t, .. } if *t == object_type
182 )
183 }
184}
185
186impl From<CatalogError> for RwError {
187 fn from(e: CatalogError) -> Self {
188 ErrorCode::CatalogError(Box::new(e)).into()
189 }
190}
191
192pub trait OwnedByUserCatalog {
197 fn owner(&self) -> UserId;
199}
200
201impl OwnedByUserCatalog for SinkCatalog {
202 fn owner(&self) -> UserId {
203 self.owner.user_id
204 }
205}
206
207pub struct OwnedGrantObject {
208 pub owner: UserId,
209 pub object: PbGrantObject,
210}