risingwave_frontend/handler/
describe.rs1use std::cmp::max;
16use std::fmt::Display;
17
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pretty_xmlish::{Pretty, PrettyConfig};
22use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
23use risingwave_common::types::{DataType, Fields};
24use risingwave_expr::bail;
25use risingwave_pb::meta::FragmentDistribution;
26use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
27use risingwave_pb::meta::table_fragments::fragment;
28use risingwave_pb::stream_plan::StreamNode;
29use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};
30
31use super::explain::ExplainRow;
32use super::show::ShowColumnRow;
33use super::{RwPgResponse, fields_to_descriptors};
34use crate::binder::{Binder, Relation};
35use crate::catalog::CatalogError;
36use crate::error::{ErrorCode, Result};
37use crate::handler::show::ShowColumnName;
38use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
39
40pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
41 let session = handler_args.session;
42 let mut binder = Binder::new_for_system(&session);
43
44 Binder::validate_cross_db_reference(&session.database(), &object_name)?;
45 let not_found_err =
46 CatalogError::NotFound("table, source, sink or view", object_name.to_string());
47
48 let (columns, pk_columns, dist_columns, indices, relname, description) = if let Ok(relation) =
50 binder.bind_relation_by_name(object_name.clone(), None, None, false)
51 {
52 match relation {
53 Relation::Source(s) => {
54 let pk_column_catalogs = s
55 .catalog
56 .pk_col_ids
57 .iter()
58 .map(|&column_id| {
59 s.catalog
60 .columns
61 .iter()
62 .filter(|x| x.column_id() == column_id)
63 .map(|x| x.column_desc.clone())
64 .exactly_one()
65 .unwrap()
66 })
67 .collect_vec();
68 (
69 s.catalog.columns,
70 pk_column_catalogs,
71 vec![],
72 vec![],
73 s.catalog.name,
74 None, )
76 }
77 Relation::BaseTable(t) => {
78 let pk_column_catalogs = t
79 .table_catalog
80 .pk()
81 .iter()
82 .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
83 .collect_vec();
84 let dist_columns = t
85 .table_catalog
86 .distribution_key()
87 .iter()
88 .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
89 .collect_vec();
90 (
91 t.table_catalog.columns.clone(),
92 pk_column_catalogs,
93 dist_columns,
94 t.table_indexes,
95 t.table_catalog.name.clone(),
96 t.table_catalog.description.clone(),
97 )
98 }
99 Relation::SystemTable(t) => {
100 let pk_column_catalogs = t
101 .sys_table_catalog
102 .pk
103 .iter()
104 .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
105 .collect_vec();
106 (
107 t.sys_table_catalog.columns.clone(),
108 pk_column_catalogs,
109 vec![],
110 vec![],
111 t.sys_table_catalog.name.clone(),
112 None, )
114 }
115 Relation::Share(_) => {
116 if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
117 let columns = view
118 .view_catalog
119 .columns
120 .iter()
121 .enumerate()
122 .map(|(idx, field)| ColumnCatalog {
123 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
124 is_hidden: false,
125 })
126 .collect();
127 (
128 columns,
129 vec![],
130 vec![],
131 vec![],
132 view.view_catalog.name.clone(),
133 None,
134 )
135 } else {
136 return Err(not_found_err.into());
137 }
138 }
139 _ => {
140 return Err(not_found_err.into());
141 }
142 }
143 } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
144 let columns = sink.sink_catalog.full_columns().to_vec();
145 let pk_columns = sink
146 .sink_catalog
147 .downstream_pk_indices()
148 .into_iter()
149 .map(|idx| columns[idx].column_desc.clone())
150 .collect_vec();
151 let dist_columns = sink
152 .sink_catalog
153 .distribution_key
154 .iter()
155 .map(|idx| columns[*idx].column_desc.clone())
156 .collect_vec();
157 (
158 columns,
159 pk_columns,
160 dist_columns,
161 vec![],
162 sink.sink_catalog.name.clone(),
163 None,
164 )
165 } else {
166 return Err(not_found_err.into());
167 };
168
169 let mut rows = columns
171 .into_iter()
172 .flat_map(ShowColumnRow::from_catalog)
173 .collect_vec();
174
175 fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
176 where
177 T: Display,
178 {
179 format!(
180 "{}",
181 display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
182 )
183 }
184
185 if !pk_columns.is_empty() {
187 rows.push(ShowColumnRow {
188 name: ShowColumnName::special("primary key"),
189 r#type: concat(pk_columns.iter().map(|x| &x.name)),
190 is_hidden: None,
191 description: None,
192 });
193 }
194
195 if !dist_columns.is_empty() {
197 rows.push(ShowColumnRow {
198 name: ShowColumnName::special("distribution key"),
199 r#type: concat(dist_columns.iter().map(|x| &x.name)),
200 is_hidden: None,
201 description: None,
202 });
203 }
204
205 rows.extend(indices.iter().map(|index| {
207 let index_display = index.display();
208
209 ShowColumnRow {
210 name: ShowColumnName::special(&index.name),
211 r#type: if index_display.include_columns.is_empty() {
212 format!(
213 "index({}) distributed by({})",
214 display_comma_separated(&index_display.index_columns_with_ordering),
215 display_comma_separated(&index_display.distributed_by_columns),
216 )
217 } else {
218 format!(
219 "index({}) include({}) distributed by({})",
220 display_comma_separated(&index_display.index_columns_with_ordering),
221 display_comma_separated(&index_display.include_columns),
222 display_comma_separated(&index_display.distributed_by_columns),
223 )
224 },
225 is_hidden: None,
226 description: None,
228 }
229 }));
230
231 rows.push(ShowColumnRow {
232 name: ShowColumnName::special("table description"),
233 r#type: relname,
234 is_hidden: None,
235 description,
236 });
237
238 Ok(PgResponse::builder(StatementType::DESCRIBE)
241 .rows(rows)
242 .into())
243}
244
245pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
246 match kind {
247 DescribeKind::Fragments => vec![PgFieldDescriptor::new(
248 "Fragments".to_owned(),
249 DataType::Varchar.to_oid(),
250 DataType::Varchar.type_len(),
251 )],
252 DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
253 }
254}
255
256pub async fn handle_describe_fragments(
257 handler_args: HandlerArgs,
258 object_name: ObjectName,
259) -> Result<RwPgResponse> {
260 let session = handler_args.session.clone();
261 let job_id = {
262 let mut binder = Binder::new_for_system(&session);
263
264 Binder::validate_cross_db_reference(&session.database(), &object_name)?;
265 let not_found_err = CatalogError::NotFound("stream job", object_name.to_string());
266
267 if let Ok(relation) = binder.bind_catalog_relation_by_object_name(object_name.clone(), true)
268 {
269 match relation {
270 Relation::Source(s) => {
271 if s.is_shared() {
272 s.catalog.id
273 } else {
274 bail!(ErrorCode::NotSupported(
275 "non shared source has no fragments to describe".to_owned(),
276 "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
277 ));
278 }
279 }
280 Relation::BaseTable(t) => t.table_catalog.id.table_id,
281 Relation::SystemTable(_t) => {
282 bail!(ErrorCode::NotSupported(
283 "system table has no fragments to describe".to_owned(),
284 "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
285 ));
286 }
287 Relation::Share(_s) => {
288 bail!(ErrorCode::NotSupported(
289 "view has no fragments to describe".to_owned(),
290 "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
291 ));
292 }
293 _ => {
294 return Err(not_found_err.into());
296 }
297 }
298 } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
299 sink.sink_catalog.id.sink_id
300 } else {
301 return Err(not_found_err.into());
302 }
303 };
304
305 let meta_client = session.env().meta_client();
306 let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
307 let res = generate_fragments_string(fragments)?;
308 Ok(res)
309}
310
311fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
314 let mut config = PrettyConfig {
315 need_boundaries: false,
316 width: 80,
317 ..Default::default()
318 };
319
320 let mut max_width = 80;
321
322 let mut blocks = vec![];
323 for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
324 let mut res = String::new();
325 let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
326 res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
327 let node = &fragment.actors[0].node;
328 let node = explain_node(node.as_ref().unwrap(), true);
329 let width = config.unicode(&mut res, &node);
330 max_width = max(width, max_width);
331 config.width = max_width;
332 blocks.push(res);
333 blocks.push("".to_owned());
334 }
335
336 let rows = blocks.iter().map(|b| ExplainRow {
337 query_plan: b.into(),
338 });
339 Ok(PgResponse::builder(StatementType::DESCRIBE)
340 .rows(rows)
341 .into())
342}
343
344fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
345 let one_line_explain = node.identity.clone();
347
348 let mut fields = Vec::with_capacity(3);
349 if verbose {
350 fields.push((
351 "output",
352 Pretty::Array(
353 node.fields
354 .iter()
355 .map(|f| Pretty::display(f.get_name()))
356 .collect(),
357 ),
358 ));
359 fields.push((
360 "stream key",
361 Pretty::Array(
362 node.stream_key
363 .iter()
364 .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
365 .collect(),
366 ),
367 ));
368 }
369 let children = node
370 .input
371 .iter()
372 .map(|input| explain_node(input, verbose))
373 .collect();
374 Pretty::simple_record(one_line_explain, fields, children)
375}
376
377pub async fn handle_describe_fragment(
378 handler_args: HandlerArgs,
379 fragment_id: u32,
380) -> Result<RwPgResponse> {
381 let session = handler_args.session.clone();
382 let meta_client = session.env().meta_client();
383 let distribution = &meta_client
384 .get_fragment_by_id(fragment_id)
385 .await?
386 .ok_or_else(|| CatalogError::NotFound("fragment", fragment_id.to_string()))?;
387 let res: PgResponse<super::PgResponseStream> = generate_enhanced_fragment_string(distribution)?;
388 Ok(res)
389}
390
391fn generate_enhanced_fragment_string(fragment_dist: &FragmentDistribution) -> Result<RwPgResponse> {
392 let mut config = PrettyConfig {
393 need_boundaries: false,
394 width: 80,
395 ..Default::default()
396 };
397
398 let mut res = String::new();
399
400 res.push_str(&format!(
401 "Fragment {} (Table {})\n",
402 fragment_dist.fragment_id, fragment_dist.table_id
403 ));
404 let dist_type = fragment::FragmentDistributionType::try_from(fragment_dist.distribution_type)
405 .unwrap_or(fragment::FragmentDistributionType::Unspecified);
406 res.push_str(&format!("Distribution Type: {}\n", dist_type.as_str_name()));
407 res.push_str(&format!("Parallelism: {}\n", fragment_dist.parallelism));
408 res.push_str(&format!("VNode Count: {}\n", fragment_dist.vnode_count));
409
410 if !fragment_dist.state_table_ids.is_empty() {
411 res.push_str(&format!(
412 "State Tables: [{}]\n",
413 fragment_dist
414 .state_table_ids
415 .iter()
416 .map(|id| id.to_string())
417 .collect::<Vec<_>>()
418 .join(", ")
419 ));
420 }
421
422 if !fragment_dist.upstream_fragment_ids.is_empty() {
423 res.push_str(&format!(
424 "Upstream Fragments: [{}]\n",
425 fragment_dist
426 .upstream_fragment_ids
427 .iter()
428 .map(|id| id.to_string())
429 .collect::<Vec<_>>()
430 .join(", ")
431 ));
432 }
433
434 if let Some(node) = &fragment_dist.node {
435 res.push_str("Stream Plan:\n");
436 let node_pretty = explain_node(node, true);
437 config.unicode(&mut res, &node_pretty);
438 }
439
440 let rows = vec![ExplainRow { query_plan: res }];
441
442 Ok(PgResponse::builder(StatementType::DESCRIBE)
443 .rows(rows)
444 .into())
445}
446
447#[cfg(test)]
448mod tests {
449 use std::collections::HashMap;
450 use std::ops::Index;
451
452 use futures_async_stream::for_await;
453
454 use crate::test_utils::LocalFrontend;
455
456 #[tokio::test]
457 async fn test_describe_handler() {
458 let frontend = LocalFrontend::new(Default::default()).await;
459 frontend
460 .run_sql("create table t (v1 int, v2 int, v3 int primary key, v4 int);")
461 .await
462 .unwrap();
463
464 frontend
465 .run_sql("create index idx1 on t (v1 DESC, v2);")
466 .await
467 .unwrap();
468
469 let sql = "describe t";
470 let mut pg_response = frontend.run_sql(sql).await.unwrap();
471
472 let mut columns = HashMap::new();
473 #[for_await]
474 for row_set in pg_response.values_stream() {
475 let row_set = row_set.unwrap();
476 for row in row_set {
477 columns.insert(
478 std::str::from_utf8(row.index(0).as_ref().unwrap())
479 .unwrap()
480 .to_owned(),
481 std::str::from_utf8(row.index(1).as_ref().unwrap())
482 .unwrap()
483 .to_owned(),
484 );
485 }
486 }
487
488 let expected_columns: HashMap<String, String> = maplit::hashmap! {
489 "v1".into() => "integer".into(),
490 "v2".into() => "integer".into(),
491 "v3".into() => "integer".into(),
492 "v4".into() => "integer".into(),
493 "primary key".into() => "v3".into(),
494 "distribution key".into() => "v3".into(),
495 "_rw_timestamp".into() => "timestamp with time zone".into(),
496 "idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)".into(),
497 "table description".into() => "t".into(),
498 };
499
500 assert_eq!(columns, expected_columns);
501 }
502}