risingwave_frontend/handler/
describe.rs1use std::fmt::Display;
16
17use itertools::Itertools;
18use pgwire::pg_field_descriptor::PgFieldDescriptor;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
21use risingwave_common::types::Fields;
22use risingwave_sqlparser::ast::{ObjectName, display_comma_separated};
23
24use super::show::ShowColumnRow;
25use super::{RwPgResponse, fields_to_descriptors};
26use crate::binder::{Binder, Relation};
27use crate::catalog::CatalogError;
28use crate::error::Result;
29use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
30
31pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
32 let session = handler_args.session;
33 let mut binder = Binder::new_for_system(&session);
34
35 Binder::validate_cross_db_reference(&session.database(), &object_name)?;
36 let not_found_err =
37 CatalogError::NotFound("table, source, sink or view", object_name.to_string());
38
39 let (columns, pk_columns, dist_columns, indices, relname, description) = if let Ok(relation) =
41 binder.bind_relation_by_name(object_name.clone(), None, None, false)
42 {
43 match relation {
44 Relation::Source(s) => {
45 let pk_column_catalogs = s
46 .catalog
47 .pk_col_ids
48 .iter()
49 .map(|&column_id| {
50 s.catalog
51 .columns
52 .iter()
53 .filter(|x| x.column_id() == column_id)
54 .map(|x| x.column_desc.clone())
55 .exactly_one()
56 .unwrap()
57 })
58 .collect_vec();
59 (
60 s.catalog.columns,
61 pk_column_catalogs,
62 vec![],
63 vec![],
64 s.catalog.name,
65 None, )
67 }
68 Relation::BaseTable(t) => {
69 let pk_column_catalogs = t
70 .table_catalog
71 .pk()
72 .iter()
73 .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
74 .collect_vec();
75 let dist_columns = t
76 .table_catalog
77 .distribution_key()
78 .iter()
79 .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
80 .collect_vec();
81 (
82 t.table_catalog.columns.clone(),
83 pk_column_catalogs,
84 dist_columns,
85 t.table_indexes,
86 t.table_catalog.name.clone(),
87 t.table_catalog.description.clone(),
88 )
89 }
90 Relation::SystemTable(t) => {
91 let pk_column_catalogs = t
92 .sys_table_catalog
93 .pk
94 .iter()
95 .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
96 .collect_vec();
97 (
98 t.sys_table_catalog.columns.clone(),
99 pk_column_catalogs,
100 vec![],
101 vec![],
102 t.sys_table_catalog.name.clone(),
103 None, )
105 }
106 Relation::Share(_) => {
107 if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
108 let columns = view
109 .view_catalog
110 .columns
111 .iter()
112 .enumerate()
113 .map(|(idx, field)| ColumnCatalog {
114 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
115 is_hidden: false,
116 })
117 .collect();
118 (
119 columns,
120 vec![],
121 vec![],
122 vec![],
123 view.view_catalog.name.clone(),
124 None,
125 )
126 } else {
127 return Err(not_found_err.into());
128 }
129 }
130 _ => {
131 return Err(not_found_err.into());
132 }
133 }
134 } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
135 let columns = sink.sink_catalog.full_columns().to_vec();
136 let pk_columns = sink
137 .sink_catalog
138 .downstream_pk_indices()
139 .into_iter()
140 .map(|idx| columns[idx].column_desc.clone())
141 .collect_vec();
142 let dist_columns = sink
143 .sink_catalog
144 .distribution_key
145 .iter()
146 .map(|idx| columns[*idx].column_desc.clone())
147 .collect_vec();
148 (
149 columns,
150 pk_columns,
151 dist_columns,
152 vec![],
153 sink.sink_catalog.name.clone(),
154 None,
155 )
156 } else {
157 return Err(not_found_err.into());
158 };
159
160 let mut rows = columns
162 .into_iter()
163 .flat_map(ShowColumnRow::from_catalog)
164 .collect_vec();
165
166 fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
167 where
168 T: Display,
169 {
170 format!(
171 "{}",
172 display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
173 )
174 }
175
176 if !pk_columns.is_empty() {
178 rows.push(ShowColumnRow {
179 name: "primary key".into(),
180 r#type: concat(pk_columns.iter().map(|x| &x.name)),
181 is_hidden: None,
182 description: None,
183 });
184 }
185
186 if !dist_columns.is_empty() {
188 rows.push(ShowColumnRow {
189 name: "distribution key".into(),
190 r#type: concat(dist_columns.iter().map(|x| &x.name)),
191 is_hidden: None,
192 description: None,
193 });
194 }
195
196 rows.extend(indices.iter().map(|index| {
198 let index_display = index.display();
199
200 ShowColumnRow {
201 name: index.name.clone(),
202 r#type: if index_display.include_columns.is_empty() {
203 format!(
204 "index({}) distributed by({})",
205 display_comma_separated(&index_display.index_columns_with_ordering),
206 display_comma_separated(&index_display.distributed_by_columns),
207 )
208 } else {
209 format!(
210 "index({}) include({}) distributed by({})",
211 display_comma_separated(&index_display.index_columns_with_ordering),
212 display_comma_separated(&index_display.include_columns),
213 display_comma_separated(&index_display.distributed_by_columns),
214 )
215 },
216 is_hidden: None,
217 description: None,
219 }
220 }));
221
222 rows.push(ShowColumnRow {
223 name: "table description".into(),
224 r#type: relname,
225 is_hidden: None,
226 description,
227 });
228
229 Ok(PgResponse::builder(StatementType::DESCRIBE)
232 .rows(rows)
233 .into())
234}
235
236pub fn infer_describe() -> Vec<PgFieldDescriptor> {
237 fields_to_descriptors(ShowColumnRow::fields())
238}
239
240#[cfg(test)]
241mod tests {
242 use std::collections::HashMap;
243 use std::ops::Index;
244
245 use futures_async_stream::for_await;
246
247 use crate::test_utils::LocalFrontend;
248
249 #[tokio::test]
250 async fn test_describe_handler() {
251 let frontend = LocalFrontend::new(Default::default()).await;
252 frontend
253 .run_sql("create table t (v1 int, v2 int, v3 int primary key, v4 int);")
254 .await
255 .unwrap();
256
257 frontend
258 .run_sql("create index idx1 on t (v1 DESC, v2);")
259 .await
260 .unwrap();
261
262 let sql = "describe t";
263 let mut pg_response = frontend.run_sql(sql).await.unwrap();
264
265 let mut columns = HashMap::new();
266 #[for_await]
267 for row_set in pg_response.values_stream() {
268 let row_set = row_set.unwrap();
269 for row in row_set {
270 columns.insert(
271 std::str::from_utf8(row.index(0).as_ref().unwrap())
272 .unwrap()
273 .to_owned(),
274 std::str::from_utf8(row.index(1).as_ref().unwrap())
275 .unwrap()
276 .to_owned(),
277 );
278 }
279 }
280
281 let expected_columns: HashMap<String, String> = maplit::hashmap! {
282 "v1".into() => "integer".into(),
283 "v2".into() => "integer".into(),
284 "v3".into() => "integer".into(),
285 "v4".into() => "integer".into(),
286 "primary key".into() => "v3".into(),
287 "distribution key".into() => "v3".into(),
288 "_rw_timestamp".into() => "timestamp with time zone".into(),
289 "idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)".into(),
290 "table description".into() => "t".into(),
291 };
292
293 assert_eq!(columns, expected_columns);
294 }
295}