risingwave_frontend/handler/
describe.rs1use std::cmp::max;
16use std::fmt::Display;
17
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pretty_xmlish::{Pretty, PrettyConfig};
22use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
23use risingwave_common::types::{DataType, Fields};
24use risingwave_expr::bail;
25use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
26use risingwave_pb::stream_plan::StreamNode;
27use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};
28
29use super::explain::ExplainRow;
30use super::show::ShowColumnRow;
31use super::{RwPgResponse, fields_to_descriptors};
32use crate::binder::{Binder, Relation};
33use crate::catalog::CatalogError;
34use crate::error::{ErrorCode, Result};
35use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
36
37pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
38 let session = handler_args.session;
39 let mut binder = Binder::new_for_system(&session);
40
41 Binder::validate_cross_db_reference(&session.database(), &object_name)?;
42 let not_found_err =
43 CatalogError::NotFound("table, source, sink or view", object_name.to_string());
44
45 let (columns, pk_columns, dist_columns, indices, relname, description) = if let Ok(relation) =
47 binder.bind_relation_by_name(object_name.clone(), None, None, false)
48 {
49 match relation {
50 Relation::Source(s) => {
51 let pk_column_catalogs = s
52 .catalog
53 .pk_col_ids
54 .iter()
55 .map(|&column_id| {
56 s.catalog
57 .columns
58 .iter()
59 .filter(|x| x.column_id() == column_id)
60 .map(|x| x.column_desc.clone())
61 .exactly_one()
62 .unwrap()
63 })
64 .collect_vec();
65 (
66 s.catalog.columns,
67 pk_column_catalogs,
68 vec![],
69 vec![],
70 s.catalog.name,
71 None, )
73 }
74 Relation::BaseTable(t) => {
75 let pk_column_catalogs = t
76 .table_catalog
77 .pk()
78 .iter()
79 .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
80 .collect_vec();
81 let dist_columns = t
82 .table_catalog
83 .distribution_key()
84 .iter()
85 .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
86 .collect_vec();
87 (
88 t.table_catalog.columns.clone(),
89 pk_column_catalogs,
90 dist_columns,
91 t.table_indexes,
92 t.table_catalog.name.clone(),
93 t.table_catalog.description.clone(),
94 )
95 }
96 Relation::SystemTable(t) => {
97 let pk_column_catalogs = t
98 .sys_table_catalog
99 .pk
100 .iter()
101 .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
102 .collect_vec();
103 (
104 t.sys_table_catalog.columns.clone(),
105 pk_column_catalogs,
106 vec![],
107 vec![],
108 t.sys_table_catalog.name.clone(),
109 None, )
111 }
112 Relation::Share(_) => {
113 if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
114 let columns = view
115 .view_catalog
116 .columns
117 .iter()
118 .enumerate()
119 .map(|(idx, field)| ColumnCatalog {
120 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
121 is_hidden: false,
122 })
123 .collect();
124 (
125 columns,
126 vec![],
127 vec![],
128 vec![],
129 view.view_catalog.name.clone(),
130 None,
131 )
132 } else {
133 return Err(not_found_err.into());
134 }
135 }
136 _ => {
137 return Err(not_found_err.into());
138 }
139 }
140 } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
141 let columns = sink.sink_catalog.full_columns().to_vec();
142 let pk_columns = sink
143 .sink_catalog
144 .downstream_pk_indices()
145 .into_iter()
146 .map(|idx| columns[idx].column_desc.clone())
147 .collect_vec();
148 let dist_columns = sink
149 .sink_catalog
150 .distribution_key
151 .iter()
152 .map(|idx| columns[*idx].column_desc.clone())
153 .collect_vec();
154 (
155 columns,
156 pk_columns,
157 dist_columns,
158 vec![],
159 sink.sink_catalog.name.clone(),
160 None,
161 )
162 } else {
163 return Err(not_found_err.into());
164 };
165
166 let mut rows = columns
168 .into_iter()
169 .flat_map(ShowColumnRow::from_catalog)
170 .collect_vec();
171
172 fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
173 where
174 T: Display,
175 {
176 format!(
177 "{}",
178 display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
179 )
180 }
181
182 if !pk_columns.is_empty() {
184 rows.push(ShowColumnRow {
185 name: "primary key".into(),
186 r#type: concat(pk_columns.iter().map(|x| &x.name)),
187 is_hidden: None,
188 description: None,
189 });
190 }
191
192 if !dist_columns.is_empty() {
194 rows.push(ShowColumnRow {
195 name: "distribution key".into(),
196 r#type: concat(dist_columns.iter().map(|x| &x.name)),
197 is_hidden: None,
198 description: None,
199 });
200 }
201
202 rows.extend(indices.iter().map(|index| {
204 let index_display = index.display();
205
206 ShowColumnRow {
207 name: index.name.clone(),
208 r#type: if index_display.include_columns.is_empty() {
209 format!(
210 "index({}) distributed by({})",
211 display_comma_separated(&index_display.index_columns_with_ordering),
212 display_comma_separated(&index_display.distributed_by_columns),
213 )
214 } else {
215 format!(
216 "index({}) include({}) distributed by({})",
217 display_comma_separated(&index_display.index_columns_with_ordering),
218 display_comma_separated(&index_display.include_columns),
219 display_comma_separated(&index_display.distributed_by_columns),
220 )
221 },
222 is_hidden: None,
223 description: None,
225 }
226 }));
227
228 rows.push(ShowColumnRow {
229 name: "table description".into(),
230 r#type: relname,
231 is_hidden: None,
232 description,
233 });
234
235 Ok(PgResponse::builder(StatementType::DESCRIBE)
238 .rows(rows)
239 .into())
240}
241
242pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
243 match kind {
244 DescribeKind::Fragments => vec![PgFieldDescriptor::new(
245 "Fragments".to_owned(),
246 DataType::Varchar.to_oid(),
247 DataType::Varchar.type_len(),
248 )],
249 DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
250 }
251}
252pub async fn handle_describe_fragments(
253 handler_args: HandlerArgs,
254 object_name: ObjectName,
255) -> Result<RwPgResponse> {
256 let session = handler_args.session.clone();
257 let job_id = {
258 let mut binder = Binder::new_for_system(&session);
259
260 Binder::validate_cross_db_reference(&session.database(), &object_name)?;
261 let not_found_err = CatalogError::NotFound("stream job", object_name.to_string());
262
263 if let Ok(relation) = binder.bind_relation_by_name(object_name.clone(), None, None, false) {
264 match relation {
265 Relation::Source(s) => {
266 if s.is_shared() {
267 s.catalog.id
268 } else {
269 bail!(ErrorCode::NotSupported(
270 "non shared source has no fragments to describe".to_owned(),
271 "Use `DESCRIBE` instead.".to_owned(),
272 ));
273 }
274 }
275 Relation::BaseTable(t) => t.table_catalog.id.table_id,
276 Relation::SystemTable(_t) => {
277 bail!(ErrorCode::NotSupported(
278 "system table has no fragments to describe".to_owned(),
279 "Use `DESCRIBE` instead.".to_owned(),
280 ));
281 }
282 Relation::Share(_s) => {
283 bail!(ErrorCode::NotSupported(
284 "view has no fragments to describe".to_owned(),
285 "Use `DESCRIBE` instead.".to_owned(),
286 ));
287 }
288 _ => {
289 return Err(not_found_err.into());
291 }
292 }
293 } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
294 sink.sink_catalog.id.sink_id
295 } else {
296 return Err(not_found_err.into());
297 }
298 };
299
300 let meta_client = session.env().meta_client();
301 let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
302 let res = generate_fragments_string(fragments)?;
303 Ok(res)
304}
305
306fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
309 let mut config = PrettyConfig {
310 need_boundaries: false,
311 width: 80,
312 ..Default::default()
313 };
314
315 let mut max_width = 80;
316
317 let mut blocks = vec![];
318 for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
319 let mut res = String::new();
320 let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
321 res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
322 let node = &fragment.actors[0].node;
323 let node = explain_node(node.as_ref().unwrap(), true);
324 let width = config.unicode(&mut res, &node);
325 max_width = max(width, max_width);
326 config.width = max_width;
327 blocks.push(res);
328 blocks.push("".to_owned());
329 }
330
331 let rows = blocks.iter().map(|b| ExplainRow {
332 query_plan: b.into(),
333 });
334 Ok(PgResponse::builder(StatementType::DESCRIBE)
335 .rows(rows)
336 .into())
337}
338
339fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
340 let one_line_explain = node.identity.clone();
342
343 let mut fields = Vec::with_capacity(3);
344 if verbose {
345 fields.push((
346 "output",
347 Pretty::Array(
348 node.fields
349 .iter()
350 .map(|f| Pretty::display(f.get_name()))
351 .collect(),
352 ),
353 ));
354 fields.push((
355 "stream key",
356 Pretty::Array(
357 node.stream_key
358 .iter()
359 .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
360 .collect(),
361 ),
362 ));
363 }
364 let children = node
365 .input
366 .iter()
367 .map(|input| explain_node(input, verbose))
368 .collect();
369 Pretty::simple_record(one_line_explain, fields, children)
370}
371
372#[cfg(test)]
373mod tests {
374 use std::collections::HashMap;
375 use std::ops::Index;
376
377 use futures_async_stream::for_await;
378
379 use crate::test_utils::LocalFrontend;
380
381 #[tokio::test]
382 async fn test_describe_handler() {
383 let frontend = LocalFrontend::new(Default::default()).await;
384 frontend
385 .run_sql("create table t (v1 int, v2 int, v3 int primary key, v4 int);")
386 .await
387 .unwrap();
388
389 frontend
390 .run_sql("create index idx1 on t (v1 DESC, v2);")
391 .await
392 .unwrap();
393
394 let sql = "describe t";
395 let mut pg_response = frontend.run_sql(sql).await.unwrap();
396
397 let mut columns = HashMap::new();
398 #[for_await]
399 for row_set in pg_response.values_stream() {
400 let row_set = row_set.unwrap();
401 for row in row_set {
402 columns.insert(
403 std::str::from_utf8(row.index(0).as_ref().unwrap())
404 .unwrap()
405 .to_owned(),
406 std::str::from_utf8(row.index(1).as_ref().unwrap())
407 .unwrap()
408 .to_owned(),
409 );
410 }
411 }
412
413 let expected_columns: HashMap<String, String> = maplit::hashmap! {
414 "v1".into() => "integer".into(),
415 "v2".into() => "integer".into(),
416 "v3".into() => "integer".into(),
417 "v4".into() => "integer".into(),
418 "primary key".into() => "v3".into(),
419 "distribution key".into() => "v3".into(),
420 "_rw_timestamp".into() => "timestamp with time zone".into(),
421 "idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)".into(),
422 "table description".into() => "t".into(),
423 };
424
425 assert_eq!(columns, expected_columns);
426 }
427}