// risingwave_frontend/handler/describe.rs
use std::cmp::max;
16use std::fmt::Display;
17
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pretty_xmlish::{Pretty, PrettyConfig};
22use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
23use risingwave_common::types::{DataType, Fields};
24use risingwave_expr::bail;
25use risingwave_pb::meta::FragmentDistribution;
26use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
27use risingwave_pb::meta::table_fragments::fragment;
28use risingwave_pb::stream_plan::StreamNode;
29use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};
30
31use super::explain::ExplainRow;
32use super::show::ShowColumnRow;
33use super::{RwPgResponse, fields_to_descriptors};
34use crate::binder::{Binder, Relation};
35use crate::catalog::CatalogError;
36use crate::error::{ErrorCode, Result};
37use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
38
/// Handles a plain `DESCRIBE <object>` statement.
///
/// Resolves `object_name` to a table, source, sink, system table or view, then
/// returns one row per column followed by synthetic summary rows: primary key,
/// distribution key, one row per index, and the table description.
pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
    let session = handler_args.session;
    let mut binder = Binder::new_for_system(&session);

    Binder::validate_cross_db_reference(&session.database(), &object_name)?;
    let not_found_err =
        CatalogError::NotFound("table, source, sink or view", object_name.to_string());

    // Per relation kind, gather:
    // (columns, pk columns, distribution-key columns, indexes, name, description).
    let (columns, pk_columns, dist_columns, indices, relname, description) = if let Ok(relation) =
        binder.bind_relation_by_name(object_name.clone(), None, None, false)
    {
        match relation {
            Relation::Source(s) => {
                // Map each pk column id back to its column desc; `exactly_one`
                // enforces that a column id matches exactly one catalog column.
                let pk_column_catalogs = s
                    .catalog
                    .pk_col_ids
                    .iter()
                    .map(|&column_id| {
                        s.catalog
                            .columns
                            .iter()
                            .filter(|x| x.column_id() == column_id)
                            .map(|x| x.column_desc.clone())
                            .exactly_one()
                            .unwrap()
                    })
                    .collect_vec();
                (
                    s.catalog.columns,
                    pk_column_catalogs,
                    vec![],
                    vec![],
                    s.catalog.name,
                    None, // sources carry no description
                )
            }
            Relation::BaseTable(t) => {
                // pk and distribution key are stored as indices into `columns`.
                let pk_column_catalogs = t
                    .table_catalog
                    .pk()
                    .iter()
                    .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
                    .collect_vec();
                let dist_columns = t
                    .table_catalog
                    .distribution_key()
                    .iter()
                    .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
                    .collect_vec();
                (
                    t.table_catalog.columns.clone(),
                    pk_column_catalogs,
                    dist_columns,
                    t.table_indexes,
                    t.table_catalog.name.clone(),
                    t.table_catalog.description.clone(),
                )
            }
            Relation::SystemTable(t) => {
                let pk_column_catalogs = t
                    .sys_table_catalog
                    .pk
                    .iter()
                    .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
                    .collect_vec();
                (
                    t.sys_table_catalog.columns.clone(),
                    pk_column_catalogs,
                    vec![],
                    vec![],
                    t.sys_table_catalog.name.clone(),
                    None, // system tables carry no description
                )
            }
            Relation::Share(_) => {
                // Views bind as `Relation::Share` here; re-bind by name to
                // reach the view catalog and synthesize column catalogs from
                // its output fields (views have no pk/distribution key).
                if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
                    let columns = view
                        .view_catalog
                        .columns
                        .iter()
                        .enumerate()
                        .map(|(idx, field)| ColumnCatalog {
                            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
                            is_hidden: false,
                        })
                        .collect();
                    (
                        columns,
                        vec![],
                        vec![],
                        vec![],
                        view.view_catalog.name.clone(),
                        None,
                    )
                } else {
                    return Err(not_found_err.into());
                }
            }
            _ => {
                return Err(not_found_err.into());
            }
        }
    } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
        // Sinks are not resolved by `bind_relation_by_name`; try them separately.
        let columns = sink.sink_catalog.full_columns().to_vec();
        let pk_columns = sink
            .sink_catalog
            .downstream_pk_indices()
            .into_iter()
            .map(|idx| columns[idx].column_desc.clone())
            .collect_vec();
        let dist_columns = sink
            .sink_catalog
            .distribution_key
            .iter()
            .map(|idx| columns[*idx].column_desc.clone())
            .collect_vec();
        (
            columns,
            pk_columns,
            dist_columns,
            vec![],
            sink.sink_catalog.name.clone(),
            None,
        )
    } else {
        return Err(not_found_err.into());
    };

    // One output row per column catalog entry.
    let mut rows = columns
        .into_iter()
        .flat_map(ShowColumnRow::from_catalog)
        .collect_vec();

    // Joins displayable items with ", " — used for pk / distribution key lists.
    fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
    where
        T: Display,
    {
        format!(
            "{}",
            display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
        )
    }

    // Synthetic row: primary key column list.
    if !pk_columns.is_empty() {
        rows.push(ShowColumnRow {
            name: "primary key".into(),
            r#type: concat(pk_columns.iter().map(|x| &x.name)),
            is_hidden: None,
            description: None,
        });
    }

    // Synthetic row: distribution key column list.
    if !dist_columns.is_empty() {
        rows.push(ShowColumnRow {
            name: "distribution key".into(),
            r#type: concat(dist_columns.iter().map(|x| &x.name)),
            is_hidden: None,
            description: None,
        });
    }

    // Synthetic rows: one per index, summarizing indexed / included /
    // distribution columns; the `include(...)` part is omitted when empty.
    rows.extend(indices.iter().map(|index| {
        let index_display = index.display();

        ShowColumnRow {
            name: index.name.clone(),
            r#type: if index_display.include_columns.is_empty() {
                format!(
                    "index({}) distributed by({})",
                    display_comma_separated(&index_display.index_columns_with_ordering),
                    display_comma_separated(&index_display.distributed_by_columns),
                )
            } else {
                format!(
                    "index({}) include({}) distributed by({})",
                    display_comma_separated(&index_display.index_columns_with_ordering),
                    display_comma_separated(&index_display.include_columns),
                    display_comma_separated(&index_display.distributed_by_columns),
                )
            },
            is_hidden: None,
            description: None,
        }
    }));

    // Final synthetic row: the relation name plus its optional description.
    rows.push(ShowColumnRow {
        name: "table description".into(),
        r#type: relname,
        is_hidden: None,
        description,
    });

    Ok(PgResponse::builder(StatementType::DESCRIBE)
        .rows(rows)
        .into())
}
243
244pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
245 match kind {
246 DescribeKind::Fragments => vec![PgFieldDescriptor::new(
247 "Fragments".to_owned(),
248 DataType::Varchar.to_oid(),
249 DataType::Varchar.type_len(),
250 )],
251 DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
252 }
253}
254
/// Handles `DESCRIBE FRAGMENTS <object>`: resolves the object to a streaming
/// job id, then fetches and pretty-prints the stream plan of each fragment.
pub async fn handle_describe_fragments(
    handler_args: HandlerArgs,
    object_name: ObjectName,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let job_id = {
        let mut binder = Binder::new_for_system(&session);

        Binder::validate_cross_db_reference(&session.database(), &object_name)?;
        let not_found_err = CatalogError::NotFound("stream job", object_name.to_string());

        if let Ok(relation) = binder.bind_relation_by_name(object_name.clone(), None, None, false) {
            match relation {
                Relation::Source(s) => {
                    // Only shared sources run as streaming jobs with fragments.
                    if s.is_shared() {
                        s.catalog.id
                    } else {
                        bail!(ErrorCode::NotSupported(
                            "non shared source has no fragments to describe".to_owned(),
                            "Use `DESCRIBE` instead.".to_owned(),
                        ));
                    }
                }
                Relation::BaseTable(t) => t.table_catalog.id.table_id,
                Relation::SystemTable(_t) => {
                    bail!(ErrorCode::NotSupported(
                        "system table has no fragments to describe".to_owned(),
                        "Use `DESCRIBE` instead.".to_owned(),
                    ));
                }
                Relation::Share(_s) => {
                    bail!(ErrorCode::NotSupported(
                        "view has no fragments to describe".to_owned(),
                        "Use `DESCRIBE` instead.".to_owned(),
                    ));
                }
                _ => {
                    return Err(not_found_err.into());
                }
            }
        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
            // Sinks are streaming jobs too but bind via a separate path.
            sink.sink_catalog.id.sink_id
        } else {
            return Err(not_found_err.into());
        }
    };

    let meta_client = session.env().meta_client();
    // NOTE(review): the map indexing assumes meta always returns an entry for
    // the requested job id — confirm; a missing key would panic here.
    let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
    let res = generate_fragments_string(fragments)?;
    Ok(res)
}
308
309fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
312 let mut config = PrettyConfig {
313 need_boundaries: false,
314 width: 80,
315 ..Default::default()
316 };
317
318 let mut max_width = 80;
319
320 let mut blocks = vec![];
321 for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
322 let mut res = String::new();
323 let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
324 res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
325 let node = &fragment.actors[0].node;
326 let node = explain_node(node.as_ref().unwrap(), true);
327 let width = config.unicode(&mut res, &node);
328 max_width = max(width, max_width);
329 config.width = max_width;
330 blocks.push(res);
331 blocks.push("".to_owned());
332 }
333
334 let rows = blocks.iter().map(|b| ExplainRow {
335 query_plan: b.into(),
336 });
337 Ok(PgResponse::builder(StatementType::DESCRIBE)
338 .rows(rows)
339 .into())
340}
341
342fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
343 let one_line_explain = node.identity.clone();
345
346 let mut fields = Vec::with_capacity(3);
347 if verbose {
348 fields.push((
349 "output",
350 Pretty::Array(
351 node.fields
352 .iter()
353 .map(|f| Pretty::display(f.get_name()))
354 .collect(),
355 ),
356 ));
357 fields.push((
358 "stream key",
359 Pretty::Array(
360 node.stream_key
361 .iter()
362 .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
363 .collect(),
364 ),
365 ));
366 }
367 let children = node
368 .input
369 .iter()
370 .map(|input| explain_node(input, verbose))
371 .collect();
372 Pretty::simple_record(one_line_explain, fields, children)
373}
374
375pub async fn handle_describe_fragment(
376 handler_args: HandlerArgs,
377 fragment_id: u32,
378) -> Result<RwPgResponse> {
379 let session = handler_args.session.clone();
380 let meta_client = session.env().meta_client();
381 let distribution = &meta_client
382 .get_fragment_by_id(fragment_id)
383 .await?
384 .ok_or_else(|| CatalogError::NotFound("fragment", fragment_id.to_string()))?;
385 let res: PgResponse<super::PgResponseStream> = generate_enhanced_fragment_string(distribution)?;
386 Ok(res)
387}
388
389fn generate_enhanced_fragment_string(fragment_dist: &FragmentDistribution) -> Result<RwPgResponse> {
390 let mut config = PrettyConfig {
391 need_boundaries: false,
392 width: 80,
393 ..Default::default()
394 };
395
396 let mut res = String::new();
397
398 res.push_str(&format!(
399 "Fragment {} (Table {})\n",
400 fragment_dist.fragment_id, fragment_dist.table_id
401 ));
402 let dist_type = fragment::FragmentDistributionType::try_from(fragment_dist.distribution_type)
403 .unwrap_or(fragment::FragmentDistributionType::Unspecified);
404 res.push_str(&format!("Distribution Type: {}\n", dist_type.as_str_name()));
405 res.push_str(&format!("Parallelism: {}\n", fragment_dist.parallelism));
406 res.push_str(&format!("VNode Count: {}\n", fragment_dist.vnode_count));
407
408 if !fragment_dist.state_table_ids.is_empty() {
409 res.push_str(&format!(
410 "State Tables: [{}]\n",
411 fragment_dist
412 .state_table_ids
413 .iter()
414 .map(|id| id.to_string())
415 .collect::<Vec<_>>()
416 .join(", ")
417 ));
418 }
419
420 if !fragment_dist.upstream_fragment_ids.is_empty() {
421 res.push_str(&format!(
422 "Upstream Fragments: [{}]\n",
423 fragment_dist
424 .upstream_fragment_ids
425 .iter()
426 .map(|id| id.to_string())
427 .collect::<Vec<_>>()
428 .join(", ")
429 ));
430 }
431
432 if let Some(node) = &fragment_dist.node {
433 res.push_str("Stream Plan:\n");
434 let node_pretty = explain_node(node, true);
435 config.unicode(&mut res, &node_pretty);
436 }
437
438 let rows = vec![ExplainRow { query_plan: res }];
439
440 Ok(PgResponse::builder(StatementType::DESCRIBE)
441 .rows(rows)
442 .into())
443}
444
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

    /// End-to-end check of `DESCRIBE` output for a table with a pk and an index.
    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create table t (v1 int, v2 int, v3 int primary key, v4 int);")
            .await
            .unwrap();

        frontend
            .run_sql("create index idx1 on t (v1 DESC, v2);")
            .await
            .unwrap();

        let sql = "describe t";
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        // Collect (name, type) pairs from the first two columns of every row.
        let mut columns = HashMap::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            let row_set = row_set.unwrap();
            for row in row_set {
                columns.insert(
                    std::str::from_utf8(row.index(0).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                    std::str::from_utf8(row.index(1).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                );
            }
        }

        // Expected output: the user columns, the hidden `_rw_timestamp` column,
        // the synthetic pk/distribution-key rows, the index summary row, and
        // the trailing table-description row.
        let expected_columns: HashMap<String, String> = maplit::hashmap! {
            "v1".into() => "integer".into(),
            "v2".into() => "integer".into(),
            "v3".into() => "integer".into(),
            "v4".into() => "integer".into(),
            "primary key".into() => "v3".into(),
            "distribution key".into() => "v3".into(),
            "_rw_timestamp".into() => "timestamp with time zone".into(),
            "idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)".into(),
            "table description".into() => "t".into(),
        };

        assert_eq!(columns, expected_columns);
    }
}