risingwave_frontend/catalog/system_catalog/
mod.rs1pub mod information_schema;
16pub mod pg_catalog;
17pub mod rw_catalog;
18
19use std::collections::HashMap;
20use std::sync::{Arc, LazyLock};
21
22use futures::stream::BoxStream;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::acl::AclMode;
26use risingwave_common::array::DataChunk;
27use risingwave_common::catalog::{
28 ColumnCatalog, ColumnDesc, DEFAULT_SUPER_USER_ID, Field, MAX_SYS_CATALOG_NUM,
29 SYS_CATALOG_START_ID, SysCatalogReader, TableId,
30};
31use risingwave_common::error::BoxedError;
32use risingwave_common::id::ObjectId;
33use risingwave_common::session_config::SessionConfig;
34use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
35use risingwave_common::types::DataType;
36use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
37use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
38use risingwave_pb::user::grant_privilege::Object as GrantObject;
39
40use crate::catalog::catalog_service::CatalogReader;
41use crate::catalog::view_catalog::ViewCatalog;
42use crate::meta_client::FrontendMetaClient;
43use crate::session::AuthContext;
44use crate::user::UserId;
45use crate::user::user_catalog::UserCatalog;
46use crate::user::user_privilege::available_prost_privilege;
47use crate::user::user_service::UserInfoReader;
48
/// Catalog entry describing one system table.
///
/// Instances for built-in tables are produced by the `From<&BuiltinTable>`
/// conversion in this module and then given their final id via
/// [`SystemTableCatalog::with_id`].
#[derive(Clone, Debug, PartialEq, Hash)]
pub struct SystemTableCatalog {
    /// Identifier of the table.
    pub id: TableId,

    /// Name of the table.
    pub name: String,

    /// Column definitions of the table.
    pub columns: Vec<ColumnCatalog>,

    /// Primary key columns — presumably positions into `columns`; TODO confirm.
    pub pk: Vec<usize>,

    /// Id of the owning user. Built-in tables are created with
    /// `DEFAULT_SUPER_USER_ID` by the `From<&BuiltinTable>` conversion.
    pub owner: u32,

