risingwave_frontend/catalog/system_catalog/
mod.rs1pub mod information_schema;
16pub mod pg_catalog;
17pub mod rw_catalog;
18
19use std::collections::HashMap;
20use std::sync::{Arc, LazyLock};
21
22use futures::stream::BoxStream;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::acl::AclMode;
26use risingwave_common::array::DataChunk;
27use risingwave_common::catalog::{
28 ColumnCatalog, ColumnDesc, DEFAULT_SUPER_USER_ID, Field, MAX_SYS_CATALOG_NUM,
29 SYS_CATALOG_START_ID, SysCatalogReader, TableId,
30};
31use risingwave_common::error::BoxedError;
32use risingwave_common::session_config::SessionConfig;
33use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
34use risingwave_common::types::DataType;
35use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
36use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
37use risingwave_pb::user::grant_privilege::Object as GrantObject;
38
39use crate::catalog::catalog_service::CatalogReader;
40use crate::catalog::view_catalog::ViewCatalog;
41use crate::meta_client::FrontendMetaClient;
42use crate::session::AuthContext;
43use crate::user::UserId;
44use crate::user::user_catalog::UserCatalog;
45use crate::user::user_privilege::available_prost_privilege;
46use crate::user::user_service::UserInfoReader;
47
48#[derive(Clone, Debug, PartialEq, Hash)]
49pub struct SystemTableCatalog {
50 pub id: TableId,
51
52 pub name: String,
53
54 pub columns: Vec<ColumnCatalog>,
56
57 pub pk: Vec<usize>,
59
60 pub owner: u32,
62
63 pub description: Option<String>,
65}
66
67impl SystemTableCatalog {
68 pub fn id(&self) -> TableId {
70 self.id
71 }
72
73 pub fn with_id(mut self, id: TableId) -> Self {
74 self.id = id;
75 self
76 }
77
78 pub fn columns(&self) -> &[ColumnCatalog] {
80 &self.columns
81 }
82
83 pub fn name(&self) -> &str {
85 self.name.as_ref()
86 }
87}
88
89pub struct SysCatalogReaderImpl {
90 catalog_reader: CatalogReader,
92 user_info_reader: UserInfoReader,
94 meta_client: Arc<dyn FrontendMetaClient>,
96 auth_context: Arc<AuthContext>,
98 config: Arc<RwLock<SessionConfig>>,
100 system_params: SystemParamsReaderRef,
102}
103
104impl SysCatalogReaderImpl {
105 pub fn new(
106 catalog_reader: CatalogReader,
107 user_info_reader: UserInfoReader,
108 meta_client: Arc<dyn FrontendMetaClient>,
109 auth_context: Arc<AuthContext>,
110 config: Arc<RwLock<SessionConfig>>,
111 system_params: SystemParamsReaderRef,
112 ) -> Self {
113 Self {
114 catalog_reader,
115 user_info_reader,
116 meta_client,
117 auth_context,
118 config,
119 system_params,
120 }
121 }
122}
123
124pub struct BuiltinTable {
125 name: &'static str,
126 schema: &'static str,
127 columns: Vec<SystemCatalogColumnsDef<'static>>,
128 pk: &'static [usize],
129 function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxStream<'a, Result<DataChunk, BoxedError>>,
130}
131
132pub struct BuiltinView {
133 name: &'static str,
134 schema: &'static str,
135 columns: Vec<SystemCatalogColumnsDef<'static>>,
136 sql: String,
137}
138
139pub enum BuiltinCatalog {
140 Table(BuiltinTable),
141 View(BuiltinView),
142}
143
144impl BuiltinCatalog {
145 fn full_name(&self) -> String {
146 match self {
147 BuiltinCatalog::Table(t) => format!("{}.{}", t.schema, t.name),
148 BuiltinCatalog::View(t) => format!("{}.{}", t.schema, t.name),
149 }
150 }
151}
152
153impl From<&BuiltinTable> for SystemTableCatalog {
154 fn from(val: &BuiltinTable) -> Self {
155 SystemTableCatalog {
156 id: TableId::placeholder(),
157 name: val.name.to_owned(),
158 columns: val
159 .columns
160 .iter()
161 .enumerate()
162 .map(|(idx, (name, ty))| ColumnCatalog {
163 column_desc: ColumnDesc::named(*name, (idx as i32).into(), ty.clone()),
164 is_hidden: false,
165 })
166 .collect(),
167 pk: val.pk.to_vec(),
168 owner: DEFAULT_SUPER_USER_ID,
169 description: None,
170 }
171 }
172}
173
174impl From<&BuiltinView> for ViewCatalog {
175 fn from(val: &BuiltinView) -> Self {
176 ViewCatalog {
177 id: 0,
178 name: val.name.to_owned(),
179 schema_id: 0,
180 database_id: 0,
181 columns: val
182 .columns
183 .iter()
184 .map(|(name, ty)| Field::with_name(ty.clone(), name.to_string()))
185 .collect(),
186 sql: val.sql.clone(),
187 owner: DEFAULT_SUPER_USER_ID,
188 properties: Default::default(),
189 }
190 }
191}
192
193pub(super) type SystemCatalogColumnsDef<'a> = (&'a str, DataType);
195
196#[inline(always)]
201pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
202 format!(
203 "SELECT {} WHERE 1 != 1",
204 columns
205 .iter()
206 .map(|(name, ty)| format!("NULL::{} AS {}", ty, name))
207 .join(", ")
208 )
209}
210
211fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
212 match state
213 .parallelism
214 .as_ref()
215 .and_then(|parallelism| parallelism.parallelism.as_ref())
216 {
217 Some(PbParallelism::Auto(_)) | Some(PbParallelism::Adaptive(_)) => "adaptive".to_owned(),
218 Some(PbParallelism::Fixed(PbFixedParallelism { parallelism })) => {
219 format!("fixed({parallelism})")
220 }
221 Some(PbParallelism::Custom(_)) => "custom".to_owned(),
222 None => "unknown".to_owned(),
223 }
224}
225
226fn get_acl_items(
228 object: &GrantObject,
229 for_dml_table: bool,
230 users: &Vec<UserCatalog>,
231 username_map: &HashMap<UserId, String>,
232) -> Vec<String> {
233 let mut res = vec![];
234 let super_privilege = available_prost_privilege(*object, for_dml_table);
235 for user in users {
236 let privileges = if user.is_super {
237 vec![&super_privilege]
238 } else {
239 user.grant_privileges
240 .iter()
241 .filter(|&privilege| privilege.object.as_ref().unwrap() == object)
242 .collect_vec()
243 };
244 if privileges.is_empty() {
245 continue;
246 };
247 let mut grantor_map = HashMap::new();
248 privileges.iter().for_each(|&privilege| {
249 privilege.action_with_opts.iter().for_each(|ao| {
250 grantor_map
251 .entry(ao.granted_by)
252 .or_insert_with(Vec::new)
253 .push((ao.get_action().unwrap(), ao.with_grant_option));
254 })
255 });
256 for (granted_by, actions) in grantor_map {
257 let mut aclitem = String::new();
258 aclitem.push_str(&user.name);
259 aclitem.push('=');
260 for (action, option) in actions {
261 aclitem.push_str(&AclMode::from(action).to_string());
262 if option {
263 aclitem.push('*');
264 }
265 }
266 aclitem.push('/');
267 aclitem.push_str(username_map.get(&granted_by).unwrap());
269 res.push(aclitem);
270 }
271 }
272 res
273}
274
275pub struct SystemCatalog {
276 catalogs: Vec<BuiltinCatalog>,
278}
279
280pub fn get_sys_tables_in_schema(schema_name: &str) -> Vec<Arc<SystemTableCatalog>> {
281 SYS_CATALOGS
282 .catalogs
283 .iter()
284 .enumerate()
285 .filter_map(|(idx, c)| match c {
286 BuiltinCatalog::Table(t) if t.schema == schema_name => Some(Arc::new(
287 SystemTableCatalog::from(t)
288 .with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
289 )),
290 _ => None,
291 })
292 .collect()
293}
294
295pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<ViewCatalog> {
296 SYS_CATALOGS
297 .catalogs
298 .iter()
299 .enumerate()
300 .filter_map(|(idx, c)| match c {
301 BuiltinCatalog::View(v) if v.schema == schema_name => {
302 Some(ViewCatalog::from(v).with_id(idx as u32 + SYS_CATALOG_START_ID as u32))
303 }
304 _ => None,
305 })
306 .collect()
307}
308
309pub fn is_system_catalog(oid: u32) -> bool {
310 oid >= SYS_CATALOG_START_ID as u32
311}
312
313pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
315 tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
316 assert!(SYS_CATALOGS_SLICE.len() <= MAX_SYS_CATALOG_NUM as usize);
317 let catalogs = SYS_CATALOGS_SLICE
318 .iter()
319 .map(|f| f())
320 .sorted_by_key(|c| c.full_name())
321 .collect();
322 SystemCatalog { catalogs }
323});
324
325#[linkme::distributed_slice]
326pub static SYS_CATALOGS_SLICE: [fn() -> BuiltinCatalog];
327
328impl SysCatalogReader for SysCatalogReaderImpl {
329 fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>> {
330 let table_name = SYS_CATALOGS
331 .catalogs
332 .get((table_id.table_id - SYS_CATALOG_START_ID as u32) as usize)
333 .unwrap();
334 match table_name {
335 BuiltinCatalog::Table(t) => (t.function)(self),
336 BuiltinCatalog::View(_) => panic!("read_table should not be called on a view"),
337 }
338 }
339}
340
341#[cfg(test)]
342mod tests {
343 use crate::catalog::system_catalog::SYS_CATALOGS;
344 use crate::test_utils::LocalFrontend;
345
346 #[tokio::test]
347 async fn test_builtin_view_definition() {
348 let frontend = LocalFrontend::new(Default::default()).await;
349 let sqls = SYS_CATALOGS.catalogs.iter().filter_map(|c| match c {
350 super::BuiltinCatalog::View(v) => Some(v.sql.clone()),
351 _ => None,
352 });
353 for sql in sqls {
354 frontend.query_formatted_result(sql).await;
355 }
356 }
357}