risingwave_frontend/catalog/system_catalog/
mod.rs1pub mod information_schema;
16pub mod pg_catalog;
17pub mod rw_catalog;
18
19use std::collections::HashMap;
20use std::sync::{Arc, LazyLock};
21
22use futures::stream::BoxStream;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::acl::AclMode;
26use risingwave_common::array::DataChunk;
27use risingwave_common::catalog::{
28 ColumnCatalog, ColumnDesc, DEFAULT_SUPER_USER_ID, Field, MAX_SYS_CATALOG_NUM,
29 SYS_CATALOG_START_ID, SysCatalogReader, TableId,
30};
31use risingwave_common::error::BoxedError;
32use risingwave_common::id::ObjectId;
33use risingwave_common::metrics_reader::MetricsReader;
34use risingwave_common::session_config::SessionConfig;
35use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
36use risingwave_common::types::DataType;
37use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
38use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
39use risingwave_pb::user::grant_privilege::Object as GrantObject;
40
41use crate::catalog::catalog_service::CatalogReader;
42use crate::catalog::view_catalog::ViewCatalog;
43use crate::meta_client::FrontendMetaClient;
44use crate::session::AuthContext;
45use crate::user::UserId;
46use crate::user::user_catalog::UserCatalog;
47use crate::user::user_privilege::available_prost_privilege;
48use crate::user::user_service::UserInfoReader;
49
50#[derive(Clone, Debug, PartialEq, Hash)]
51pub struct SystemTableCatalog {
52 pub id: TableId,
53
54 pub name: String,
55
56 pub columns: Vec<ColumnCatalog>,
58
59 pub pk: Vec<usize>,
61
62 pub owner: UserId,
64
65 pub description: Option<String>,
67}
68
69impl SystemTableCatalog {
70 pub fn id(&self) -> TableId {
72 self.id
73 }
74
75 pub fn with_id(mut self, id: TableId) -> Self {
76 self.id = id;
77 self
78 }
79
80 pub fn columns(&self) -> &[ColumnCatalog] {
82 &self.columns
83 }
84
85 pub fn name(&self) -> &str {
87 self.name.as_ref()
88 }
89}
90
91pub struct SysCatalogReaderImpl {
92 catalog_reader: CatalogReader,
94 user_info_reader: UserInfoReader,
96 meta_client: Arc<dyn FrontendMetaClient>,
98 auth_context: Arc<AuthContext>,
100 config: Arc<RwLock<SessionConfig>>,
102 system_params: SystemParamsReaderRef,
104 pub(super) metrics_reader: Arc<dyn MetricsReader>,
106}
107
108impl SysCatalogReaderImpl {
109 pub fn new(
110 catalog_reader: CatalogReader,
111 user_info_reader: UserInfoReader,
112 meta_client: Arc<dyn FrontendMetaClient>,
113 auth_context: Arc<AuthContext>,
114 config: Arc<RwLock<SessionConfig>>,
115 system_params: SystemParamsReaderRef,
116 metrics_reader: Arc<dyn MetricsReader>,
117 ) -> Self {
118 Self {
119 catalog_reader,
120 user_info_reader,
121 meta_client,
122 auth_context,
123 config,
124 system_params,
125 metrics_reader,
126 }
127 }
128}
129
130pub struct BuiltinTable {
131 name: &'static str,
132 schema: &'static str,
133 columns: Vec<SystemCatalogColumnsDef<'static>>,
134 pk: &'static [usize],
135 function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxStream<'a, Result<DataChunk, BoxedError>>,
136}
137
138pub struct BuiltinView {
139 name: &'static str,
140 schema: &'static str,
141 columns: Vec<SystemCatalogColumnsDef<'static>>,
142 sql: String,
143}
144
145pub enum BuiltinCatalog {
146 Table(BuiltinTable),
147 View(BuiltinView),
148}
149
150impl BuiltinCatalog {
151 fn full_name(&self) -> String {
152 match self {
153 BuiltinCatalog::Table(t) => format!("{}.{}", t.schema, t.name),
154 BuiltinCatalog::View(t) => format!("{}.{}", t.schema, t.name),
155 }
156 }
157}
158
159impl From<&BuiltinTable> for SystemTableCatalog {
160 fn from(val: &BuiltinTable) -> Self {
161 SystemTableCatalog {
162 id: TableId::placeholder(),
163 name: val.name.to_owned(),
164 columns: val
165 .columns
166 .iter()
167 .enumerate()
168 .map(|(idx, (name, ty))| ColumnCatalog {
169 column_desc: ColumnDesc::named(*name, (idx as i32).into(), ty.clone()),
170 is_hidden: false,
171 })
172 .collect(),
173 pk: val.pk.to_vec(),
174 owner: DEFAULT_SUPER_USER_ID,
175 description: None,
176 }
177 }
178}
179
180impl From<&BuiltinView> for ViewCatalog {
181 fn from(val: &BuiltinView) -> Self {
182 ViewCatalog {
183 id: 0.into(),
184 name: val.name.to_owned(),
185 schema_id: 0.into(),
186 database_id: 0.into(),
187 columns: val
188 .columns
189 .iter()
190 .map(|(name, ty)| Field::with_name(ty.clone(), name.to_string()))
191 .collect(),
192 sql: val.sql.clone(),
193 owner: DEFAULT_SUPER_USER_ID,
194 properties: Default::default(),
195 created_at_epoch: None,
196 created_at_cluster_version: None,
197 }
198 }
199}
200
201pub(super) type SystemCatalogColumnsDef<'a> = (&'a str, DataType);
203
204#[inline(always)]
209pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
210 format!(
211 "SELECT {} WHERE 1 != 1",
212 columns
213 .iter()
214 .map(|(name, ty)| format!("NULL::{} AS {}", ty, name))
215 .join(", ")
216 )
217}
218
219fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
220 match state
221 .parallelism
222 .as_ref()
223 .and_then(|parallelism| parallelism.parallelism.as_ref())
224 {
225 Some(PbParallelism::Auto(_)) | Some(PbParallelism::Adaptive(_)) => "adaptive".to_owned(),
226 Some(PbParallelism::Fixed(PbFixedParallelism { parallelism })) => {
227 format!("fixed({parallelism})")
228 }
229 Some(PbParallelism::Custom(_)) => "custom".to_owned(),
230 None => "unknown".to_owned(),
231 }
232}
233
234fn get_acl_items(
236 object: impl Into<GrantObject>,
237 for_dml_table: bool,
238 users: &Vec<UserCatalog>,
239 username_map: &HashMap<UserId, String>,
240) -> Vec<String> {
241 let object = object.into();
242 let mut res = vec![];
243 let super_privilege = available_prost_privilege(object, for_dml_table);
244 for user in users {
245 let privileges = if user.is_super {
246 vec![&super_privilege]
247 } else {
248 user.grant_privileges
249 .iter()
250 .filter(|&privilege| privilege.object.as_ref().unwrap() == &object)
251 .collect_vec()
252 };
253 if privileges.is_empty() {
254 continue;
255 };
256 let mut grantor_map = HashMap::new();
257 privileges.iter().for_each(|&privilege| {
258 privilege.action_with_opts.iter().for_each(|ao| {
259 grantor_map
260 .entry(ao.granted_by)
261 .or_insert_with(Vec::new)
262 .push((ao.get_action().unwrap(), ao.with_grant_option));
263 })
264 });
265 for (granted_by, actions) in grantor_map {
266 let mut aclitem = String::new();
267 aclitem.push_str(&user.name);
268 aclitem.push('=');
269 for (action, option) in actions {
270 aclitem.push_str(&AclMode::from(action).to_string());
271 if option {
272 aclitem.push('*');
273 }
274 }
275 aclitem.push('/');
276 aclitem.push_str(username_map.get(&granted_by).unwrap());
278 res.push(aclitem);
279 }
280 }
281 res
282}
283
284pub struct SystemCatalog {
285 catalogs: Vec<BuiltinCatalog>,
287}
288
289pub fn get_sys_tables_in_schema(schema_name: &str) -> Vec<Arc<SystemTableCatalog>> {
290 SYS_CATALOGS
291 .catalogs
292 .iter()
293 .enumerate()
294 .filter_map(|(idx, c)| match c {
295 BuiltinCatalog::Table(t) if t.schema == schema_name => Some(Arc::new(
296 SystemTableCatalog::from(t)
297 .with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
298 )),
299 _ => None,
300 })
301 .collect()
302}
303
304pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<ViewCatalog> {
305 SYS_CATALOGS
306 .catalogs
307 .iter()
308 .enumerate()
309 .filter_map(|(idx, c)| match c {
310 BuiltinCatalog::View(v) if v.schema == schema_name => Some(
311 ViewCatalog::from(v).with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
312 ),
313 _ => None,
314 })
315 .collect()
316}
317
318pub fn is_system_catalog(oid: ObjectId) -> bool {
319 oid.as_raw_id() >= SYS_CATALOG_START_ID as u32
320}
321
322pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
324 tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
325 assert!(SYS_CATALOGS_SLICE.len() <= MAX_SYS_CATALOG_NUM as usize);
326 let catalogs = SYS_CATALOGS_SLICE
327 .iter()
328 .map(|f| f())
329 .sorted_by_key(|c| c.full_name())
330 .collect();
331 SystemCatalog { catalogs }
332});
333
334#[linkme::distributed_slice]
335pub static SYS_CATALOGS_SLICE: [fn() -> BuiltinCatalog];
336
337impl SysCatalogReader for SysCatalogReaderImpl {
338 fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>> {
339 let table_name = SYS_CATALOGS
340 .catalogs
341 .get((table_id.as_raw_id() - SYS_CATALOG_START_ID as u32) as usize)
342 .unwrap();
343 match table_name {
344 BuiltinCatalog::Table(t) => (t.function)(self),
345 BuiltinCatalog::View(_) => panic!("read_table should not be called on a view"),
346 }
347 }
348}
349
350#[cfg(test)]
351mod tests {
352 use crate::catalog::system_catalog::SYS_CATALOGS;
353 use crate::test_utils::LocalFrontend;
354
355 #[tokio::test]
356 async fn test_builtin_view_definition() {
357 let frontend = LocalFrontend::new(Default::default()).await;
358 let sqls = SYS_CATALOGS.catalogs.iter().filter_map(|c| match c {
359 super::BuiltinCatalog::View(v) => Some(v.sql.clone()),
360 _ => None,
361 });
362 for sql in sqls {
363 frontend.query_formatted_result(sql).await;
364 }
365 }
366}