risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_columns.rs1use risingwave_common::types::Fields;
16use risingwave_frontend_macro::system_catalog;
17use risingwave_pb::id::RelationId;
18
19use crate::catalog::schema_catalog::SchemaCatalog;
20use crate::catalog::system_catalog::SysCatalogReaderImpl;
21use crate::error::Result;
22use crate::expr::{ExprDisplay, ExprImpl};
23use crate::user::user_catalog::UserCatalog;
24
25#[derive(Fields)]
26#[primary_key(relation_id, name)]
27struct RwColumn {
28 relation_id: RelationId,
29 name: String,
31 position: i32,
33 is_hidden: bool,
35 is_primary_key: bool,
36 is_distribution_key: bool,
37 is_generated: bool,
38 is_nullable: bool,
39 generation_expression: Option<String>,
40 data_type: String,
41 type_oid: i32,
42 type_len: i16,
43 udt_type: String,
44}
45
46#[system_catalog(table, "rw_catalog.rw_columns")]
47fn read_rw_columns(reader: &SysCatalogReaderImpl) -> Result<Vec<RwColumn>> {
48 let catalog_reader = reader.catalog_reader.read_guard();
49 let user_reader = reader.user_info_reader.read_guard();
50 let current_user = user_reader
51 .get_user_by_name(&reader.auth_context.user_name)
52 .expect("user not found");
53 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
54
55 Ok(schemas
56 .flat_map(|s| read_rw_columns_in_schema(current_user, s))
57 .collect())
58}
59
60fn read_rw_columns_in_schema(current_user: &UserCatalog, schema: &SchemaCatalog) -> Vec<RwColumn> {
61 let view_rows = schema.iter_view_with_acl(current_user).flat_map(|view| {
62 view.columns
63 .iter()
64 .enumerate()
65 .map(|(index, column)| RwColumn {
66 relation_id: view.id.as_relation_id(),
67 name: column.name.clone(),
68 position: index as i32 + 1,
69 is_hidden: false,
70 is_primary_key: false,
71 is_distribution_key: false,
72 is_generated: false,
73 is_nullable: true,
74 generation_expression: None,
75 data_type: column.data_type().to_string(),
76 type_oid: column.data_type().to_oid(),
77 type_len: column.data_type().type_len(),
78 udt_type: column.data_type().pg_name().into(),
79 })
80 });
81
82 let sink_rows = schema.iter_sink_with_acl(current_user).flat_map(|sink| {
83 sink.full_columns()
84 .iter()
85 .enumerate()
86 .map(|(index, column)| RwColumn {
87 relation_id: sink.id.as_relation_id(),
88 name: column.name().into(),
89 position: index as i32 + 1,
90 is_hidden: column.is_hidden,
91 is_primary_key: (sink.downstream_pk.as_ref())
92 .map(|pk| pk.contains(&index))
93 .unwrap_or(false),
94 is_distribution_key: sink.distribution_key.contains(&index),
95 is_generated: false,
96 is_nullable: column.nullable(),
97 generation_expression: None,
98 data_type: column.data_type().to_string(),
99 type_oid: column.data_type().to_oid(),
100 type_len: column.data_type().type_len(),
101 udt_type: column.data_type().pg_name().into(),
102 })
103 });
104
105 let catalog_rows = schema.iter_system_tables().flat_map(|table| {
106 table
107 .columns
108 .iter()
109 .enumerate()
110 .map(move |(index, column)| RwColumn {
111 relation_id: table.id.as_relation_id(),
112 name: column.name().into(),
113 position: index as i32 + 1,
114 is_hidden: column.is_hidden,
115 is_primary_key: table.pk.contains(&index),
116 is_distribution_key: false,
117 is_generated: false,
118 is_nullable: column.nullable(),
119 generation_expression: None,
120 data_type: column.data_type().to_string(),
121 type_oid: column.data_type().to_oid(),
122 type_len: column.data_type().type_len(),
123 udt_type: column.data_type().pg_name().into(),
124 })
125 });
126
127 let table_rows = schema
128 .iter_table_mv_indices_with_acl(current_user)
129 .flat_map(|table| {
130 let schema = table.column_schema();
131 table
132 .columns
133 .iter()
134 .enumerate()
135 .map(move |(index, column)| RwColumn {
136 relation_id: table.id.as_relation_id(),
137 name: column.name().into(),
138 position: index as i32 + 1,
139 is_hidden: column.is_hidden,
140 is_primary_key: table.pk().iter().any(|idx| idx.column_index == index),
141 is_distribution_key: table.distribution_key.contains(&index),
142 is_generated: column.is_generated(),
143 is_nullable: column.nullable(),
144 generation_expression: column.generated_expr().map(|expr_node| {
145 let expr = ExprImpl::from_expr_proto(expr_node).unwrap();
146 let expr_display = ExprDisplay {
147 expr: &expr,
148 input_schema: &schema,
149 };
150 expr_display.to_string()
151 }),
152 data_type: column.data_type().to_string(),
153 type_oid: column.data_type().to_oid(),
154 type_len: column.data_type().type_len(),
155 udt_type: column.data_type().pg_name().into(),
156 })
157 });
158
159 let schema_rows = schema
160 .iter_source_with_acl(current_user)
161 .filter(|source| source.associated_table_id.is_none())
162 .flat_map(|source| {
163 source
164 .columns
165 .iter()
166 .enumerate()
167 .map(move |(index, column)| RwColumn {
168 relation_id: source.id.as_relation_id(),
169 name: column.name().into(),
170 position: index as i32 + 1,
171 is_hidden: column.is_hidden,
172 is_primary_key: source.pk_col_ids.contains(&column.column_id()),
173 is_distribution_key: false,
174 is_generated: false,
175 is_nullable: column.nullable(),
176 generation_expression: None,
177 data_type: column.data_type().to_string(),
178 type_oid: column.data_type().to_oid(),
179 type_len: column.data_type().type_len(),
180 udt_type: column.data_type().pg_name().into(),
181 })
182 });
183
184 view_rows
185 .chain(sink_rows)
186 .chain(catalog_rows)
187 .chain(table_rows)
188 .chain(schema_rows)
189 .collect()
190}