risingwave_frontend/catalog/system_catalog/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod information_schema;
16pub mod pg_catalog;
17pub mod rw_catalog;
18
19use std::collections::HashMap;
20use std::sync::{Arc, LazyLock};
21
22use futures::stream::BoxStream;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::acl::AclMode;
26use risingwave_common::array::DataChunk;
27use risingwave_common::catalog::{
28    ColumnCatalog, ColumnDesc, DEFAULT_SUPER_USER_ID, Field, MAX_SYS_CATALOG_NUM,
29    SYS_CATALOG_START_ID, SysCatalogReader, TableId,
30};
31use risingwave_common::error::BoxedError;
32use risingwave_common::id::ObjectId;
33use risingwave_common::metrics_reader::MetricsReader;
34use risingwave_common::session_config::SessionConfig;
35use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
36use risingwave_common::types::DataType;
37use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
38use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
39use risingwave_pb::user::grant_privilege::Object as GrantObject;
40
41use crate::catalog::catalog_service::CatalogReader;
42use crate::catalog::view_catalog::ViewCatalog;
43use crate::meta_client::FrontendMetaClient;
44use crate::session::AuthContext;
45use crate::user::UserId;
46use crate::user::user_catalog::UserCatalog;
47use crate::user::user_privilege::available_prost_privilege;
48use crate::user::user_service::UserInfoReader;
49
/// Catalog entry describing a system table.
#[derive(Clone, Debug, PartialEq, Hash)]
pub struct SystemTableCatalog {
    /// Id of the table.
    pub id: TableId,

    /// Name of the table.
    pub name: String,

    /// All columns in this table.
    pub columns: Vec<ColumnCatalog>,

    /// Primary key columns indices.
    pub pk: Vec<usize>,

    /// Owner of the table. Should always be the default super user; kept for compatibility.
    pub owner: UserId,