    /// Optional description of the table; `None` for freshly converted
    /// built-in tables.
    pub description: Option<String>,
}
67
68impl SystemTableCatalog {
69 pub fn id(&self) -> TableId {
71 self.id
72 }
73
74 pub fn with_id(mut self, id: TableId) -> Self {
75 self.id = id;
76 self
77 }
78
79 pub fn columns(&self) -> &[ColumnCatalog] {
81 &self.columns
82 }
83
84 pub fn name(&self) -> &str {
86 self.name.as_ref()
87 }
88}
89
/// Reader that serves the contents of system tables.
///
/// It bundles the handles that the per-table functions (see
/// `BuiltinTable::function`) receive when `read_table` is invoked.
pub struct SysCatalogReaderImpl {
    /// Handle to the catalog.
    catalog_reader: CatalogReader,
    /// Handle to user information.
    user_info_reader: UserInfoReader,
    /// Shared client for talking to the meta service.
    meta_client: Arc<dyn FrontendMetaClient>,
    /// Authentication context — presumably of the session issuing the read;
    /// TODO confirm against callers of `new`.
    auth_context: Arc<AuthContext>,
    /// Session configuration, shared behind a read-write lock.
    config: Arc<RwLock<SessionConfig>>,
    /// Handle to the system parameters.
    system_params: SystemParamsReaderRef,
}
104
105impl SysCatalogReaderImpl {
106 pub fn new(
107 catalog_reader: CatalogReader,
108 user_info_reader: UserInfoReader,
109 meta_client: Arc<dyn FrontendMetaClient>,
110 auth_context: Arc<AuthContext>,
111 config: Arc<RwLock<SessionConfig>>,
112 system_params: SystemParamsReaderRef,
113 ) -> Self {
114 Self {
115 catalog_reader,
116 user_info_reader,
117 meta_client,
118 auth_context,
119 config,
120 system_params,
121 }
122 }
123}
124
/// Static definition of a built-in system table.
pub struct BuiltinTable {
    /// Table name, without the schema.
    name: &'static str,
    /// Name of the schema the table belongs to.
    schema: &'static str,
    /// `(column name, data type)` pairs, in column order.
    columns: Vec<SystemCatalogColumnsDef<'static>>,
    /// Primary key columns — presumably positions into `columns`; TODO confirm.
    pk: &'static [usize],
    /// Produces the table's contents as a stream of data chunks, given the
    /// reader; this is what `read_table` invokes.
    function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxStream<'a, Result<DataChunk, BoxedError>>,
}
132
/// Static definition of a built-in system view.
pub struct BuiltinView {
    /// View name, without the schema.
    name: &'static str,
    /// Name of the schema the view belongs to.
    schema: &'static str,
    /// `(column name, data type)` pairs, in column order.
    columns: Vec<SystemCatalogColumnsDef<'static>>,
    /// SQL text defining the view.
    sql: String,
}
139
/// A built-in system catalog entry: either a table or a view.
pub enum BuiltinCatalog {
    /// A system table whose rows are produced by a Rust function.
    Table(BuiltinTable),
    /// A system view defined by a SQL statement.
    View(BuiltinView),
}
144
145impl BuiltinCatalog {
146 fn full_name(&self) -> String {
147 match self {
148 BuiltinCatalog::Table(t) => format!("{}.{}", t.schema, t.name),
149 BuiltinCatalog::View(t) => format!("{}.{}", t.schema, t.name),
150 }
151 }
152}
153
154impl From<&BuiltinTable> for SystemTableCatalog {
155 fn from(val: &BuiltinTable) -> Self {
156 SystemTableCatalog {
157 id: TableId::placeholder(),
158 name: val.name.to_owned(),
159 columns: val
160 .columns
161 .iter()
162 .enumerate()
163 .map(|(idx, (name, ty))| ColumnCatalog {
164 column_desc: ColumnDesc::named(*name, (idx as i32).into(), ty.clone()),
165 is_hidden: false,
166 })
167 .collect(),
168 pk: val.pk.to_vec(),
169 owner: DEFAULT_SUPER_USER_ID,
170 description: None,
171 }
172 }
173}
174
175impl From<&BuiltinView> for ViewCatalog {
176 fn from(val: &BuiltinView) -> Self {
177 ViewCatalog {
178 id: 0.into(),
179 name: val.name.to_owned(),
180 schema_id: 0.into(),
181 database_id: 0.into(),
182 columns: val
183 .columns
184 .iter()
185 .map(|(name, ty)| Field::with_name(ty.clone(), name.to_string()))
186 .collect(),
187 sql: val.sql.clone(),
188 owner: DEFAULT_SUPER_USER_ID,
189 properties: Default::default(),
190 created_at_epoch: None,
191 created_at_cluster_version: None,
192 }
193 }
194}
195
/// Definition of one system catalog column: `(column name, data type)`.
pub(super) type SystemCatalogColumnsDef<'a> = (&'a str, DataType);
198
199#[inline(always)]
204pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
205 format!(
206 "SELECT {} WHERE 1 != 1",
207 columns
208 .iter()
209 .map(|(name, ty)| format!("NULL::{} AS {}", ty, name))
210 .join(", ")
211 )
212}
213
214fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
215 match state
216 .parallelism
217 .as_ref()
218 .and_then(|parallelism| parallelism.parallelism.as_ref())
219 {
220 Some(PbParallelism::Auto(_)) | Some(PbParallelism::Adaptive(_)) => "adaptive".to_owned(),
221 Some(PbParallelism::Fixed(PbFixedParallelism { parallelism })) => {
222 format!("fixed({parallelism})")
223 }
224 Some(PbParallelism::Custom(_)) => "custom".to_owned(),
225 None => "unknown".to_owned(),
226 }
227}
228
229fn get_acl_items(
231 object: impl Into<GrantObject>,
232 for_dml_table: bool,
233 users: &Vec<UserCatalog>,
234 username_map: &HashMap<UserId, String>,
235) -> Vec<String> {
236 let object = object.into();
237 let mut res = vec![];
238 let super_privilege = available_prost_privilege(object, for_dml_table);
239 for user in users {
240 let privileges = if user.is_super {
241 vec![&super_privilege]
242 } else {
243 user.grant_privileges
244 .iter()
245 .filter(|&privilege| privilege.object.as_ref().unwrap() == &object)
246 .collect_vec()
247 };
248 if privileges.is_empty() {
249 continue;
250 };
251 let mut grantor_map = HashMap::new();
252 privileges.iter().for_each(|&privilege| {
253 privilege.action_with_opts.iter().for_each(|ao| {
254 grantor_map
255 .entry(ao.granted_by)
256 .or_insert_with(Vec::new)
257 .push((ao.get_action().unwrap(), ao.with_grant_option));
258 })
259 });
260 for (granted_by, actions) in grantor_map {
261 let mut aclitem = String::new();
262 aclitem.push_str(&user.name);
263 aclitem.push('=');
264 for (action, option) in actions {
265 aclitem.push_str(&AclMode::from(action).to_string());
266 if option {
267 aclitem.push('*');
268 }
269 }
270 aclitem.push('/');
271 aclitem.push_str(username_map.get(&granted_by).unwrap());
273 res.push(aclitem);
274 }
275 }
276 res
277}
278
/// The full set of built-in system catalogs.
pub struct SystemCatalog {
    /// All built-in tables and views. An entry's position in this vector,
    /// offset by `SYS_CATALOG_START_ID`, is used as its catalog id (see
    /// `get_sys_tables_in_schema`, `get_sys_views_in_schema` and `read_table`).
    catalogs: Vec<BuiltinCatalog>,
}
283
284pub fn get_sys_tables_in_schema(schema_name: &str) -> Vec<Arc<SystemTableCatalog>> {
285 SYS_CATALOGS
286 .catalogs
287 .iter()
288 .enumerate()
289 .filter_map(|(idx, c)| match c {
290 BuiltinCatalog::Table(t) if t.schema == schema_name => Some(Arc::new(
291 SystemTableCatalog::from(t)
292 .with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
293 )),
294 _ => None,
295 })
296 .collect()
297}
298
299pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<ViewCatalog> {
300 SYS_CATALOGS
301 .catalogs
302 .iter()
303 .enumerate()
304 .filter_map(|(idx, c)| match c {
305 BuiltinCatalog::View(v) if v.schema == schema_name => Some(
306 ViewCatalog::from(v).with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
307 ),
308 _ => None,
309 })
310 .collect()
311}
312
313pub fn is_system_catalog(oid: ObjectId) -> bool {
314 oid.as_raw_id() >= SYS_CATALOG_START_ID as u32
315}
316
317pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
319 tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
320 assert!(SYS_CATALOGS_SLICE.len() <= MAX_SYS_CATALOG_NUM as usize);
321 let catalogs = SYS_CATALOGS_SLICE
322 .iter()
323 .map(|f| f())
324 .sorted_by_key(|c| c.full_name())
325 .collect();
326 SystemCatalog { catalogs }
327});
328
/// Registry of constructors for built-in catalogs.
///
/// Declared as a `linkme` distributed slice; the elements are contributed
/// from outside this declaration. `SYS_CATALOGS` calls every constructor once
/// to build the catalog list.
#[linkme::distributed_slice]
pub static SYS_CATALOGS_SLICE: [fn() -> BuiltinCatalog];
331
332impl SysCatalogReader for SysCatalogReaderImpl {
333 fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>> {
334 let table_name = SYS_CATALOGS
335 .catalogs
336 .get((table_id.as_raw_id() - SYS_CATALOG_START_ID as u32) as usize)
337 .unwrap();
338 match table_name {
339 BuiltinCatalog::Table(t) => (t.function)(self),
340 BuiltinCatalog::View(_) => panic!("read_table should not be called on a view"),
341 }
342 }
343}
344
#[cfg(test)]
mod tests {
    use super::BuiltinCatalog;
    use crate::catalog::system_catalog::SYS_CATALOGS;
    use crate::test_utils::LocalFrontend;

    /// Runs the defining SQL of every built-in view against a local frontend,
    /// in catalog order.
    #[tokio::test]
    async fn test_builtin_view_definition() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for catalog in &SYS_CATALOGS.catalogs {
            if let BuiltinCatalog::View(view) = catalog {
                frontend.query_formatted_result(view.sql.clone()).await;
            }
        }
    }
}