risingwave_frontend/catalog/system_catalog/
mod.rs1pub mod information_schema;
16pub mod pg_catalog;
17pub mod rw_catalog;
18
19use std::collections::HashMap;
20use std::sync::{Arc, LazyLock};
21
22use futures::stream::BoxStream;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::acl::AclMode;
26use risingwave_common::array::DataChunk;
27use risingwave_common::catalog::{
28 ColumnCatalog, ColumnDesc, DEFAULT_SUPER_USER_ID, Field, MAX_SYS_CATALOG_NUM,
29 SYS_CATALOG_START_ID, SysCatalogReader, TableId,
30};
31use risingwave_common::error::BoxedError;
32use risingwave_common::id::ObjectId;
33use risingwave_common::metrics_reader::MetricsReader;
34use risingwave_common::session_config::SessionConfig;
35use risingwave_common::system_param::AdaptiveParallelismStrategy;
36use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
37use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
38use risingwave_common::types::DataType;
39use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
40use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
41use risingwave_pb::user::grant_privilege::Object as GrantObject;
42
43use crate::catalog::catalog_service::CatalogReader;
44use crate::catalog::view_catalog::ViewCatalog;
45use crate::meta_client::FrontendMetaClient;
46use crate::session::AuthContext;
47use crate::user::UserId;
48use crate::user::user_catalog::UserCatalog;
49use crate::user::user_privilege::available_prost_privilege;
50use crate::user::user_service::UserInfoReader;
51
52#[derive(Clone, Debug, PartialEq, Hash)]
53pub struct SystemTableCatalog {
54 pub id: TableId,
55
56 pub name: String,
57
58 pub columns: Vec<ColumnCatalog>,
60
61 pub pk: Vec<usize>,
63
64 pub owner: UserId,
66
67 pub description: Option<String>,
69}
70
71impl SystemTableCatalog {
72 pub fn id(&self) -> TableId {
74 self.id
75 }
76
77 pub fn with_id(mut self, id: TableId) -> Self {
78 self.id = id;
79 self
80 }
81
82 pub fn columns(&self) -> &[ColumnCatalog] {
84 &self.columns
85 }
86
87 pub fn name(&self) -> &str {
89 self.name.as_ref()
90 }
91}
92
93pub struct SysCatalogReaderImpl {
94 catalog_reader: CatalogReader,
96 user_info_reader: UserInfoReader,
98 meta_client: Arc<dyn FrontendMetaClient>,
100 auth_context: Arc<AuthContext>,
102 config: Arc<RwLock<SessionConfig>>,
104 system_params: SystemParamsReaderRef,
106 pub(super) metrics_reader: Arc<dyn MetricsReader>,
108}
109
110impl SysCatalogReaderImpl {
111 pub fn new(
112 catalog_reader: CatalogReader,
113 user_info_reader: UserInfoReader,
114 meta_client: Arc<dyn FrontendMetaClient>,
115 auth_context: Arc<AuthContext>,
116 config: Arc<RwLock<SessionConfig>>,
117 system_params: SystemParamsReaderRef,
118 metrics_reader: Arc<dyn MetricsReader>,
119 ) -> Self {
120 Self {
121 catalog_reader,
122 user_info_reader,
123 meta_client,
124 auth_context,
125 config,
126 system_params,
127 metrics_reader,
128 }
129 }
130}
131
132pub struct BuiltinTable {
133 name: &'static str,
134 schema: &'static str,
135 columns: Vec<SystemCatalogColumnsDef<'static>>,
136 pk: &'static [usize],
137 function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxStream<'a, Result<DataChunk, BoxedError>>,
138}
139
140pub struct BuiltinView {
141 name: &'static str,
142 schema: &'static str,
143 columns: Vec<SystemCatalogColumnsDef<'static>>,
144 sql: String,
145}
146
147pub enum BuiltinCatalog {
148 Table(BuiltinTable),
149 View(BuiltinView),
150}
151
152impl BuiltinCatalog {
153 fn full_name(&self) -> String {
154 match self {
155 BuiltinCatalog::Table(t) => format!("{}.{}", t.schema, t.name),
156 BuiltinCatalog::View(t) => format!("{}.{}", t.schema, t.name),
157 }
158 }
159}
160
161impl From<&BuiltinTable> for SystemTableCatalog {
162 fn from(val: &BuiltinTable) -> Self {
163 SystemTableCatalog {
164 id: TableId::placeholder(),
165 name: val.name.to_owned(),
166 columns: val
167 .columns
168 .iter()
169 .enumerate()
170 .map(|(idx, (name, ty))| ColumnCatalog {
171 column_desc: ColumnDesc::named(*name, (idx as i32).into(), ty.clone()),
172 is_hidden: false,
173 })
174 .collect(),
175 pk: val.pk.to_vec(),
176 owner: DEFAULT_SUPER_USER_ID,
177 description: None,
178 }
179 }
180}
181
182impl From<&BuiltinView> for ViewCatalog {
183 fn from(val: &BuiltinView) -> Self {
184 ViewCatalog {
185 id: 0.into(),
186 name: val.name.to_owned(),
187 schema_id: 0.into(),
188 database_id: 0.into(),
189 columns: val
190 .columns
191 .iter()
192 .map(|(name, ty)| Field::with_name(ty.clone(), name.to_string()))
193 .collect(),
194 sql: val.sql.clone(),
195 owner: DEFAULT_SUPER_USER_ID,
196 properties: Default::default(),
197 created_at_epoch: None,
198 created_at_cluster_version: None,
199 }
200 }
201}
202
203pub(super) type SystemCatalogColumnsDef<'a> = (&'a str, DataType);
205
206#[inline(always)]
211pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
212 format!(
213 "SELECT {} WHERE 1 != 1",
214 columns
215 .iter()
216 .map(|(name, ty)| format!("NULL::{} AS {}", ty, name))
217 .join(", ")
218 )
219}
220
221fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
224 match state
225 .parallelism
226 .as_ref()
227 .and_then(|parallelism| parallelism.parallelism.as_ref())
228 {
229 Some(PbParallelism::Fixed(PbFixedParallelism { parallelism })) => parallelism.to_string(),
230 Some(PbParallelism::Auto(_)) | Some(PbParallelism::Adaptive(_)) => state
231 .adaptive_parallelism_strategy
232 .as_deref()
233 .and_then(format_adaptive_parallelism_strategy)
234 .unwrap_or_else(|| "adaptive".to_owned()),
235 Some(PbParallelism::Custom(_)) => state
236 .adaptive_parallelism_strategy
237 .as_deref()
238 .and_then(format_adaptive_parallelism_strategy)
239 .unwrap_or_else(|| "custom".to_owned()),
240 None => "unknown".to_owned(),
241 }
242}
243
244fn format_adaptive_parallelism_strategy(strategy: &str) -> Option<String> {
245 parse_strategy(strategy)
246 .ok()
247 .map(|strategy| match strategy {
248 AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
249 "adaptive".to_owned()
250 }
251 AdaptiveParallelismStrategy::Bounded(n) => format!("bounded({n})"),
252 AdaptiveParallelismStrategy::Ratio(r) => format!("ratio({r})"),
253 })
254}
255
256fn get_acl_items(
258 object: impl Into<GrantObject>,
259 for_dml_table: bool,
260 users: &Vec<UserCatalog>,
261 username_map: &HashMap<UserId, String>,
262) -> Vec<String> {
263 let object = object.into();
264 let mut res = vec![];
265 let super_privilege = available_prost_privilege(object, for_dml_table);
266 for user in users {
267 let privileges = if user.is_super {
268 vec![&super_privilege]
269 } else {
270 user.grant_privileges
271 .iter()
272 .filter(|&privilege| privilege.object.as_ref().unwrap() == &object)
273 .collect_vec()
274 };
275 if privileges.is_empty() {
276 continue;
277 };
278 let mut grantor_map = HashMap::new();
279 privileges.iter().for_each(|&privilege| {
280 privilege.action_with_opts.iter().for_each(|ao| {
281 grantor_map
282 .entry(ao.granted_by)
283 .or_insert_with(Vec::new)
284 .push((ao.get_action().unwrap(), ao.with_grant_option));
285 })
286 });
287 for (granted_by, actions) in grantor_map {
288 let mut aclitem = String::new();
289 aclitem.push_str(&user.name);
290 aclitem.push('=');
291 for (action, option) in actions {
292 aclitem.push_str(&AclMode::from(action).to_string());
293 if option {
294 aclitem.push('*');
295 }
296 }
297 aclitem.push('/');
298 aclitem.push_str(username_map.get(&granted_by).unwrap());
300 res.push(aclitem);
301 }
302 }
303 res
304}
305
306pub struct SystemCatalog {
307 catalogs: Vec<BuiltinCatalog>,
309}
310
311pub fn get_sys_tables_in_schema(schema_name: &str) -> Vec<Arc<SystemTableCatalog>> {
312 SYS_CATALOGS
313 .catalogs
314 .iter()
315 .enumerate()
316 .filter_map(|(idx, c)| match c {
317 BuiltinCatalog::Table(t) if t.schema == schema_name => Some(Arc::new(
318 SystemTableCatalog::from(t)
319 .with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
320 )),
321 _ => None,
322 })
323 .collect()
324}
325
326pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<ViewCatalog> {
327 SYS_CATALOGS
328 .catalogs
329 .iter()
330 .enumerate()
331 .filter_map(|(idx, c)| match c {
332 BuiltinCatalog::View(v) if v.schema == schema_name => Some(
333 ViewCatalog::from(v).with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
334 ),
335 _ => None,
336 })
337 .collect()
338}
339
340pub fn is_system_catalog(oid: ObjectId) -> bool {
341 oid.as_raw_id() >= SYS_CATALOG_START_ID as u32
342}
343
344pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
346 tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
347 assert!(SYS_CATALOGS_SLICE.len() <= MAX_SYS_CATALOG_NUM as usize);
348 let catalogs = SYS_CATALOGS_SLICE
349 .iter()
350 .map(|f| f())
351 .sorted_by_key(|c| c.full_name())
352 .collect();
353 SystemCatalog { catalogs }
354});
355
356#[linkme::distributed_slice]
357pub static SYS_CATALOGS_SLICE: [fn() -> BuiltinCatalog];
358
359impl SysCatalogReader for SysCatalogReaderImpl {
360 fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>> {
361 let table_name = SYS_CATALOGS
362 .catalogs
363 .get((table_id.as_raw_id() - SYS_CATALOG_START_ID as u32) as usize)
364 .unwrap();
365 match table_name {
366 BuiltinCatalog::Table(t) => (t.function)(self),
367 BuiltinCatalog::View(_) => panic!("read_table should not be called on a view"),
368 }
369 }
370}
371
#[cfg(test)]
mod tests {
    use risingwave_pb::meta::TableParallelism;
    use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
    use risingwave_pb::meta::table_parallelism::{
        PbAdaptiveParallelism, PbFixedParallelism, PbParallelism,
    };