    /// Description of the table, set by `comment on`.
    pub description: Option<String>,
}
68
69impl SystemTableCatalog {
70    /// Get a reference to the system catalog's table id.
71    pub fn id(&self) -> TableId {
72        self.id
73    }
74
75    pub fn with_id(mut self, id: TableId) -> Self {
76        self.id = id;
77        self
78    }
79
80    /// Get a reference to the system catalog's columns.
81    pub fn columns(&self) -> &[ColumnCatalog] {
82        &self.columns
83    }
84
85    /// Get a reference to the system catalog's name.
86    pub fn name(&self) -> &str {
87        self.name.as_ref()
88    }
89}
90
/// Reader handed to builtin table functions; bundles the handles that system catalog data is
/// read from.
pub struct SysCatalogReaderImpl {
    /// Read catalog info: database/schema/source/table.
    catalog_reader: CatalogReader,
    /// Read user info.
    user_info_reader: UserInfoReader,
    /// Read from meta.
    meta_client: Arc<dyn FrontendMetaClient>,
    /// Read auth context.
    auth_context: Arc<AuthContext>,
    /// Read config.
    config: Arc<RwLock<SessionConfig>>,
    /// Read system params.
    system_params: SystemParamsReaderRef,
    /// Read metrics.
    pub(super) metrics_reader: Arc<dyn MetricsReader>,
}
107
impl SysCatalogReaderImpl {
    /// Creates a reader from the given handles.
    ///
    /// Every argument is stored unchanged in the field of the same name; no other work is done.
    pub fn new(
        catalog_reader: CatalogReader,
        user_info_reader: UserInfoReader,
        meta_client: Arc<dyn FrontendMetaClient>,
        auth_context: Arc<AuthContext>,
        config: Arc<RwLock<SessionConfig>>,
        system_params: SystemParamsReaderRef,
        metrics_reader: Arc<dyn MetricsReader>,
    ) -> Self {
        Self {
            catalog_reader,
            user_info_reader,
            meta_client,
            auth_context,
            config,
            system_params,
            metrics_reader,
        }
    }
}
129
/// Definition of a builtin system table.
pub struct BuiltinTable {
    /// Table name, without the schema.
    name: &'static str,
    /// Name of the schema the table belongs to.
    schema: &'static str,
    /// Column definitions as `(name, data type)` pairs.
    columns: Vec<SystemCatalogColumnsDef<'static>>,
    /// Primary key column indices.
    pk: &'static [usize],
    /// Produces the table's content as a stream of data chunks, given a reader to pull the
    /// underlying information from.
    function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxStream<'a, Result<DataChunk, BoxedError>>,
}
137
/// Definition of a builtin system view.
pub struct BuiltinView {
    /// View name, without the schema.
    name: &'static str,
    /// Name of the schema the view belongs to.
    schema: &'static str,
    /// Column definitions as `(name, data type)` pairs.
    columns: Vec<SystemCatalogColumnsDef<'static>>,
    /// SQL statement of the view.
    sql: String,
}
144
/// A builtin catalog entry: either a system table or a system view.
pub enum BuiltinCatalog {
    /// A builtin table whose content is produced by a function.
    Table(BuiltinTable),
    /// A builtin view defined by a SQL statement.
    View(BuiltinView),
}
149
150impl BuiltinCatalog {
151    fn full_name(&self) -> String {
152        match self {
153            BuiltinCatalog::Table(t) => format!("{}.{}", t.schema, t.name),
154            BuiltinCatalog::View(t) => format!("{}.{}", t.schema, t.name),
155        }
156    }
157}
158
159impl From<&BuiltinTable> for SystemTableCatalog {
160    fn from(val: &BuiltinTable) -> Self {
161        SystemTableCatalog {
162            id: TableId::placeholder(),
163            name: val.name.to_owned(),
164            columns: val
165                .columns
166                .iter()
167                .enumerate()
168                .map(|(idx, (name, ty))| ColumnCatalog {
169                    column_desc: ColumnDesc::named(*name, (idx as i32).into(), ty.clone()),
170                    is_hidden: false,
171                })
172                .collect(),
173            pk: val.pk.to_vec(),
174            owner: DEFAULT_SUPER_USER_ID,
175            description: None,
176        }
177    }
178}
179
180impl From<&BuiltinView> for ViewCatalog {
181    fn from(val: &BuiltinView) -> Self {
182        ViewCatalog {
183            id: 0.into(),
184            name: val.name.to_owned(),
185            schema_id: 0.into(),
186            database_id: 0.into(),
187            columns: val
188                .columns
189                .iter()
190                .map(|(name, ty)| Field::with_name(ty.clone(), name.to_string()))
191                .collect(),
192            sql: val.sql.clone(),
193            owner: DEFAULT_SUPER_USER_ID,
194            properties: Default::default(),
195            created_at_epoch: None,
196            created_at_cluster_version: None,
197        }
198    }
199}
200
/// Definition of a single system catalog column: a `(column name, data type)` pair.
// TODO: support struct column and type name when necessary.
pub(super) type SystemCatalogColumnsDef<'a> = (&'a str, DataType);
203
204/// `infer_dummy_view_sql` returns a dummy SQL statement for a view with the given columns that
205/// returns no rows. For example, with columns `a` and `b`, it returns `SELECT NULL::integer AS a,
206/// NULL::varchar AS b WHERE 1 != 1`.
207// FIXME(noel): Tracked by <https://github.com/risingwavelabs/risingwave/issues/3431#issuecomment-1164160988>
208#[inline(always)]
209pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
210    format!(
211        "SELECT {} WHERE 1 != 1",
212        columns
213            .iter()
214            .map(|(name, ty)| format!("NULL::{} AS {}", ty, name))
215            .join(", ")
216    )
217}
218
219fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
220    match state
221        .parallelism
222        .as_ref()
223        .and_then(|parallelism| parallelism.parallelism.as_ref())
224    {
225        Some(PbParallelism::Auto(_)) | Some(PbParallelism::Adaptive(_)) => "adaptive".to_owned(),
226        Some(PbParallelism::Fixed(PbFixedParallelism { parallelism })) => {
227            format!("fixed({parallelism})")
228        }
229        Some(PbParallelism::Custom(_)) => "custom".to_owned(),
230        None => "unknown".to_owned(),
231    }
232}
233
234/// get acl items of `object` in string, ignore public.
235fn get_acl_items(
236    object: impl Into<GrantObject>,
237    for_dml_table: bool,
238    users: &Vec<UserCatalog>,
239    username_map: &HashMap<UserId, String>,
240) -> Vec<String> {
241    let object = object.into();
242    let mut res = vec![];
243    let super_privilege = available_prost_privilege(object, for_dml_table);
244    for user in users {
245        let privileges = if user.is_super {
246            vec![&super_privilege]
247        } else {
248            user.grant_privileges
249                .iter()
250                .filter(|&privilege| privilege.object.as_ref().unwrap() == &object)
251                .collect_vec()
252        };
253        if privileges.is_empty() {
254            continue;
255        };
256        let mut grantor_map = HashMap::new();
257        privileges.iter().for_each(|&privilege| {
258            privilege.action_with_opts.iter().for_each(|ao| {
259                grantor_map
260                    .entry(ao.granted_by)
261                    .or_insert_with(Vec::new)
262                    .push((ao.get_action().unwrap(), ao.with_grant_option));
263            })
264        });
265        for (granted_by, actions) in grantor_map {
266            let mut aclitem = String::new();
267            aclitem.push_str(&user.name);
268            aclitem.push('=');
269            for (action, option) in actions {
270                aclitem.push_str(&AclMode::from(action).to_string());
271                if option {
272                    aclitem.push('*');
273                }
274            }
275            aclitem.push('/');
276            // should be able to query grantor's name
277            aclitem.push_str(username_map.get(&granted_by).unwrap());
278            res.push(aclitem);
279        }
280    }
281    res
282}
283
/// Registry holding all builtin catalogs (tables and views).
pub struct SystemCatalog {
    /// Builtin catalogs, addressed by id: table id = index + `SYS_CATALOG_START_ID`.
    catalogs: Vec<BuiltinCatalog>,
}
288
289pub fn get_sys_tables_in_schema(schema_name: &str) -> Vec<Arc<SystemTableCatalog>> {
290    SYS_CATALOGS
291        .catalogs
292        .iter()
293        .enumerate()
294        .filter_map(|(idx, c)| match c {
295            BuiltinCatalog::Table(t) if t.schema == schema_name => Some(Arc::new(
296                SystemTableCatalog::from(t)
297                    .with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
298            )),
299            _ => None,
300        })
301        .collect()
302}
303
304pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<ViewCatalog> {
305    SYS_CATALOGS
306        .catalogs
307        .iter()
308        .enumerate()
309        .filter_map(|(idx, c)| match c {
310            BuiltinCatalog::View(v) if v.schema == schema_name => Some(
311                ViewCatalog::from(v).with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
312            ),
313            _ => None,
314        })
315        .collect()
316}
317
318pub fn is_system_catalog(oid: ObjectId) -> bool {
319    oid.as_raw_id() >= SYS_CATALOG_START_ID as u32
320}
321
322/// The global registry of all builtin catalogs.
323pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
324    tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
325    assert!(SYS_CATALOGS_SLICE.len() <= MAX_SYS_CATALOG_NUM as usize);
326    let catalogs = SYS_CATALOGS_SLICE
327        .iter()
328        .map(|f| f())
329        .sorted_by_key(|c| c.full_name())
330        .collect();
331    SystemCatalog { catalogs }
332});
333
/// Distributed slice of constructors for builtin catalogs. `SYS_CATALOGS` invokes every entry
/// to build the global registry.
#[linkme::distributed_slice]
pub static SYS_CATALOGS_SLICE: [fn() -> BuiltinCatalog];
336
337impl SysCatalogReader for SysCatalogReaderImpl {
338    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>> {
339        let table_name = SYS_CATALOGS
340            .catalogs
341            .get((table_id.as_raw_id() - SYS_CATALOG_START_ID as u32) as usize)
342            .unwrap();
343        match table_name {
344            BuiltinCatalog::Table(t) => (t.function)(self),
345            BuiltinCatalog::View(_) => panic!("read_table should not be called on a view"),
346        }
347    }
348}
349
#[cfg(test)]
mod tests {
    use crate::catalog::system_catalog::SYS_CATALOGS;
    use crate::test_utils::LocalFrontend;

    /// Runs the SQL statement of every builtin view through a local frontend.
    #[tokio::test]
    async fn test_builtin_view_definition() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for catalog in &SYS_CATALOGS.catalogs {
            if let super::BuiltinCatalog::View(view) = catalog {
                frontend.query_formatted_result(view.sql.clone()).await;
            }
        }
    }
}