risingwave_frontend/catalog/system_catalog/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod information_schema;
16pub mod pg_catalog;
17pub mod rw_catalog;
18
19use std::collections::HashMap;
20use std::sync::{Arc, LazyLock};
21
22use futures::stream::BoxStream;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::acl::AclMode;
26use risingwave_common::array::DataChunk;
27use risingwave_common::catalog::{
28    ColumnCatalog, ColumnDesc, DEFAULT_SUPER_USER_ID, Field, MAX_SYS_CATALOG_NUM,
29    SYS_CATALOG_START_ID, SysCatalogReader, TableId,
30};
31use risingwave_common::error::BoxedError;
32use risingwave_common::id::ObjectId;
33use risingwave_common::session_config::SessionConfig;
34use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
35use risingwave_common::types::DataType;
36use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
37use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
38use risingwave_pb::user::grant_privilege::Object as GrantObject;
39
40use crate::catalog::catalog_service::CatalogReader;
41use crate::catalog::view_catalog::ViewCatalog;
42use crate::meta_client::FrontendMetaClient;
43use crate::session::AuthContext;
44use crate::user::UserId;
45use crate::user::user_catalog::UserCatalog;
46use crate::user::user_privilege::available_prost_privilege;
47use crate::user::user_service::UserInfoReader;
48
/// Catalog entry describing a built-in system table.
///
/// Instances are built from a [`BuiltinTable`] through `From<&BuiltinTable>`, which fills `id`
/// with a placeholder; the final id is then assigned with [`SystemTableCatalog::with_id`].
#[derive(Clone, Debug, PartialEq, Hash)]
pub struct SystemTableCatalog {
    /// Id of this system table.
    pub id: TableId,

    /// Name of this system table (the schema name is not part of it).
    pub name: String,

    /// All columns in this table.
    pub columns: Vec<ColumnCatalog>,

    /// Primary key columns indices.
    pub pk: Vec<usize>,

    /// Owner of the table. Should always be the default super user; kept for compatibility.
    pub owner: u32,

