risingwave_frontend/catalog/system_catalog/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod information_schema;
16pub mod pg_catalog;
17pub mod rw_catalog;
18
19use std::collections::HashMap;
20use std::sync::{Arc, LazyLock};
21
22use futures::stream::BoxStream;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::acl::AclMode;
26use risingwave_common::array::DataChunk;
27use risingwave_common::catalog::{
28    ColumnCatalog, ColumnDesc, DEFAULT_SUPER_USER_ID, Field, MAX_SYS_CATALOG_NUM,
29    SYS_CATALOG_START_ID, SysCatalogReader, TableId,
30};
31use risingwave_common::error::BoxedError;
32use risingwave_common::id::ObjectId;
33use risingwave_common::metrics_reader::MetricsReader;
34use risingwave_common::session_config::SessionConfig;
35use risingwave_common::system_param::AdaptiveParallelismStrategy;
36use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
37use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
38use risingwave_common::types::DataType;
39use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
40use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
41use risingwave_pb::user::grant_privilege::Object as GrantObject;
42
43use crate::catalog::catalog_service::CatalogReader;
44use crate::catalog::view_catalog::ViewCatalog;
45use crate::meta_client::FrontendMetaClient;
46use crate::session::AuthContext;
47use crate::user::UserId;
48use crate::user::user_catalog::UserCatalog;
49use crate::user::user_privilege::available_prost_privilege;
50use crate::user::user_service::UserInfoReader;
51
52#[derive(Clone, Debug, PartialEq, Hash)]
53pub struct SystemTableCatalog {
54    pub id: TableId,
55
56    pub name: String,
57
58    // All columns in this table.
59    pub columns: Vec<ColumnCatalog>,
60
61    /// Primary key columns indices.
62    pub pk: Vec<usize>,
63
64    // owner of table, should always be default super user, keep it for compatibility.
65    pub owner: UserId,
66
67    /// description of table, set by `comment on`.
68    pub description: Option<String>,
69}
70
71impl SystemTableCatalog {
72    /// Get a reference to the system catalog's table id.
73    pub fn id(&self) -> TableId {
74        self.id
75    }
76
77    pub fn with_id(mut self, id: TableId) -> Self {
78        self.id = id;
79        self
80    }
81
82    /// Get a reference to the system catalog's columns.
83    pub fn columns(&self) -> &[ColumnCatalog] {
84        &self.columns
85    }
86
87    /// Get a reference to the system catalog's name.
88    pub fn name(&self) -> &str {
89        self.name.as_ref()
90    }
91}
92
93pub struct SysCatalogReaderImpl {
94    // Read catalog info: database/schema/source/table.
95    catalog_reader: CatalogReader,
96    // Read user info.
97    user_info_reader: UserInfoReader,
98    // Read from meta.
99    meta_client: Arc<dyn FrontendMetaClient>,
100    // Read auth context.
101    auth_context: Arc<AuthContext>,
102    // Read config.
103    config: Arc<RwLock<SessionConfig>>,
104    // Read system params.
105    system_params: SystemParamsReaderRef,
106    // Read metrics.
107    pub(super) metrics_reader: Arc<dyn MetricsReader>,
108}
109
110impl SysCatalogReaderImpl {
111    pub fn new(
112        catalog_reader: CatalogReader,
113        user_info_reader: UserInfoReader,
114        meta_client: Arc<dyn FrontendMetaClient>,
115        auth_context: Arc<AuthContext>,
116        config: Arc<RwLock<SessionConfig>>,
117        system_params: SystemParamsReaderRef,
118        metrics_reader: Arc<dyn MetricsReader>,
119    ) -> Self {
120        Self {
121            catalog_reader,
122            user_info_reader,
123            meta_client,
124            auth_context,
125            config,
126            system_params,
127            metrics_reader,
128        }
129    }
130}
131
132pub struct BuiltinTable {
133    name: &'static str,
134    schema: &'static str,
135    columns: Vec<SystemCatalogColumnsDef<'static>>,
136    pk: &'static [usize],
137    function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxStream<'a, Result<DataChunk, BoxedError>>,
138}
139
140pub struct BuiltinView {
141    name: &'static str,
142    schema: &'static str,
143    columns: Vec<SystemCatalogColumnsDef<'static>>,
144    sql: String,
145}
146
147pub enum BuiltinCatalog {
148    Table(BuiltinTable),
149    View(BuiltinView),
150}
151
152impl BuiltinCatalog {
153    fn full_name(&self) -> String {
154        match self {
155            BuiltinCatalog::Table(t) => format!("{}.{}", t.schema, t.name),
156            BuiltinCatalog::View(t) => format!("{}.{}", t.schema, t.name),
157        }
158    }
159}
160
161impl From<&BuiltinTable> for SystemTableCatalog {
162    fn from(val: &BuiltinTable) -> Self {
163        SystemTableCatalog {
164            id: TableId::placeholder(),
165            name: val.name.to_owned(),
166            columns: val
167                .columns
168                .iter()
169                .enumerate()
170                .map(|(idx, (name, ty))| ColumnCatalog {
171                    column_desc: ColumnDesc::named(*name, (idx as i32).into(), ty.clone()),
172                    is_hidden: false,
173                })
174                .collect(),
175            pk: val.pk.to_vec(),
176            owner: DEFAULT_SUPER_USER_ID,
177            description: None,
178        }
179    }
180}
181
182impl From<&BuiltinView> for ViewCatalog {
183    fn from(val: &BuiltinView) -> Self {
184        ViewCatalog {
185            id: 0.into(),
186            name: val.name.to_owned(),
187            schema_id: 0.into(),
188            database_id: 0.into(),
189            columns: val
190                .columns
191                .iter()
192                .map(|(name, ty)| Field::with_name(ty.clone(), name.to_string()))
193                .collect(),
194            sql: val.sql.clone(),
195            owner: DEFAULT_SUPER_USER_ID,
196            properties: Default::default(),
197            created_at_epoch: None,
198            created_at_cluster_version: None,
199        }
200    }
201}
202
203// TODO: support struct column and type name when necessary.
204pub(super) type SystemCatalogColumnsDef<'a> = (&'a str, DataType);
205
206/// `infer_dummy_view_sql` returns a dummy SQL statement for a view with the given columns that
207/// returns no rows. For example, with columns `a` and `b`, it returns `SELECT NULL::integer AS a,
208/// NULL::varchar AS b WHERE 1 != 1`.
209// FIXME(noel): Tracked by <https://github.com/risingwavelabs/risingwave/issues/3431#issuecomment-1164160988>
210#[inline(always)]
211pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
212    format!(
213        "SELECT {} WHERE 1 != 1",
214        columns
215            .iter()
216            .map(|(name, ty)| format!("NULL::{} AS {}", ty, name))
217            .join(", ")
218    )
219}
220
221// Shared by `rw_streaming_jobs` and `rw_table_fragments` so both catalogs present the same
222// user-visible parallelism format.
223fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
224    match state
225        .parallelism
226        .as_ref()
227        .and_then(|parallelism| parallelism.parallelism.as_ref())
228    {
229        Some(PbParallelism::Fixed(PbFixedParallelism { parallelism })) => parallelism.to_string(),
230        Some(PbParallelism::Auto(_)) | Some(PbParallelism::Adaptive(_)) => state
231            .adaptive_parallelism_strategy
232            .as_deref()
233            .and_then(format_adaptive_parallelism_strategy)
234            .unwrap_or_else(|| "adaptive".to_owned()),
235        Some(PbParallelism::Custom(_)) => state
236            .adaptive_parallelism_strategy
237            .as_deref()
238            .and_then(format_adaptive_parallelism_strategy)
239            .unwrap_or_else(|| "custom".to_owned()),
240        None => "unknown".to_owned(),
241    }
242}
243
244fn format_adaptive_parallelism_strategy(strategy: &str) -> Option<String> {
245    parse_strategy(strategy)
246        .ok()
247        .map(|strategy| match strategy {
248            AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
249                "adaptive".to_owned()
250            }
251            AdaptiveParallelismStrategy::Bounded(n) => format!("bounded({n})"),
252            AdaptiveParallelismStrategy::Ratio(r) => format!("ratio({r})"),
253        })
254}
255
256/// get acl items of `object` in string, ignore public.
257fn get_acl_items(
258    object: impl Into<GrantObject>,
259    for_dml_table: bool,
260    users: &Vec<UserCatalog>,
261    username_map: &HashMap<UserId, String>,
262) -> Vec<String> {
263    let object = object.into();
264    let mut res = vec![];
265    let super_privilege = available_prost_privilege(object, for_dml_table);
266    for user in users {
267        let privileges = if user.is_super {
268            vec![&super_privilege]
269        } else {
270            user.grant_privileges
271                .iter()
272                .filter(|&privilege| privilege.object.as_ref().unwrap() == &object)
273                .collect_vec()
274        };
275        if privileges.is_empty() {
276            continue;
277        };
278        let mut grantor_map = HashMap::new();
279        privileges.iter().for_each(|&privilege| {
280            privilege.action_with_opts.iter().for_each(|ao| {
281                grantor_map
282                    .entry(ao.granted_by)
283                    .or_insert_with(Vec::new)
284                    .push((ao.get_action().unwrap(), ao.with_grant_option));
285            })
286        });
287        for (granted_by, actions) in grantor_map {
288            let mut aclitem = String::new();
289            aclitem.push_str(&user.name);
290            aclitem.push('=');
291            for (action, option) in actions {
292                aclitem.push_str(&AclMode::from(action).to_string());
293                if option {
294                    aclitem.push('*');
295                }
296            }
297            aclitem.push('/');
298            // should be able to query grantor's name
299            aclitem.push_str(username_map.get(&granted_by).unwrap());
300            res.push(aclitem);
301        }
302    }
303    res
304}
305
306pub struct SystemCatalog {
307    // table id = index + SYS_CATALOG_START_ID
308    catalogs: Vec<BuiltinCatalog>,
309}
310
311pub fn get_sys_tables_in_schema(schema_name: &str) -> Vec<Arc<SystemTableCatalog>> {
312    SYS_CATALOGS
313        .catalogs
314        .iter()
315        .enumerate()
316        .filter_map(|(idx, c)| match c {
317            BuiltinCatalog::Table(t) if t.schema == schema_name => Some(Arc::new(
318                SystemTableCatalog::from(t)
319                    .with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
320            )),
321            _ => None,
322        })
323        .collect()
324}
325
326pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<ViewCatalog> {
327    SYS_CATALOGS
328        .catalogs
329        .iter()
330        .enumerate()
331        .filter_map(|(idx, c)| match c {
332            BuiltinCatalog::View(v) if v.schema == schema_name => Some(
333                ViewCatalog::from(v).with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
334            ),
335            _ => None,
336        })
337        .collect()
338}
339
340pub fn is_system_catalog(oid: ObjectId) -> bool {
341    oid.as_raw_id() >= SYS_CATALOG_START_ID as u32
342}
343
344/// The global registry of all builtin catalogs.
345pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
346    tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
347    assert!(SYS_CATALOGS_SLICE.len() <= MAX_SYS_CATALOG_NUM as usize);
348    let catalogs = SYS_CATALOGS_SLICE
349        .iter()
350        .map(|f| f())
351        .sorted_by_key(|c| c.full_name())
352        .collect();
353    SystemCatalog { catalogs }
354});
355
356#[linkme::distributed_slice]
357pub static SYS_CATALOGS_SLICE: [fn() -> BuiltinCatalog];
358
359impl SysCatalogReader for SysCatalogReaderImpl {
360    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>> {
361        let table_name = SYS_CATALOGS
362            .catalogs
363            .get((table_id.as_raw_id() - SYS_CATALOG_START_ID as u32) as usize)
364            .unwrap();
365        match table_name {
366            BuiltinCatalog::Table(t) => (t.function)(self),
367            BuiltinCatalog::View(_) => panic!("read_table should not be called on a view"),
368        }
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use risingwave_pb::meta::TableParallelism;
375    use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
376    use risingwave_pb::meta::table_parallelism::{
377        PbAdaptiveParallelism, PbFixedParallelism, PbParallelism,
378    };
379
380    use super::extract_parallelism_from_table_state;
381    use crate::catalog::system_catalog::SYS_CATALOGS;
382    use crate::test_utils::LocalFrontend;
383
384    #[tokio::test]
385    async fn test_builtin_view_definition() {
386        let frontend = LocalFrontend::new(Default::default()).await;
387        let sqls = SYS_CATALOGS.catalogs.iter().filter_map(|c| match c {
388            super::BuiltinCatalog::View(v) => Some(v.sql.clone()),
389            _ => None,
390        });
391        for sql in sqls {
392            frontend.query_formatted_result(sql).await;
393        }
394    }
395
396    #[test]
397    fn test_extract_parallelism_from_table_state_uses_unified_display() {
398        let fixed_state = StreamingJobState {
399            parallelism: Some(TableParallelism {
400                parallelism: Some(PbParallelism::Fixed(PbFixedParallelism { parallelism: 4 })),
401            }),
402            ..Default::default()
403        };
404        assert_eq!(extract_parallelism_from_table_state(&fixed_state), "4");
405
406        let adaptive_state = StreamingJobState {
407            parallelism: Some(TableParallelism {
408                parallelism: Some(PbParallelism::Adaptive(PbAdaptiveParallelism {})),
409            }),
410            adaptive_parallelism_strategy: Some("RATIO(0.5)".to_owned()),
411            ..Default::default()
412        };
413        assert_eq!(
414            extract_parallelism_from_table_state(&adaptive_state),
415            "ratio(0.5)"
416        );
417
418        let fallback_state = StreamingJobState {
419            parallelism: Some(TableParallelism {
420                parallelism: Some(PbParallelism::Adaptive(PbAdaptiveParallelism {})),
421            }),
422            ..Default::default()
423        };
424        assert_eq!(
425            extract_parallelism_from_table_state(&fallback_state),
426            "adaptive"
427        );
428    }
429}