risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_columns.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::Fields;
16use risingwave_frontend_macro::system_catalog;
17
18use crate::catalog::schema_catalog::SchemaCatalog;
19use crate::catalog::system_catalog::SysCatalogReaderImpl;
20use crate::error::Result;
21use crate::expr::{ExprDisplay, ExprImpl};
22use crate::user::user_catalog::UserCatalog;
23
24#[derive(Fields)]
25#[primary_key(relation_id, name)]
26struct RwColumn {
27    relation_id: i32,
28    // belonged relation id
29    name: String,
30    // column name
31    position: i32,
32    // 1-indexed position
33    is_hidden: bool,
34    is_primary_key: bool,
35    is_distribution_key: bool,
36    is_generated: bool,
37    is_nullable: bool,
38    generation_expression: Option<String>,
39    data_type: String,
40    type_oid: i32,
41    type_len: i16,
42    udt_type: String,
43}
44
45#[system_catalog(table, "rw_catalog.rw_columns")]
46fn read_rw_columns(reader: &SysCatalogReaderImpl) -> Result<Vec<RwColumn>> {
47    let catalog_reader = reader.catalog_reader.read_guard();
48    let user_reader = reader.user_info_reader.read_guard();
49    let current_user = user_reader
50        .get_user_by_name(&reader.auth_context.user_name)
51        .expect("user not found");
52    let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
53
54    Ok(schemas
55        .flat_map(|s| read_rw_columns_in_schema(current_user, s))
56        .collect())
57}
58
59fn read_rw_columns_in_schema(current_user: &UserCatalog, schema: &SchemaCatalog) -> Vec<RwColumn> {
60    let view_rows = schema.iter_view_with_acl(current_user).flat_map(|view| {
61        view.columns
62            .iter()
63            .enumerate()
64            .map(|(index, column)| RwColumn {
65                relation_id: view.id as i32,
66                name: column.name.clone(),
67                position: index as i32 + 1,
68                is_hidden: false,
69                is_primary_key: false,
70                is_distribution_key: false,
71                is_generated: false,
72                is_nullable: false,
73                generation_expression: None,
74                data_type: column.data_type().to_string(),
75                type_oid: column.data_type().to_oid(),
76                type_len: column.data_type().type_len(),
77                udt_type: column.data_type().pg_name().into(),
78            })
79    });
80
81    let sink_rows = schema.iter_sink_with_acl(current_user).flat_map(|sink| {
82        sink.full_columns()
83            .iter()
84            .enumerate()
85            .map(|(index, column)| RwColumn {
86                relation_id: sink.id.sink_id as i32,
87                name: column.name().into(),
88                position: index as i32 + 1,
89                is_hidden: column.is_hidden,
90                is_primary_key: sink.downstream_pk.contains(&index),
91                is_distribution_key: sink.distribution_key.contains(&index),
92                is_generated: false,
93                is_nullable: column.nullable(),
94                generation_expression: None,
95                data_type: column.data_type().to_string(),
96                type_oid: column.data_type().to_oid(),
97                type_len: column.data_type().type_len(),
98                udt_type: column.data_type().pg_name().into(),
99            })
100    });
101
102    let catalog_rows = schema.iter_system_tables().flat_map(|table| {
103        table
104            .columns
105            .iter()
106            .enumerate()
107            .map(move |(index, column)| RwColumn {
108                relation_id: table.id.table_id as i32,
109                name: column.name().into(),
110                position: index as i32 + 1,
111                is_hidden: column.is_hidden,
112                is_primary_key: table.pk.contains(&index),
113                is_distribution_key: false,
114                is_generated: false,
115                is_nullable: column.nullable(),
116                generation_expression: None,
117                data_type: column.data_type().to_string(),
118                type_oid: column.data_type().to_oid(),
119                type_len: column.data_type().type_len(),
120                udt_type: column.data_type().pg_name().into(),
121            })
122    });
123
124    let table_rows = schema
125        .iter_table_mv_indices_with_acl(current_user)
126        .flat_map(|table| {
127            let schema = table.column_schema();
128            table
129                .columns
130                .iter()
131                .enumerate()
132                .map(move |(index, column)| RwColumn {
133                    relation_id: table.id.table_id as i32,
134                    name: column.name().into(),
135                    position: index as i32 + 1,
136                    is_hidden: column.is_hidden,
137                    is_primary_key: table.pk().iter().any(|idx| idx.column_index == index),
138                    is_distribution_key: table.distribution_key.contains(&index),
139                    is_generated: column.is_generated(),
140                    is_nullable: column.nullable(),
141                    generation_expression: column.generated_expr().map(|expr_node| {
142                        let expr = ExprImpl::from_expr_proto(expr_node).unwrap();
143                        let expr_display = ExprDisplay {
144                            expr: &expr,
145                            input_schema: &schema,
146                        };
147                        expr_display.to_string()
148                    }),
149                    data_type: column.data_type().to_string(),
150                    type_oid: column.data_type().to_oid(),
151                    type_len: column.data_type().type_len(),
152                    udt_type: column.data_type().pg_name().into(),
153                })
154        });
155
156    let schema_rows = schema
157        .iter_source_with_acl(current_user)
158        .flat_map(|source| {
159            source
160                .columns
161                .iter()
162                .enumerate()
163                .map(move |(index, column)| RwColumn {
164                    relation_id: source.id as i32,
165                    name: column.name().into(),
166                    position: index as i32 + 1,
167                    is_hidden: column.is_hidden,
168                    is_primary_key: source.pk_col_ids.contains(&column.column_id()),
169                    is_distribution_key: false,
170                    is_generated: false,
171                    is_nullable: column.nullable(),
172                    generation_expression: None,
173                    data_type: column.data_type().to_string(),
174                    type_oid: column.data_type().to_oid(),
175                    type_len: column.data_type().type_len(),
176                    udt_type: column.data_type().pg_name().into(),
177                })
178        });
179
180    view_rows
181        .chain(sink_rows)
182        .chain(catalog_rows)
183        .chain(table_rows)
184        .chain(schema_rows)
185        .collect()
186}