risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_columns.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::Fields;
16use risingwave_frontend_macro::system_catalog;
17
18use crate::catalog::schema_catalog::SchemaCatalog;
19use crate::catalog::system_catalog::SysCatalogReaderImpl;
20use crate::error::Result;
21use crate::expr::{ExprDisplay, ExprImpl};
22use crate::user::user_catalog::UserCatalog;
23
24#[derive(Fields)]
25#[primary_key(relation_id, name)]
26struct RwColumn {
27    relation_id: i32,
28    // belonged relation id
29    name: String,
30    // column name
31    position: i32,
32    // 1-indexed position
33    is_hidden: bool,
34    is_primary_key: bool,
35    is_distribution_key: bool,
36    is_generated: bool,
37    is_nullable: bool,
38    generation_expression: Option<String>,
39    data_type: String,
40    type_oid: i32,
41    type_len: i16,
42    udt_type: String,
43}
44
45#[system_catalog(table, "rw_catalog.rw_columns")]
46fn read_rw_columns(reader: &SysCatalogReaderImpl) -> Result<Vec<RwColumn>> {
47    let catalog_reader = reader.catalog_reader.read_guard();
48    let user_reader = reader.user_info_reader.read_guard();
49    let current_user = user_reader
50        .get_user_by_name(&reader.auth_context.user_name)
51        .expect("user not found");
52    let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
53
54    Ok(schemas
55        .flat_map(|s| read_rw_columns_in_schema(current_user, s))
56        .collect())
57}
58
59fn read_rw_columns_in_schema(current_user: &UserCatalog, schema: &SchemaCatalog) -> Vec<RwColumn> {
60    let view_rows = schema.iter_view_with_acl(current_user).flat_map(|view| {
61        view.columns
62            .iter()
63            .enumerate()
64            .map(|(index, column)| RwColumn {
65                relation_id: view.id.as_i32_id(),
66                name: column.name.clone(),
67                position: index as i32 + 1,
68                is_hidden: false,
69                is_primary_key: false,
70                is_distribution_key: false,
71                is_generated: false,
72                is_nullable: true,
73                generation_expression: None,
74                data_type: column.data_type().to_string(),
75                type_oid: column.data_type().to_oid(),
76                type_len: column.data_type().type_len(),
77                udt_type: column.data_type().pg_name().into(),
78            })
79    });
80
81    let sink_rows = schema.iter_sink_with_acl(current_user).flat_map(|sink| {
82        sink.full_columns()
83            .iter()
84            .enumerate()
85            .map(|(index, column)| RwColumn {
86                relation_id: sink.id.as_i32_id(),
87                name: column.name().into(),
88                position: index as i32 + 1,
89                is_hidden: column.is_hidden,
90                is_primary_key: (sink.downstream_pk.as_ref())
91                    .map(|pk| pk.contains(&index))
92                    .unwrap_or(false),
93                is_distribution_key: sink.distribution_key.contains(&index),
94                is_generated: false,
95                is_nullable: column.nullable(),
96                generation_expression: None,
97                data_type: column.data_type().to_string(),
98                type_oid: column.data_type().to_oid(),
99                type_len: column.data_type().type_len(),
100                udt_type: column.data_type().pg_name().into(),
101            })
102    });
103
104    let catalog_rows = schema.iter_system_tables().flat_map(|table| {
105        table
106            .columns
107            .iter()
108            .enumerate()
109            .map(move |(index, column)| RwColumn {
110                relation_id: table.id.as_i32_id(),
111                name: column.name().into(),
112                position: index as i32 + 1,
113                is_hidden: column.is_hidden,
114                is_primary_key: table.pk.contains(&index),
115                is_distribution_key: false,
116                is_generated: false,
117                is_nullable: column.nullable(),
118                generation_expression: None,
119                data_type: column.data_type().to_string(),
120                type_oid: column.data_type().to_oid(),
121                type_len: column.data_type().type_len(),
122                udt_type: column.data_type().pg_name().into(),
123            })
124    });
125
126    let table_rows = schema
127        .iter_table_mv_indices_with_acl(current_user)
128        .flat_map(|table| {
129            let schema = table.column_schema();
130            table
131                .columns
132                .iter()
133                .enumerate()
134                .map(move |(index, column)| RwColumn {
135                    relation_id: table.id.as_i32_id(),
136                    name: column.name().into(),
137                    position: index as i32 + 1,
138                    is_hidden: column.is_hidden,
139                    is_primary_key: table.pk().iter().any(|idx| idx.column_index == index),
140                    is_distribution_key: table.distribution_key.contains(&index),
141                    is_generated: column.is_generated(),
142                    is_nullable: column.nullable(),
143                    generation_expression: column.generated_expr().map(|expr_node| {
144                        let expr = ExprImpl::from_expr_proto(expr_node).unwrap();
145                        let expr_display = ExprDisplay {
146                            expr: &expr,
147                            input_schema: &schema,
148                        };
149                        expr_display.to_string()
150                    }),
151                    data_type: column.data_type().to_string(),
152                    type_oid: column.data_type().to_oid(),
153                    type_len: column.data_type().type_len(),
154                    udt_type: column.data_type().pg_name().into(),
155                })
156        });
157
158    let schema_rows = schema
159        .iter_source_with_acl(current_user)
160        .filter(|source| source.associated_table_id.is_none())
161        .flat_map(|source| {
162            source
163                .columns
164                .iter()
165                .enumerate()
166                .map(move |(index, column)| RwColumn {
167                    relation_id: source.id.as_i32_id(),
168                    name: column.name().into(),
169                    position: index as i32 + 1,
170                    is_hidden: column.is_hidden,
171                    is_primary_key: source.pk_col_ids.contains(&column.column_id()),
172                    is_distribution_key: false,
173                    is_generated: false,
174                    is_nullable: column.nullable(),
175                    generation_expression: None,
176                    data_type: column.data_type().to_string(),
177                    type_oid: column.data_type().to_oid(),
178                    type_len: column.data_type().type_len(),
179                    udt_type: column.data_type().pg_name().into(),
180                })
181        });
182
183    view_rows
184        .chain(sink_rows)
185        .chain(catalog_rows)
186        .chain(table_rows)
187        .chain(schema_rows)
188        .collect()
189}