1use std::cmp::max;
16use std::fmt::Display;
17
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pretty_xmlish::{Pretty, PrettyConfig};
22use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
23use risingwave_common::types::{DataType, Fields};
24use risingwave_expr::bail;
25use risingwave_pb::meta::FragmentDistribution;
26use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
27use risingwave_pb::meta::table_fragments::fragment;
28use risingwave_pb::stream_plan::StreamNode;
29use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};
30
31use super::explain::ExplainRow;
32use super::show::ShowColumnRow;
33use super::{RwPgResponse, fields_to_descriptors};
34use crate::binder::{Binder, Relation};
35use crate::catalog::{CatalogError, FragmentId};
36use crate::error::{ErrorCode, Result};
37use crate::handler::show::ShowColumnName;
38use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
39
/// Handles `DESCRIBE <object>`: lists the columns of a table, source, sink, or
/// view, then appends synthetic rows for the primary key, distribution key,
/// each index, the table description, and (for sinks) the target table name.
pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
    let session = handler_args.session;
    let mut binder = Binder::new_for_system(&session);
    let catalog_reader = session.env().catalog_reader().read_guard();

    // Reject references into another database up front.
    Binder::validate_cross_db_reference(&session.database(), &object_name)?;
    let not_found_err =
        CatalogError::not_found("table, source, sink or view", object_name.to_string());

    // Resolve the object and collect, per relation kind:
    // (columns, pk columns, distribution-key columns, indexes, relation name,
    //  optional description, optional sink target-table name).
    let (columns, pk_columns, dist_columns, indices, relname, description, target_table_name) =
        if let Ok(relation) = binder.bind_relation_by_name(&object_name, None, None, false) {
            match relation {
                Relation::Source(s) => {
                    // Map each pk column id back to its column descriptor;
                    // `exactly_one` asserts column ids are unique in the source.
                    let pk_column_catalogs = s
                        .catalog
                        .pk_col_ids
                        .iter()
                        .map(|&column_id| {
                            s.catalog
                                .columns
                                .iter()
                                .filter(|x| x.column_id() == column_id)
                                .map(|x| x.column_desc.clone())
                                .exactly_one()
                                .unwrap()
                        })
                        .collect_vec();
                    (
                        s.catalog.columns,
                        pk_column_catalogs,
                        vec![],
                        vec![],
                        s.catalog.name,
                        None, None,
                    )
                }
                Relation::BaseTable(t) => {
                    let pk_column_catalogs = t
                        .table_catalog
                        .pk()
                        .iter()
                        .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
                        .collect_vec();
                    let dist_columns = t
                        .table_catalog
                        .distribution_key()
                        .iter()
                        .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
                        .collect_vec();
                    (
                        t.table_catalog.columns.clone(),
                        pk_column_catalogs,
                        dist_columns,
                        t.table_indexes,
                        t.table_catalog.name.clone(),
                        t.table_catalog.description.clone(),
                        None,
                    )
                }
                Relation::SystemTable(t) => {
                    let pk_column_catalogs = t
                        .sys_table_catalog
                        .pk
                        .iter()
                        .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
                        .collect_vec();
                    (
                        t.sys_table_catalog.columns.clone(),
                        pk_column_catalogs,
                        vec![],
                        vec![],
                        t.sys_table_catalog.name.clone(),
                        None, None,
                    )
                }
                Relation::Share(_) => {
                    // A `Share` relation here is presumably a view — re-bind
                    // by name as a view to get its column fields.
                    if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
                        let columns = view
                            .view_catalog
                            .columns
                            .iter()
                            .enumerate()
                            .map(|(idx, field)| ColumnCatalog {
                                column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
                                is_hidden: false,
                            })
                            .collect();
                        (
                            columns,
                            vec![],
                            vec![],
                            vec![],
                            view.view_catalog.name.clone(),
                            None,
                            None,
                        )
                    } else {
                        return Err(not_found_err.into());
                    }
                }
                _ => {
                    return Err(not_found_err.into());
                }
            }
        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
            // Not bindable as a relation — try binding as a sink before giving up.
            let columns = sink.sink_catalog.full_columns().to_vec();
            let pk_columns = (sink.sink_catalog.downstream_pk.clone().unwrap_or_default())
                .into_iter()
                .map(|idx| columns[idx].column_desc.clone())
                .collect_vec();
            let dist_columns = sink
                .sink_catalog
                .distribution_key
                .iter()
                .map(|idx| columns[*idx].column_desc.clone())
                .collect_vec();
            // For `CREATE SINK ... INTO <table>`, resolve the target table's
            // name for display; lookup failures are silently ignored.
            let target_table_name = sink
                .sink_catalog
                .target_table
                .and_then(|table_id| catalog_reader.get_table_name_by_id(table_id).ok());
            (
                columns,
                pk_columns,
                dist_columns,
                vec![],
                sink.sink_catalog.name.clone(),
                None,
                target_table_name,
            )
        } else {
            return Err(not_found_err.into());
        };

    // One (or more) output rows per column catalog entry.
    let mut rows = columns
        .into_iter()
        .flat_map(ShowColumnRow::from_catalog)
        .collect_vec();

    // Renders a sequence of displayable items as a comma-separated string.
    fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
    where
        T: Display,
    {
        format!(
            "{}",
            display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
        )
    }

    // Synthetic row: primary-key column names.
    if !pk_columns.is_empty() {
        rows.push(ShowColumnRow {
            name: ShowColumnName::special("primary key"),
            r#type: concat(pk_columns.iter().map(|x| &x.name)),
            is_hidden: None,
            description: None,
        });
    }

    // Synthetic row: distribution-key column names.
    if !dist_columns.is_empty() {
        rows.push(ShowColumnRow {
            name: ShowColumnName::special("distribution key"),
            r#type: concat(dist_columns.iter().map(|x| &x.name)),
            is_hidden: None,
            description: None,
        });
    }

    // One synthetic row per index, e.g.
    // `index(v1 DESC, v2 ASC) include(v3) distributed by(v1)`; the
    // `include(...)` part is omitted when there are no include columns.
    rows.extend(indices.iter().map(|index| {
        let index_display = index.display();

        ShowColumnRow {
            name: ShowColumnName::special(&index.name),
            r#type: if index_display.include_columns.is_empty() {
                format!(
                    "index({}) distributed by({})",
                    display_comma_separated(&index_display.index_columns_with_ordering),
                    display_comma_separated(&index_display.distributed_by_columns),
                )
            } else {
                format!(
                    "index({}) include({}) distributed by({})",
                    display_comma_separated(&index_display.index_columns_with_ordering),
                    display_comma_separated(&index_display.include_columns),
                    display_comma_separated(&index_display.distributed_by_columns),
                )
            },
            is_hidden: None,
            description: None,
        }
    }));

    // Synthetic row: relation name plus its description (if any).
    rows.push(ShowColumnRow {
        name: ShowColumnName::special("table description"),
        r#type: relname,
        is_hidden: None,
        description,
    });

    // Synthetic row (sinks only): the `CREATE SINK ... INTO` target table.
    if let Some(target_table) = target_table_name {
        rows.push(ShowColumnRow {
            name: ShowColumnName::special("target table name"),
            r#type: target_table,
            is_hidden: None,
            description: None,
        });
    };

    Ok(PgResponse::builder(StatementType::DESCRIBE)
        .rows(rows)
        .into())
}
260
261pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
262 match kind {
263 DescribeKind::Fragments => vec![PgFieldDescriptor::new(
264 "Fragments".to_owned(),
265 DataType::Varchar.to_oid(),
266 DataType::Varchar.type_len(),
267 )],
268 DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
269 }
270}
271
/// Handles `DESCRIBE FRAGMENTS <object>`: resolves the object to a streaming
/// job id, fetches the job's fragments from the meta service, and renders
/// each fragment's stream plan.
pub async fn handle_describe_fragments(
    handler_args: HandlerArgs,
    object_name: ObjectName,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let job_id = {
        let mut binder = Binder::new_for_system(&session);

        // Reject references into another database up front.
        Binder::validate_cross_db_reference(&session.database(), &object_name)?;
        let not_found_err = CatalogError::not_found("stream job", object_name.to_string());

        if let Ok(relation) = binder.bind_catalog_relation_by_object_name(&object_name, true) {
            match relation {
                Relation::Source(s) => {
                    // Only shared sources run as standalone streaming jobs;
                    // non-shared sources have no fragments of their own.
                    if s.is_shared() {
                        s.catalog.id.as_share_source_job_id()
                    } else {
                        bail!(ErrorCode::NotSupported(
                            "non shared source has no fragments to describe".to_owned(),
                            "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
                        ));
                    }
                }
                Relation::BaseTable(t) => t.table_catalog.id.as_job_id(),
                Relation::SystemTable(_t) => {
                    bail!(ErrorCode::NotSupported(
                        "system table has no fragments to describe".to_owned(),
                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
                    ));
                }
                Relation::Share(_s) => {
                    bail!(ErrorCode::NotSupported(
                        "view has no fragments to describe".to_owned(),
                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
                    ));
                }
                _ => {
                    return Err(not_found_err.into());
                }
            }
        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
            // Sinks are streaming jobs too, but are not bound as relations.
            sink.sink_catalog.id.as_job_id()
        } else {
            return Err(not_found_err.into());
        }
    };

    let meta_client = session.env().meta_client();
    // The meta service returns a map keyed by job id; we asked for exactly
    // one job, so index directly.
    let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
    let res = generate_fragments_string(fragments)?;
    Ok(res)
}
325
326fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
329 let mut config = PrettyConfig {
330 need_boundaries: false,
331 width: 80,
332 ..Default::default()
333 };
334
335 let mut max_width = 80;
336
337 let mut blocks = vec![];
338 for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
339 let mut res = String::new();
340 let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
341 res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
342 let node = &fragment.actors[0].node;
343 let node = explain_node(node.as_ref().unwrap(), true);
344 let width = config.unicode(&mut res, &node);
345 max_width = max(width, max_width);
346 config.width = max_width;
347 blocks.push(res);
348 blocks.push("".to_owned());
349 }
350
351 let rows = blocks.iter().map(|b| ExplainRow {
352 query_plan: b.into(),
353 });
354 Ok(PgResponse::builder(StatementType::DESCRIBE)
355 .rows(rows)
356 .into())
357}
358
359fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
360 let one_line_explain = node.identity.clone();
362
363 let mut fields = Vec::with_capacity(3);
364 if verbose {
365 fields.push((
366 "output",
367 Pretty::Array(
368 node.fields
369 .iter()
370 .map(|f| Pretty::display(f.get_name()))
371 .collect(),
372 ),
373 ));
374 fields.push((
375 "stream key",
376 Pretty::Array(
377 node.stream_key
378 .iter()
379 .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
380 .collect(),
381 ),
382 ));
383 }
384 let children = node
385 .input
386 .iter()
387 .map(|input| explain_node(input, verbose))
388 .collect();
389 Pretty::simple_record(one_line_explain, fields, children)
390}
391
392pub async fn handle_describe_fragment(
393 handler_args: HandlerArgs,
394 fragment_id: FragmentId,
395) -> Result<RwPgResponse> {
396 let session = handler_args.session.clone();
397 let meta_client = session.env().meta_client();
398 let distribution = &meta_client
399 .get_fragment_by_id(fragment_id)
400 .await?
401 .ok_or_else(|| CatalogError::not_found("fragment", fragment_id.to_string()))?;
402 let res: PgResponse<super::PgResponseStream> = generate_enhanced_fragment_string(distribution)?;
403 Ok(res)
404}
405
/// Renders a single fragment's metadata (distribution type, parallelism,
/// vnode count, state tables, upstream fragments) followed by its stream
/// plan, as one `ExplainRow`.
fn generate_enhanced_fragment_string(fragment_dist: &FragmentDistribution) -> Result<RwPgResponse> {
    let mut config = PrettyConfig {
        need_boundaries: false,
        width: 80,
        ..Default::default()
    };

    let mut res = String::new();

    res.push_str(&format!(
        "Fragment {} (Table {})\n",
        fragment_dist.fragment_id, fragment_dist.table_id
    ));
    // Unknown enum values from the wire degrade to `Unspecified` rather than
    // failing the whole command.
    let dist_type = fragment::FragmentDistributionType::try_from(fragment_dist.distribution_type)
        .unwrap_or(fragment::FragmentDistributionType::Unspecified);
    res.push_str(&format!("Distribution Type: {}\n", dist_type.as_str_name()));
    res.push_str(&format!("Parallelism: {}\n", fragment_dist.parallelism));
    res.push_str(&format!("VNode Count: {}\n", fragment_dist.vnode_count));

    // Optional sections: only printed when non-empty.
    if !fragment_dist.state_table_ids.is_empty() {
        res.push_str(&format!(
            "State Tables: [{}]\n",
            fragment_dist
                .state_table_ids
                .iter()
                .map(|id| id.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        ));
    }

    if !fragment_dist.upstream_fragment_ids.is_empty() {
        res.push_str(&format!(
            "Upstream Fragments: [{}]\n",
            fragment_dist
                .upstream_fragment_ids
                .iter()
                .map(|id| id.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        ));
    }

    // Append the pretty-printed plan tree when the node is present.
    if let Some(node) = &fragment_dist.node {
        res.push_str("Stream Plan:\n");
        let node_pretty = explain_node(node, true);
        config.unicode(&mut res, &node_pretty);
    }

    let rows = vec![ExplainRow { query_plan: res }];

    Ok(PgResponse::builder(StatementType::DESCRIBE)
        .rows(rows)
        .into())
}
461
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

    /// `DESCRIBE` on a table lists its columns plus the synthetic
    /// primary-key, distribution-key, index, hidden-column, and
    /// table-description rows.
    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create table t (v1 int, v2 int, v3 int primary key, v4 int);")
            .await
            .unwrap();

        frontend
            .run_sql("create index idx1 on t (v1 DESC, v2);")
            .await
            .unwrap();

        let sql = "describe t";
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        // Collect (name, type) pairs from the streamed response rows.
        let mut columns = HashMap::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            let row_set = row_set.unwrap();
            for row in row_set {
                columns.insert(
                    std::str::from_utf8(row.index(0).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                    std::str::from_utf8(row.index(1).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                );
            }
        }

        let expected_columns: HashMap<String, String> = maplit::hashmap! {
            "v1".into() => "integer".into(),
            "v2".into() => "integer".into(),
            "v3".into() => "integer".into(),
            "v4".into() => "integer".into(),
            "primary key".into() => "v3".into(),
            "distribution key".into() => "v3".into(),
            "_rw_timestamp".into() => "timestamp with time zone".into(),
            "idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)".into(),
            "table description".into() => "t".into(),
        };

        assert_eq!(columns, expected_columns);
    }

    /// `DESCRIBE` on a `CREATE SINK ... INTO` sink includes the synthetic
    /// "target table name" row.
    #[tokio::test]
    async fn test_describe_handler_with_target_table() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql("CREATE TABLE t(v int);").await.unwrap();

        frontend.run_sql("CREATE TABLE tt(v int);").await.unwrap();

        frontend.run_sql("CREATE SINK ssss INTO tt AS SELECT * FROM t WITH (type = 'append-only', force_append_only = 'true');").await.unwrap();

        let sql = "describe ssss;";
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        // Collect (name, type) pairs from the streamed response rows.
        let mut columns = HashMap::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            let row_set = row_set.unwrap();
            for row in row_set {
                columns.insert(
                    std::str::from_utf8(row.index(0).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                    std::str::from_utf8(row.index(1).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                );
            }
        }

        let expected_columns: HashMap<String, String> = maplit::hashmap! {
            "v".into() => "integer".into(),
            "\"t._row_id\"".into() => "serial".into(),
            "distribution key".into() => "t._row_id".into(),
            "table description".into() => "ssss".into(),
            "target table name".into() => "tt".into(),
        };

        assert_eq!(columns, expected_columns);
    }
}