risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_columns.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::types::Fields;
16use risingwave_frontend_macro::system_catalog;
17use risingwave_pb::id::RelationId;
18
19use crate::catalog::schema_catalog::SchemaCatalog;
20use crate::catalog::system_catalog::SysCatalogReaderImpl;
21use crate::error::Result;
22use crate::expr::{ExprDisplay, ExprImpl};
23use crate::user::user_catalog::UserCatalog;
24
25#[derive(Fields)]
26#[primary_key(relation_id, name)]
27struct RwColumn {
28    relation_id: RelationId,
29    // belonged relation id
30    name: String,
31    // column name
32    position: i32,
33    // 1-indexed position
34    is_hidden: bool,
35    is_primary_key: bool,
36    is_distribution_key: bool,
37    is_generated: bool,
38    is_nullable: bool,
39    generation_expression: Option<String>,
40    data_type: String,
41    type_oid: i32,
42    type_len: i16,
43    udt_type: String,
44}
45
46#[system_catalog(table, "rw_catalog.rw_columns")]
47fn read_rw_columns(reader: &SysCatalogReaderImpl) -> Result<Vec<RwColumn>> {
48    let catalog_reader = reader.catalog_reader.read_guard();
49    let user_reader = reader.user_info_reader.read_guard();
50    let current_user = user_reader
51        .get_user_by_name(&reader.auth_context.user_name)
52        .expect("user not found");
53    let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
54
55    Ok(schemas
56        .flat_map(|s| read_rw_columns_in_schema(current_user, s))
57        .collect())
58}
59
60fn read_rw_columns_in_schema(current_user: &UserCatalog, schema: &SchemaCatalog) -> Vec<RwColumn> {
61    let view_rows = schema.iter_view_with_acl(current_user).flat_map(|view| {
62        view.columns
63            .iter()
64            .enumerate()
65            .map(|(index, column)| RwColumn {
66                relation_id: view.id.as_relation_id(),
67                name: column.name.clone(),
68                position: index as i32 + 1,
69                is_hidden: false,
70                is_primary_key: false,
71                is_distribution_key: false,
72                is_generated: false,
73                is_nullable: true,
74                generation_expression: None,
75                data_type: column.data_type().to_string(),
76                type_oid: column.data_type().to_oid(),
77                type_len: column.data_type().type_len(),
78                udt_type: column.data_type().pg_name().into(),
79            })
80    });
81
82    let sink_rows = schema.iter_sink_with_acl(current_user).flat_map(|sink| {
83        sink.full_columns()
84            .iter()
85            .enumerate()
86            .map(|(index, column)| RwColumn {
87                relation_id: sink.id.as_relation_id(),
88                name: column.name().into(),
89                position: index as i32 + 1,
90                is_hidden: column.is_hidden,
91                is_primary_key: (sink.downstream_pk.as_ref())
92                    .map(|pk| pk.contains(&index))
93                    .unwrap_or(false),
94                is_distribution_key: sink.distribution_key.contains(&index),
95                is_generated: false,
96                is_nullable: column.nullable(),
97                generation_expression: None,
98                data_type: column.data_type().to_string(),
99                type_oid: column.data_type().to_oid(),
100                type_len: column.data_type().type_len(),
101                udt_type: column.data_type().pg_name().into(),
102            })
103    });
104
105    let catalog_rows = schema.iter_system_tables().flat_map(|table| {
106        table
107            .columns
108            .iter()
109            .enumerate()
110            .map(move |(index, column)| RwColumn {
111                relation_id: table.id.as_relation_id(),
112                name: column.name().into(),
113                position: index as i32 + 1,
114                is_hidden: column.is_hidden,
115                is_primary_key: table.pk.contains(&index),
116                is_distribution_key: false,
117                is_generated: false,
118                is_nullable: column.nullable(),
119                generation_expression: None,
120                data_type: column.data_type().to_string(),
121                type_oid: column.data_type().to_oid(),
122                type_len: column.data_type().type_len(),
123                udt_type: column.data_type().pg_name().into(),
124            })
125    });
126
127    let table_rows = schema
128        .iter_table_mv_indices_with_acl(current_user)
129        .flat_map(|table| {
130            let schema = table.column_schema();
131            table
132                .columns
133                .iter()
134                .enumerate()
135                .map(move |(index, column)| RwColumn {
136                    relation_id: table.id.as_relation_id(),
137                    name: column.name().into(),
138                    position: index as i32 + 1,
139                    is_hidden: column.is_hidden,
140                    is_primary_key: table.pk().iter().any(|idx| idx.column_index == index),
141                    is_distribution_key: table.distribution_key.contains(&index),
142                    is_generated: column.is_generated(),
143                    is_nullable: column.nullable(),
144                    generation_expression: column.generated_expr().map(|expr_node| {
145                        let expr = ExprImpl::from_expr_proto(expr_node).unwrap();
146                        let expr_display = ExprDisplay {
147                            expr: &expr,
148                            input_schema: &schema,
149                        };
150                        expr_display.to_string()
151                    }),
152                    data_type: column.data_type().to_string(),
153                    type_oid: column.data_type().to_oid(),
154                    type_len: column.data_type().type_len(),
155                    udt_type: column.data_type().pg_name().into(),
156                })
157        });
158
159    let schema_rows = schema
160        .iter_source_with_acl(current_user)
161        .filter(|source| source.associated_table_id.is_none())
162        .flat_map(|source| {
163            source
164                .columns
165                .iter()
166                .enumerate()
167                .map(move |(index, column)| RwColumn {
168                    relation_id: source.id.as_relation_id(),
169                    name: column.name().into(),
170                    position: index as i32 + 1,
171                    is_hidden: column.is_hidden,
172                    is_primary_key: source.pk_col_ids.contains(&column.column_id()),
173                    is_distribution_key: false,
174                    is_generated: false,
175                    is_nullable: column.nullable(),
176                    generation_expression: None,
177                    data_type: column.data_type().to_string(),
178                    type_oid: column.data_type().to_oid(),
179                    type_len: column.data_type().type_len(),
180                    udt_type: column.data_type().pg_name().into(),
181                })
182        });
183
184    view_rows
185        .chain(sink_rows)
186        .chain(catalog_rows)
187        .chain(table_rows)
188        .chain(schema_rows)
189        .collect()
190}