    /// Description of the table, set by `comment on`.
    pub description: Option<String>,
}
67
68impl SystemTableCatalog {
69    /// Get a reference to the system catalog's table id.
70    pub fn id(&self) -> TableId {
71        self.id
72    }
73
74    pub fn with_id(mut self, id: TableId) -> Self {
75        self.id = id;
76        self
77    }
78
79    /// Get a reference to the system catalog's columns.
80    pub fn columns(&self) -> &[ColumnCatalog] {
81        &self.columns
82    }
83
84    /// Get a reference to the system catalog's name.
85    pub fn name(&self) -> &str {
86        self.name.as_ref()
87    }
88}
89
/// Reader handed to the built-in table functions (see [`BuiltinTable`]) so they can produce
/// their rows; it bundles every source of information those functions may need.
///
/// This type implements `SysCatalogReader`.
pub struct SysCatalogReaderImpl {
    // Read catalog info: database/schema/source/table.
    catalog_reader: CatalogReader,
    // Read user info.
    user_info_reader: UserInfoReader,
    // Read from meta.
    meta_client: Arc<dyn FrontendMetaClient>,
    // Read auth context.
    auth_context: Arc<AuthContext>,
    // Read config.
    config: Arc<RwLock<SessionConfig>>,
    // Read system params.
    system_params: SystemParamsReaderRef,
}
104
impl SysCatalogReaderImpl {
    /// Creates a reader from its parts.
    ///
    /// Every argument is stored as-is in the field of the same name; nothing is read or
    /// validated at construction time.
    pub fn new(
        catalog_reader: CatalogReader,
        user_info_reader: UserInfoReader,
        meta_client: Arc<dyn FrontendMetaClient>,
        auth_context: Arc<AuthContext>,
        config: Arc<RwLock<SessionConfig>>,
        system_params: SystemParamsReaderRef,
    ) -> Self {
        Self {
            catalog_reader,
            user_info_reader,
            meta_client,
            auth_context,
            config,
            system_params,
        }
    }
}
124
/// Static definition of a built-in system table.
pub struct BuiltinTable {
    /// Table name, without the schema.
    name: &'static str,
    /// Name of the schema the table belongs to.
    schema: &'static str,
    /// Column definitions as `(name, data type)` pairs, in column order.
    columns: Vec<SystemCatalogColumnsDef<'static>>,
    /// Indices (into `columns`) of the primary key columns.
    pk: &'static [usize],
    /// Produces the content of the table as a stream of data chunks, reading whatever it needs
    /// from the given reader. Invoked by `SysCatalogReaderImpl::read_table`.
    function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxStream<'a, Result<DataChunk, BoxedError>>,
}
132
/// Static definition of a built-in system view.
pub struct BuiltinView {
    /// View name, without the schema.
    name: &'static str,
    /// Name of the schema the view belongs to.
    schema: &'static str,
    /// Column definitions as `(name, data type)` pairs, in column order.
    columns: Vec<SystemCatalogColumnsDef<'static>>,
    /// SQL text of the view; copied verbatim into `ViewCatalog::sql`.
    sql: String,
}
139
/// A built-in catalog object: either a system table or a system view.
pub enum BuiltinCatalog {
    /// A system table whose rows are produced by a Rust function.
    Table(BuiltinTable),
    /// A system view defined by a SQL statement.
    View(BuiltinView),
}
144
145impl BuiltinCatalog {
146    fn full_name(&self) -> String {
147        match self {
148            BuiltinCatalog::Table(t) => format!("{}.{}", t.schema, t.name),
149            BuiltinCatalog::View(t) => format!("{}.{}", t.schema, t.name),
150        }
151    }
152}
153
154impl From<&BuiltinTable> for SystemTableCatalog {
155    fn from(val: &BuiltinTable) -> Self {
156        SystemTableCatalog {
157            id: TableId::placeholder(),
158            name: val.name.to_owned(),
159            columns: val
160                .columns
161                .iter()
162                .enumerate()
163                .map(|(idx, (name, ty))| ColumnCatalog {
164                    column_desc: ColumnDesc::named(*name, (idx as i32).into(), ty.clone()),
165                    is_hidden: false,
166                })
167                .collect(),
168            pk: val.pk.to_vec(),
169            owner: DEFAULT_SUPER_USER_ID,
170            description: None,
171        }
172    }
173}
174
175impl From<&BuiltinView> for ViewCatalog {
176    fn from(val: &BuiltinView) -> Self {
177        ViewCatalog {
178            id: 0.into(),
179            name: val.name.to_owned(),
180            schema_id: 0.into(),
181            database_id: 0.into(),
182            columns: val
183                .columns
184                .iter()
185                .map(|(name, ty)| Field::with_name(ty.clone(), name.to_string()))
186                .collect(),
187            sql: val.sql.clone(),
188            owner: DEFAULT_SUPER_USER_ID,
189            properties: Default::default(),
190        }
191    }
192}
193
/// Definition of one system catalog column, as a `(column name, data type)` pair.
// TODO: support struct column and type name when necessary.
pub(super) type SystemCatalogColumnsDef<'a> = (&'a str, DataType);
196
197/// `infer_dummy_view_sql` returns a dummy SQL statement for a view with the given columns that
198/// returns no rows. For example, with columns `a` and `b`, it returns `SELECT NULL::integer AS a,
199/// NULL::varchar AS b WHERE 1 != 1`.
200// FIXME(noel): Tracked by <https://github.com/risingwavelabs/risingwave/issues/3431#issuecomment-1164160988>
201#[inline(always)]
202pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
203    format!(
204        "SELECT {} WHERE 1 != 1",
205        columns
206            .iter()
207            .map(|(name, ty)| format!("NULL::{} AS {}", ty, name))
208            .join(", ")
209    )
210}
211
212fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
213    match state
214        .parallelism
215        .as_ref()
216        .and_then(|parallelism| parallelism.parallelism.as_ref())
217    {
218        Some(PbParallelism::Auto(_)) | Some(PbParallelism::Adaptive(_)) => "adaptive".to_owned(),
219        Some(PbParallelism::Fixed(PbFixedParallelism { parallelism })) => {
220            format!("fixed({parallelism})")
221        }
222        Some(PbParallelism::Custom(_)) => "custom".to_owned(),
223        None => "unknown".to_owned(),
224    }
225}
226
227/// get acl items of `object` in string, ignore public.
228fn get_acl_items(
229    object: impl Into<GrantObject>,
230    for_dml_table: bool,
231    users: &Vec<UserCatalog>,
232    username_map: &HashMap<UserId, String>,
233) -> Vec<String> {
234    let object = object.into();
235    let mut res = vec![];
236    let super_privilege = available_prost_privilege(object, for_dml_table);
237    for user in users {
238        let privileges = if user.is_super {
239            vec![&super_privilege]
240        } else {
241            user.grant_privileges
242                .iter()
243                .filter(|&privilege| privilege.object.as_ref().unwrap() == &object)
244                .collect_vec()
245        };
246        if privileges.is_empty() {
247            continue;
248        };
249        let mut grantor_map = HashMap::new();
250        privileges.iter().for_each(|&privilege| {
251            privilege.action_with_opts.iter().for_each(|ao| {
252                grantor_map
253                    .entry(ao.granted_by)
254                    .or_insert_with(Vec::new)
255                    .push((ao.get_action().unwrap(), ao.with_grant_option));
256            })
257        });
258        for (granted_by, actions) in grantor_map {
259            let mut aclitem = String::new();
260            aclitem.push_str(&user.name);
261            aclitem.push('=');
262            for (action, option) in actions {
263                aclitem.push_str(&AclMode::from(action).to_string());
264                if option {
265                    aclitem.push('*');
266                }
267            }
268            aclitem.push('/');
269            // should be able to query grantor's name
270            aclitem.push_str(username_map.get(&granted_by).unwrap());
271            res.push(aclitem);
272        }
273    }
274    res
275}
276
/// Registry holding every built-in catalog (tables and views).
pub struct SystemCatalog {
    /// All built-in catalogs. The position of an entry determines its id:
    /// table id = index + SYS_CATALOG_START_ID
    catalogs: Vec<BuiltinCatalog>,
}
281
282pub fn get_sys_tables_in_schema(schema_name: &str) -> Vec<Arc<SystemTableCatalog>> {
283    SYS_CATALOGS
284        .catalogs
285        .iter()
286        .enumerate()
287        .filter_map(|(idx, c)| match c {
288            BuiltinCatalog::Table(t) if t.schema == schema_name => Some(Arc::new(
289                SystemTableCatalog::from(t)
290                    .with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
291            )),
292            _ => None,
293        })
294        .collect()
295}
296
297pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<ViewCatalog> {
298    SYS_CATALOGS
299        .catalogs
300        .iter()
301        .enumerate()
302        .filter_map(|(idx, c)| match c {
303            BuiltinCatalog::View(v) if v.schema == schema_name => Some(
304                ViewCatalog::from(v).with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
305            ),
306            _ => None,
307        })
308        .collect()
309}
310
311pub fn is_system_catalog(oid: ObjectId) -> bool {
312    oid.as_raw_id() >= SYS_CATALOG_START_ID as u32
313}
314
315/// The global registry of all builtin catalogs.
316pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
317    tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
318    assert!(SYS_CATALOGS_SLICE.len() <= MAX_SYS_CATALOG_NUM as usize);
319    let catalogs = SYS_CATALOGS_SLICE
320        .iter()
321        .map(|f| f())
322        .sorted_by_key(|c| c.full_name())
323        .collect();
324    SystemCatalog { catalogs }
325});
326
/// Distributed slice of constructor functions, one per built-in catalog.
///
/// Each function is called once, when `SYS_CATALOGS` is first initialized, to build the
/// corresponding [`BuiltinCatalog`].
// NOTE(review): entries are presumably registered by the catalog definitions in the
// `information_schema`, `pg_catalog` and `rw_catalog` submodules — confirm.
#[linkme::distributed_slice]
pub static SYS_CATALOGS_SLICE: [fn() -> BuiltinCatalog];
329
330impl SysCatalogReader for SysCatalogReaderImpl {
331    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>> {
332        let table_name = SYS_CATALOGS
333            .catalogs
334            .get((table_id.as_raw_id() - SYS_CATALOG_START_ID as u32) as usize)
335            .unwrap();
336        match table_name {
337            BuiltinCatalog::Table(t) => (t.function)(self),
338            BuiltinCatalog::View(_) => panic!("read_table should not be called on a view"),
339        }
340    }
341}
342
#[cfg(test)]
mod tests {
    use super::BuiltinCatalog;
    use crate::catalog::system_catalog::SYS_CATALOGS;
    use crate::test_utils::LocalFrontend;

    /// Runs the SQL definition of every built-in view through a local frontend.
    ///
    /// The query results are discarded.
    // NOTE(review): presumably `query_formatted_result` panics when the statement fails, which
    // is what makes this test catch broken view definitions — confirm.
    #[tokio::test]
    async fn test_builtin_view_definition() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for catalog in &SYS_CATALOGS.catalogs {
            if let BuiltinCatalog::View(view) = catalog {
                frontend.query_formatted_result(view.sql.clone()).await;
            }
        }
    }
}