risingwave_frontend/catalog/system_catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod information_schema;
16pub mod pg_catalog;
17pub mod rw_catalog;
18
19use std::collections::HashMap;
20use std::sync::{Arc, LazyLock};
21
22use futures::stream::BoxStream;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::acl::AclMode;
26use risingwave_common::array::DataChunk;
27use risingwave_common::catalog::{
28    ColumnCatalog, ColumnDesc, DEFAULT_SUPER_USER_ID, Field, MAX_SYS_CATALOG_NUM,
29    SYS_CATALOG_START_ID, SysCatalogReader, TableDesc, TableId,
30};
31use risingwave_common::error::BoxedError;
32use risingwave_common::session_config::SessionConfig;
33use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
34use risingwave_common::types::DataType;
35use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
36use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
37use risingwave_pb::user::grant_privilege::Object as GrantObject;
38
39use crate::catalog::catalog_service::CatalogReader;
40use crate::catalog::view_catalog::ViewCatalog;
41use crate::meta_client::FrontendMetaClient;
42use crate::session::AuthContext;
43use crate::user::UserId;
44use crate::user::user_catalog::UserCatalog;
45use crate::user::user_privilege::available_prost_privilege;
46use crate::user::user_service::UserInfoReader;
47
48#[derive(Clone, Debug, PartialEq)]
49pub struct SystemTableCatalog {
50    pub id: TableId,
51
52    pub name: String,
53
54    // All columns in this table.
55    pub columns: Vec<ColumnCatalog>,
56
57    /// Primary key columns indices.
58    pub pk: Vec<usize>,
59
60    // owner of table, should always be default super user, keep it for compatibility.
61    pub owner: u32,
62
63    /// description of table, set by `comment on`.
64    pub description: Option<String>,
65}
66
67impl SystemTableCatalog {
68    /// Get a reference to the system catalog's table id.
69    pub fn id(&self) -> TableId {
70        self.id
71    }
72
73    pub fn with_id(mut self, id: TableId) -> Self {
74        self.id = id;
75        self
76    }
77
78    /// Get a reference to the system catalog's columns.
79    pub fn columns(&self) -> &[ColumnCatalog] {
80        &self.columns
81    }
82
83    /// Get a [`TableDesc`] of the system table.
84    pub fn table_desc(&self) -> TableDesc {
85        TableDesc {
86            table_id: self.id,
87            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
88            stream_key: self.pk.clone(),
89            ..Default::default()
90        }
91    }
92
93    /// Get a reference to the system catalog's name.
94    pub fn name(&self) -> &str {
95        self.name.as_ref()
96    }
97}
98
99pub struct SysCatalogReaderImpl {
100    // Read catalog info: database/schema/source/table.
101    catalog_reader: CatalogReader,
102    // Read user info.
103    user_info_reader: UserInfoReader,
104    // Read from meta.
105    meta_client: Arc<dyn FrontendMetaClient>,
106    // Read auth context.
107    auth_context: Arc<AuthContext>,
108    // Read config.
109    config: Arc<RwLock<SessionConfig>>,
110    // Read system params.
111    system_params: SystemParamsReaderRef,
112}
113
114impl SysCatalogReaderImpl {
115    pub fn new(
116        catalog_reader: CatalogReader,
117        user_info_reader: UserInfoReader,
118        meta_client: Arc<dyn FrontendMetaClient>,
119        auth_context: Arc<AuthContext>,
120        config: Arc<RwLock<SessionConfig>>,
121        system_params: SystemParamsReaderRef,
122    ) -> Self {
123        Self {
124            catalog_reader,
125            user_info_reader,
126            meta_client,
127            auth_context,
128            config,
129            system_params,
130        }
131    }
132}
133
134pub struct BuiltinTable {
135    name: &'static str,
136    schema: &'static str,
137    columns: Vec<SystemCatalogColumnsDef<'static>>,
138    pk: &'static [usize],
139    function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxStream<'a, Result<DataChunk, BoxedError>>,
140}
141
142pub struct BuiltinView {
143    name: &'static str,
144    schema: &'static str,
145    columns: Vec<SystemCatalogColumnsDef<'static>>,
146    sql: String,
147}
148
149pub enum BuiltinCatalog {
150    Table(BuiltinTable),
151    View(BuiltinView),
152}
153
154impl BuiltinCatalog {
155    fn full_name(&self) -> String {
156        match self {
157            BuiltinCatalog::Table(t) => format!("{}.{}", t.schema, t.name),
158            BuiltinCatalog::View(t) => format!("{}.{}", t.schema, t.name),
159        }
160    }
161}
162
163impl From<&BuiltinTable> for SystemTableCatalog {
164    fn from(val: &BuiltinTable) -> Self {
165        SystemTableCatalog {
166            id: TableId::placeholder(),
167            name: val.name.to_owned(),
168            columns: val
169                .columns
170                .iter()
171                .enumerate()
172                .map(|(idx, (name, ty))| ColumnCatalog {
173                    column_desc: ColumnDesc::named(*name, (idx as i32).into(), ty.clone()),
174                    is_hidden: false,
175                })
176                .collect(),
177            pk: val.pk.to_vec(),
178            owner: DEFAULT_SUPER_USER_ID,
179            description: None,
180        }
181    }
182}
183
184impl From<&BuiltinView> for ViewCatalog {
185    fn from(val: &BuiltinView) -> Self {
186        ViewCatalog {
187            id: 0,
188            name: val.name.to_owned(),
189            schema_id: 0,
190            database_id: 0,
191            columns: val
192                .columns
193                .iter()
194                .map(|(name, ty)| Field::with_name(ty.clone(), name.to_string()))
195                .collect(),
196            sql: val.sql.clone(),
197            owner: DEFAULT_SUPER_USER_ID,
198            properties: Default::default(),
199        }
200    }
201}
202
203// TODO: support struct column and type name when necessary.
204pub(super) type SystemCatalogColumnsDef<'a> = (&'a str, DataType);
205
206/// `infer_dummy_view_sql` returns a dummy SQL statement for a view with the given columns that
207/// returns no rows. For example, with columns `a` and `b`, it returns `SELECT NULL::integer AS a,
208/// NULL::varchar AS b WHERE 1 != 1`.
209// FIXME(noel): Tracked by <https://github.com/risingwavelabs/risingwave/issues/3431#issuecomment-1164160988>
210#[inline(always)]
211pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
212    format!(
213        "SELECT {} WHERE 1 != 1",
214        columns
215            .iter()
216            .map(|(name, ty)| format!("NULL::{} AS {}", ty, name))
217            .join(", ")
218    )
219}
220
221fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
222    match state
223        .parallelism
224        .as_ref()
225        .and_then(|parallelism| parallelism.parallelism.as_ref())
226    {
227        Some(PbParallelism::Auto(_)) | Some(PbParallelism::Adaptive(_)) => "adaptive".to_owned(),
228        Some(PbParallelism::Fixed(PbFixedParallelism { parallelism })) => {
229            format!("fixed({parallelism})")
230        }
231        Some(PbParallelism::Custom(_)) => "custom".to_owned(),
232        None => "unknown".to_owned(),
233    }
234}
235
236/// get acl items of `object` in string, ignore public.
237fn get_acl_items(
238    object: &GrantObject,
239    for_dml_table: bool,
240    users: &Vec<UserCatalog>,
241    username_map: &HashMap<UserId, String>,
242) -> Vec<String> {
243    let mut res = vec![];
244    let super_privilege = available_prost_privilege(*object, for_dml_table);
245    for user in users {
246        let privileges = if user.is_super {
247            vec![&super_privilege]
248        } else {
249            user.grant_privileges
250                .iter()
251                .filter(|&privilege| privilege.object.as_ref().unwrap() == object)
252                .collect_vec()
253        };
254        if privileges.is_empty() {
255            continue;
256        };
257        let mut grantor_map = HashMap::new();
258        privileges.iter().for_each(|&privilege| {
259            privilege.action_with_opts.iter().for_each(|ao| {
260                grantor_map
261                    .entry(ao.granted_by)
262                    .or_insert_with(Vec::new)
263                    .push((ao.get_action().unwrap(), ao.with_grant_option));
264            })
265        });
266        for (granted_by, actions) in grantor_map {
267            let mut aclitem = String::new();
268            aclitem.push_str(&user.name);
269            aclitem.push('=');
270            for (action, option) in actions {
271                aclitem.push_str(&AclMode::from(action).to_string());
272                if option {
273                    aclitem.push('*');
274                }
275            }
276            aclitem.push('/');
277            // should be able to query grantor's name
278            aclitem.push_str(username_map.get(&granted_by).unwrap());
279            res.push(aclitem);
280        }
281    }
282    res
283}
284
285pub struct SystemCatalog {
286    // table id = index + SYS_CATALOG_START_ID
287    catalogs: Vec<BuiltinCatalog>,
288}
289
290pub fn get_sys_tables_in_schema(schema_name: &str) -> Vec<Arc<SystemTableCatalog>> {
291    SYS_CATALOGS
292        .catalogs
293        .iter()
294        .enumerate()
295        .filter_map(|(idx, c)| match c {
296            BuiltinCatalog::Table(t) if t.schema == schema_name => Some(Arc::new(
297                SystemTableCatalog::from(t)
298                    .with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
299            )),
300            _ => None,
301        })
302        .collect()
303}
304
305pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<ViewCatalog> {
306    SYS_CATALOGS
307        .catalogs
308        .iter()
309        .enumerate()
310        .filter_map(|(idx, c)| match c {
311            BuiltinCatalog::View(v) if v.schema == schema_name => {
312                Some(ViewCatalog::from(v).with_id(idx as u32 + SYS_CATALOG_START_ID as u32))
313            }
314            _ => None,
315        })
316        .collect()
317}
318
319pub fn is_system_catalog(oid: u32) -> bool {
320    oid >= SYS_CATALOG_START_ID as u32
321}
322
323/// The global registry of all builtin catalogs.
324pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
325    tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
326    assert!(SYS_CATALOGS_SLICE.len() <= MAX_SYS_CATALOG_NUM as usize);
327    let catalogs = SYS_CATALOGS_SLICE
328        .iter()
329        .map(|f| f())
330        .sorted_by_key(|c| c.full_name())
331        .collect();
332    SystemCatalog { catalogs }
333});
334
335#[linkme::distributed_slice]
336pub static SYS_CATALOGS_SLICE: [fn() -> BuiltinCatalog];
337
338impl SysCatalogReader for SysCatalogReaderImpl {
339    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>> {
340        let table_name = SYS_CATALOGS
341            .catalogs
342            .get((table_id.table_id - SYS_CATALOG_START_ID as u32) as usize)
343            .unwrap();
344        match table_name {
345            BuiltinCatalog::Table(t) => (t.function)(self),
346            BuiltinCatalog::View(_) => panic!("read_table should not be called on a view"),
347        }
348    }
349}
350
351#[cfg(test)]
352mod tests {
353    use crate::catalog::system_catalog::SYS_CATALOGS;
354    use crate::test_utils::LocalFrontend;
355
356    #[tokio::test]
357    async fn test_builtin_view_definition() {
358        let frontend = LocalFrontend::new(Default::default()).await;
359        let sqls = SYS_CATALOGS.catalogs.iter().filter_map(|c| match c {
360            super::BuiltinCatalog::View(v) => Some(v.sql.clone()),
361            _ => None,
362        });
363        for sql in sqls {
364            frontend.query_formatted_result(sql).await;
365        }
366    }
367}