    use super::extract_parallelism_from_table_state;
    use crate::catalog::system_catalog::SYS_CATALOGS;
    use crate::test_utils::LocalFrontend;

    /// Builds a job state with the given parallelism and optional adaptive
    /// strategy text; every other field keeps its default value.
    fn job_state(parallelism: PbParallelism, strategy: Option<&str>) -> StreamingJobState {
        StreamingJobState {
            parallelism: Some(TableParallelism {
                parallelism: Some(parallelism),
            }),
            adaptive_parallelism_strategy: strategy.map(str::to_owned),
            ..Default::default()
        }
    }

    /// Runs the SQL of every builtin view against a local frontend.
    #[tokio::test]
    async fn test_builtin_view_definition() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for catalog in &SYS_CATALOGS.catalogs {
            if let super::BuiltinCatalog::View(view) = catalog {
                frontend.query_formatted_result(view.sql.clone()).await;
            }
        }
    }

    /// Checks the display string for fixed parallelism, adaptive parallelism
    /// with a strategy, and adaptive parallelism without one.
    #[test]
    fn test_extract_parallelism_from_table_state_uses_unified_display() {
        let fixed_state = job_state(
            PbParallelism::Fixed(PbFixedParallelism { parallelism: 4 }),
            None,
        );
        assert_eq!(extract_parallelism_from_table_state(&fixed_state), "4");

        let adaptive_state = job_state(
            PbParallelism::Adaptive(PbAdaptiveParallelism {}),
            Some("RATIO(0.5)"),
        );
        assert_eq!(
            extract_parallelism_from_table_state(&adaptive_state),
            "ratio(0.5)"
        );

        let fallback_state = job_state(PbParallelism::Adaptive(PbAdaptiveParallelism {}), None);
        assert_eq!(
            extract_parallelism_from_table_state(&fallback_state),
            "adaptive"
        );
    }
}