risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_columns.rs1use risingwave_common::types::Fields;
16use risingwave_frontend_macro::system_catalog;
17
18use crate::catalog::schema_catalog::SchemaCatalog;
19use crate::catalog::system_catalog::SysCatalogReaderImpl;
20use crate::error::Result;
21use crate::expr::{ExprDisplay, ExprImpl};
22use crate::user::user_catalog::UserCatalog;
23
24#[derive(Fields)]
25#[primary_key(relation_id, name)]
26struct RwColumn {
27 relation_id: i32,
28 name: String,
30 position: i32,
32 is_hidden: bool,
34 is_primary_key: bool,
35 is_distribution_key: bool,
36 is_generated: bool,
37 is_nullable: bool,
38 generation_expression: Option<String>,
39 data_type: String,
40 type_oid: i32,
41 type_len: i16,
42 udt_type: String,
43}
44
45#[system_catalog(table, "rw_catalog.rw_columns")]
46fn read_rw_columns(reader: &SysCatalogReaderImpl) -> Result<Vec<RwColumn>> {
47 let catalog_reader = reader.catalog_reader.read_guard();
48 let user_reader = reader.user_info_reader.read_guard();
49 let current_user = user_reader
50 .get_user_by_name(&reader.auth_context.user_name)
51 .expect("user not found");
52 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
53
54 Ok(schemas
55 .flat_map(|s| read_rw_columns_in_schema(current_user, s))
56 .collect())
57}
58
59fn read_rw_columns_in_schema(current_user: &UserCatalog, schema: &SchemaCatalog) -> Vec<RwColumn> {
60 let view_rows = schema.iter_view_with_acl(current_user).flat_map(|view| {
61 view.columns
62 .iter()
63 .enumerate()
64 .map(|(index, column)| RwColumn {
65 relation_id: view.id.as_i32_id(),
66 name: column.name.clone(),
67 position: index as i32 + 1,
68 is_hidden: false,
69 is_primary_key: false,
70 is_distribution_key: false,
71 is_generated: false,
72 is_nullable: true,
73 generation_expression: None,
74 data_type: column.data_type().to_string(),
75 type_oid: column.data_type().to_oid(),
76 type_len: column.data_type().type_len(),
77 udt_type: column.data_type().pg_name().into(),
78 })
79 });
80
81 let sink_rows = schema.iter_sink_with_acl(current_user).flat_map(|sink| {
82 sink.full_columns()
83 .iter()
84 .enumerate()
85 .map(|(index, column)| RwColumn {
86 relation_id: sink.id.as_i32_id(),
87 name: column.name().into(),
88 position: index as i32 + 1,
89 is_hidden: column.is_hidden,
90 is_primary_key: (sink.downstream_pk.as_ref())
91 .map(|pk| pk.contains(&index))
92 .unwrap_or(false),
93 is_distribution_key: sink.distribution_key.contains(&index),
94 is_generated: false,
95 is_nullable: column.nullable(),
96 generation_expression: None,
97 data_type: column.data_type().to_string(),
98 type_oid: column.data_type().to_oid(),
99 type_len: column.data_type().type_len(),
100 udt_type: column.data_type().pg_name().into(),
101 })
102 });
103
104 let catalog_rows = schema.iter_system_tables().flat_map(|table| {
105 table
106 .columns
107 .iter()
108 .enumerate()
109 .map(move |(index, column)| RwColumn {
110 relation_id: table.id.as_i32_id(),
111 name: column.name().into(),
112 position: index as i32 + 1,
113 is_hidden: column.is_hidden,
114 is_primary_key: table.pk.contains(&index),
115 is_distribution_key: false,
116 is_generated: false,
117 is_nullable: column.nullable(),
118 generation_expression: None,
119 data_type: column.data_type().to_string(),
120 type_oid: column.data_type().to_oid(),
121 type_len: column.data_type().type_len(),
122 udt_type: column.data_type().pg_name().into(),
123 })
124 });
125
126 let table_rows = schema
127 .iter_table_mv_indices_with_acl(current_user)
128 .flat_map(|table| {
129 let schema = table.column_schema();
130 table
131 .columns
132 .iter()
133 .enumerate()
134 .map(move |(index, column)| RwColumn {
135 relation_id: table.id.as_i32_id(),
136 name: column.name().into(),
137 position: index as i32 + 1,
138 is_hidden: column.is_hidden,
139 is_primary_key: table.pk().iter().any(|idx| idx.column_index == index),
140 is_distribution_key: table.distribution_key.contains(&index),
141 is_generated: column.is_generated(),
142 is_nullable: column.nullable(),
143 generation_expression: column.generated_expr().map(|expr_node| {
144 let expr = ExprImpl::from_expr_proto(expr_node).unwrap();
145 let expr_display = ExprDisplay {
146 expr: &expr,
147 input_schema: &schema,
148 };
149 expr_display.to_string()
150 }),
151 data_type: column.data_type().to_string(),
152 type_oid: column.data_type().to_oid(),
153 type_len: column.data_type().type_len(),
154 udt_type: column.data_type().pg_name().into(),
155 })
156 });
157
158 let schema_rows = schema
159 .iter_source_with_acl(current_user)
160 .filter(|source| source.associated_table_id.is_none())
161 .flat_map(|source| {
162 source
163 .columns
164 .iter()
165 .enumerate()
166 .map(move |(index, column)| RwColumn {
167 relation_id: source.id.as_i32_id(),
168 name: column.name().into(),
169 position: index as i32 + 1,
170 is_hidden: column.is_hidden,
171 is_primary_key: source.pk_col_ids.contains(&column.column_id()),
172 is_distribution_key: false,
173 is_generated: false,
174 is_nullable: column.nullable(),
175 generation_expression: None,
176 data_type: column.data_type().to_string(),
177 type_oid: column.data_type().to_oid(),
178 type_len: column.data_type().type_len(),
179 udt_type: column.data_type().pg_name().into(),
180 })
181 });
182
183 view_rows
184 .chain(sink_rows)
185 .chain(catalog_rows)
186 .chain(table_rows)
187 .chain(schema_rows)
188 .collect()
189}