risingwave_frontend/catalog/system_catalog/
mod.rs1pub mod information_schema;
16pub mod pg_catalog;
17pub mod rw_catalog;
18
19use std::collections::HashMap;
20use std::sync::{Arc, LazyLock};
21
22use futures::stream::BoxStream;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::acl::AclMode;
26use risingwave_common::array::DataChunk;
27use risingwave_common::catalog::{
28 ColumnCatalog, ColumnDesc, DEFAULT_SUPER_USER_ID, Field, MAX_SYS_CATALOG_NUM,
29 SYS_CATALOG_START_ID, SysCatalogReader, TableDesc, TableId,
30};
31use risingwave_common::error::BoxedError;
32use risingwave_common::session_config::SessionConfig;
33use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
34use risingwave_common::types::DataType;
35use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
36use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
37use risingwave_pb::user::grant_privilege::Object as GrantObject;
38
39use crate::catalog::catalog_service::CatalogReader;
40use crate::catalog::view_catalog::ViewCatalog;
41use crate::meta_client::FrontendMetaClient;
42use crate::session::AuthContext;
43use crate::user::UserId;
44use crate::user::user_catalog::UserCatalog;
45use crate::user::user_privilege::available_prost_privilege;
46use crate::user::user_service::UserInfoReader;
47
48#[derive(Clone, Debug, PartialEq)]
49pub struct SystemTableCatalog {
50 pub id: TableId,
51
52 pub name: String,
53
54 pub columns: Vec<ColumnCatalog>,
56
57 pub pk: Vec<usize>,
59
60 pub owner: u32,
62
63 pub description: Option<String>,
65}
66
67impl SystemTableCatalog {
68 pub fn id(&self) -> TableId {
70 self.id
71 }
72
73 pub fn with_id(mut self, id: TableId) -> Self {
74 self.id = id;
75 self
76 }
77
78 pub fn columns(&self) -> &[ColumnCatalog] {
80 &self.columns
81 }
82
83 pub fn table_desc(&self) -> TableDesc {
85 TableDesc {
86 table_id: self.id,
87 columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
88 stream_key: self.pk.clone(),
89 ..Default::default()
90 }
91 }
92
93 pub fn name(&self) -> &str {
95 self.name.as_ref()
96 }
97}
98
99pub struct SysCatalogReaderImpl {
100 catalog_reader: CatalogReader,
102 user_info_reader: UserInfoReader,
104 meta_client: Arc<dyn FrontendMetaClient>,
106 auth_context: Arc<AuthContext>,
108 config: Arc<RwLock<SessionConfig>>,
110 system_params: SystemParamsReaderRef,
112}
113
114impl SysCatalogReaderImpl {
115 pub fn new(
116 catalog_reader: CatalogReader,
117 user_info_reader: UserInfoReader,
118 meta_client: Arc<dyn FrontendMetaClient>,
119 auth_context: Arc<AuthContext>,
120 config: Arc<RwLock<SessionConfig>>,
121 system_params: SystemParamsReaderRef,
122 ) -> Self {
123 Self {
124 catalog_reader,
125 user_info_reader,
126 meta_client,
127 auth_context,
128 config,
129 system_params,
130 }
131 }
132}
133
134pub struct BuiltinTable {
135 name: &'static str,
136 schema: &'static str,
137 columns: Vec<SystemCatalogColumnsDef<'static>>,
138 pk: &'static [usize],
139 function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxStream<'a, Result<DataChunk, BoxedError>>,
140}
141
142pub struct BuiltinView {
143 name: &'static str,
144 schema: &'static str,
145 columns: Vec<SystemCatalogColumnsDef<'static>>,
146 sql: String,
147}
148
149pub enum BuiltinCatalog {
150 Table(BuiltinTable),
151 View(BuiltinView),
152}
153
154impl BuiltinCatalog {
155 fn full_name(&self) -> String {
156 match self {
157 BuiltinCatalog::Table(t) => format!("{}.{}", t.schema, t.name),
158 BuiltinCatalog::View(t) => format!("{}.{}", t.schema, t.name),
159 }
160 }
161}
162
163impl From<&BuiltinTable> for SystemTableCatalog {
164 fn from(val: &BuiltinTable) -> Self {
165 SystemTableCatalog {
166 id: TableId::placeholder(),
167 name: val.name.to_owned(),
168 columns: val
169 .columns
170 .iter()
171 .enumerate()
172 .map(|(idx, (name, ty))| ColumnCatalog {
173 column_desc: ColumnDesc::named(*name, (idx as i32).into(), ty.clone()),
174 is_hidden: false,
175 })
176 .collect(),
177 pk: val.pk.to_vec(),
178 owner: DEFAULT_SUPER_USER_ID,
179 description: None,
180 }
181 }
182}
183
184impl From<&BuiltinView> for ViewCatalog {
185 fn from(val: &BuiltinView) -> Self {
186 ViewCatalog {
187 id: 0,
188 name: val.name.to_owned(),
189 schema_id: 0,
190 database_id: 0,
191 columns: val
192 .columns
193 .iter()
194 .map(|(name, ty)| Field::with_name(ty.clone(), name.to_string()))
195 .collect(),
196 sql: val.sql.clone(),
197 owner: DEFAULT_SUPER_USER_ID,
198 properties: Default::default(),
199 }
200 }
201}
202
203pub(super) type SystemCatalogColumnsDef<'a> = (&'a str, DataType);
205
206#[inline(always)]
211pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
212 format!(
213 "SELECT {} WHERE 1 != 1",
214 columns
215 .iter()
216 .map(|(name, ty)| format!("NULL::{} AS {}", ty, name))
217 .join(", ")
218 )
219}
220
221fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
222 match state
223 .parallelism
224 .as_ref()
225 .and_then(|parallelism| parallelism.parallelism.as_ref())
226 {
227 Some(PbParallelism::Auto(_)) | Some(PbParallelism::Adaptive(_)) => "adaptive".to_owned(),
228 Some(PbParallelism::Fixed(PbFixedParallelism { parallelism })) => {
229 format!("fixed({parallelism})")
230 }
231 Some(PbParallelism::Custom(_)) => "custom".to_owned(),
232 None => "unknown".to_owned(),
233 }
234}
235
236fn get_acl_items(
238 object: &GrantObject,
239 for_dml_table: bool,
240 users: &Vec<UserCatalog>,
241 username_map: &HashMap<UserId, String>,
242) -> Vec<String> {
243 let mut res = vec![];
244 let super_privilege = available_prost_privilege(*object, for_dml_table);
245 for user in users {
246 let privileges = if user.is_super {
247 vec![&super_privilege]
248 } else {
249 user.grant_privileges
250 .iter()
251 .filter(|&privilege| privilege.object.as_ref().unwrap() == object)
252 .collect_vec()
253 };
254 if privileges.is_empty() {
255 continue;
256 };
257 let mut grantor_map = HashMap::new();
258 privileges.iter().for_each(|&privilege| {
259 privilege.action_with_opts.iter().for_each(|ao| {
260 grantor_map
261 .entry(ao.granted_by)
262 .or_insert_with(Vec::new)
263 .push((ao.get_action().unwrap(), ao.with_grant_option));
264 })
265 });
266 for (granted_by, actions) in grantor_map {
267 let mut aclitem = String::new();
268 aclitem.push_str(&user.name);
269 aclitem.push('=');
270 for (action, option) in actions {
271 aclitem.push_str(&AclMode::from(action).to_string());
272 if option {
273 aclitem.push('*');
274 }
275 }
276 aclitem.push('/');
277 aclitem.push_str(username_map.get(&granted_by).unwrap());
279 res.push(aclitem);
280 }
281 }
282 res
283}
284
285pub struct SystemCatalog {
286 catalogs: Vec<BuiltinCatalog>,
288}
289
290pub fn get_sys_tables_in_schema(schema_name: &str) -> Vec<Arc<SystemTableCatalog>> {
291 SYS_CATALOGS
292 .catalogs
293 .iter()
294 .enumerate()
295 .filter_map(|(idx, c)| match c {
296 BuiltinCatalog::Table(t) if t.schema == schema_name => Some(Arc::new(
297 SystemTableCatalog::from(t)
298 .with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
299 )),
300 _ => None,
301 })
302 .collect()
303}
304
305pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<ViewCatalog> {
306 SYS_CATALOGS
307 .catalogs
308 .iter()
309 .enumerate()
310 .filter_map(|(idx, c)| match c {
311 BuiltinCatalog::View(v) if v.schema == schema_name => {
312 Some(ViewCatalog::from(v).with_id(idx as u32 + SYS_CATALOG_START_ID as u32))
313 }
314 _ => None,
315 })
316 .collect()
317}
318
319pub fn is_system_catalog(oid: u32) -> bool {
320 oid >= SYS_CATALOG_START_ID as u32
321}
322
323pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
325 tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
326 assert!(SYS_CATALOGS_SLICE.len() <= MAX_SYS_CATALOG_NUM as usize);
327 let catalogs = SYS_CATALOGS_SLICE
328 .iter()
329 .map(|f| f())
330 .sorted_by_key(|c| c.full_name())
331 .collect();
332 SystemCatalog { catalogs }
333});
334
335#[linkme::distributed_slice]
336pub static SYS_CATALOGS_SLICE: [fn() -> BuiltinCatalog];
337
338impl SysCatalogReader for SysCatalogReaderImpl {
339 fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>> {
340 let table_name = SYS_CATALOGS
341 .catalogs
342 .get((table_id.table_id - SYS_CATALOG_START_ID as u32) as usize)
343 .unwrap();
344 match table_name {
345 BuiltinCatalog::Table(t) => (t.function)(self),
346 BuiltinCatalog::View(_) => panic!("read_table should not be called on a view"),
347 }
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use crate::catalog::system_catalog::SYS_CATALOGS;
354 use crate::test_utils::LocalFrontend;
355
356 #[tokio::test]
357 async fn test_builtin_view_definition() {
358 let frontend = LocalFrontend::new(Default::default()).await;
359 let sqls = SYS_CATALOGS.catalogs.iter().filter_map(|c| match c {
360 super::BuiltinCatalog::View(v) => Some(v.sql.clone()),
361 _ => None,
362 });
363 for sql in sqls {
364 frontend.query_formatted_result(sql).await;
365 }
366 }
367}