1use std::cmp::max;
16use std::fmt::Display;
17
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pretty_xmlish::{Pretty, PrettyConfig};
22use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
23use risingwave_common::types::{DataType, Fields};
24use risingwave_expr::bail;
25use risingwave_pb::meta::FragmentDistribution;
26use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
27use risingwave_pb::meta::table_fragments::fragment;
28use risingwave_pb::stream_plan::StreamNode;
29use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};
30
31use super::explain::ExplainRow;
32use super::show::ShowColumnRow;
33use super::{RwPgResponse, fields_to_descriptors};
34use crate::binder::{Binder, Relation};
35use crate::catalog::{CatalogError, FragmentId};
36use crate::error::{ErrorCode, Result};
37use crate::handler::show::ShowColumnName;
38use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
39
40pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
41 let session = handler_args.session;
42 let mut binder = Binder::new_for_system(&session);
43 let catalog_reader = session.env().catalog_reader().read_guard();
44
45 Binder::validate_cross_db_reference(&session.database(), &object_name)?;
46 let not_found_err =
47 CatalogError::not_found("table, source, sink or view", object_name.to_string());
48
49 let (columns, pk_columns, dist_columns, indices, relname, description, target_table_name) =
51 if let Ok(relation) = binder.bind_relation_by_name(&object_name, None, None, false) {
52 match relation {
53 Relation::Source(s) => {
54 let pk_column_catalogs = s
55 .catalog
56 .pk_col_ids
57 .iter()
58 .map(|&column_id| {
59 Itertools::exactly_one(
60 s.catalog
61 .columns
62 .iter()
63 .filter(|x| x.column_id() == column_id)
64 .map(|x| x.column_desc.clone()),
65 )
66 .unwrap()
67 })
68 .collect_vec();
69 (
70 s.catalog.columns,
71 pk_column_catalogs,
72 vec![],
73 vec![],
74 s.catalog.name,
75 None, None,
77 )
78 }
79 Relation::BaseTable(t) => {
80 let pk_column_catalogs = t
81 .table_catalog
82 .pk()
83 .iter()
84 .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
85 .collect_vec();
86 let dist_columns = t
87 .table_catalog
88 .distribution_key()
89 .iter()
90 .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
91 .collect_vec();
92 (
93 t.table_catalog.columns.clone(),
94 pk_column_catalogs,
95 dist_columns,
96 t.table_indexes,
97 t.table_catalog.name.clone(),
98 t.table_catalog.description.clone(),
99 None,
100 )
101 }
102 Relation::SystemTable(t) => {
103 let pk_column_catalogs = t
104 .sys_table_catalog
105 .pk
106 .iter()
107 .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
108 .collect_vec();
109 (
110 t.sys_table_catalog.columns.clone(),
111 pk_column_catalogs,
112 vec![],
113 vec![],
114 t.sys_table_catalog.name.clone(),
115 None, None,
117 )
118 }
119 Relation::Share(_) => {
120 if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
121 let columns = view
122 .view_catalog
123 .columns
124 .iter()
125 .enumerate()
126 .map(|(idx, field)| ColumnCatalog {
127 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
128 is_hidden: false,
129 })
130 .collect();
131 (
132 columns,
133 vec![],
134 vec![],
135 vec![],
136 view.view_catalog.name.clone(),
137 None,
138 None,
139 )
140 } else {
141 return Err(not_found_err.into());
142 }
143 }
144 _ => {
145 return Err(not_found_err.into());
146 }
147 }
148 } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
149 let columns = sink.sink_catalog.full_columns().to_vec();
150 let pk_columns = (sink.sink_catalog.downstream_pk.clone().unwrap_or_default())
151 .into_iter()
152 .map(|idx| columns[idx].column_desc.clone())
153 .collect_vec();
154 let dist_columns = sink
155 .sink_catalog
156 .distribution_key
157 .iter()
158 .map(|idx| columns[*idx].column_desc.clone())
159 .collect_vec();
160 let target_table_name = sink
161 .sink_catalog
162 .target_table
163 .and_then(|table_id| catalog_reader.get_table_name_by_id(table_id).ok());
164 (
165 columns,
166 pk_columns,
167 dist_columns,
168 vec![],
169 sink.sink_catalog.name.clone(),
170 None,
171 target_table_name,
172 )
173 } else {
174 return Err(not_found_err.into());
175 };
176
177 let mut rows = columns
179 .into_iter()
180 .flat_map(ShowColumnRow::from_catalog)
181 .collect_vec();
182
183 fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
184 where
185 T: Display,
186 {
187 format!(
188 "{}",
189 display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
190 )
191 }
192
193 if !pk_columns.is_empty() {
195 rows.push(ShowColumnRow {
196 name: ShowColumnName::special("primary key"),
197 r#type: concat(pk_columns.iter().map(|x| &x.name)),
198 is_hidden: None,
199 description: None,
200 });
201 }
202
203 if !dist_columns.is_empty() {
205 rows.push(ShowColumnRow {
206 name: ShowColumnName::special("distribution key"),
207 r#type: concat(dist_columns.iter().map(|x| &x.name)),
208 is_hidden: None,
209 description: None,
210 });
211 }
212
213 rows.extend(indices.iter().map(|index| {
215 let index_display = index.display();
216
217 ShowColumnRow {
218 name: ShowColumnName::special(&index.name),
219 r#type: if index_display.include_columns.is_empty() {
220 format!(
221 "index({}) distributed by({})",
222 display_comma_separated(&index_display.index_columns_with_ordering),
223 display_comma_separated(&index_display.distributed_by_columns),
224 )
225 } else {
226 format!(
227 "index({}) include({}) distributed by({})",
228 display_comma_separated(&index_display.index_columns_with_ordering),
229 display_comma_separated(&index_display.include_columns),
230 display_comma_separated(&index_display.distributed_by_columns),
231 )
232 },
233 is_hidden: None,
234 description: None,
236 }
237 }));
238
239 rows.push(ShowColumnRow {
240 name: ShowColumnName::special("table description"),
241 r#type: relname,
242 is_hidden: None,
243 description,
244 });
245
246 if let Some(target_table) = target_table_name {
247 rows.push(ShowColumnRow {
248 name: ShowColumnName::special("target table name"),
249 r#type: target_table,
250 is_hidden: None,
251 description: None,
252 });
253 };
254
255 Ok(PgResponse::builder(StatementType::DESCRIBE)
258 .rows(rows)
259 .into())
260}
261
262pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
263 match kind {
264 DescribeKind::Fragments => vec![PgFieldDescriptor::new(
265 "Fragments".to_owned(),
266 DataType::Varchar.to_oid(),
267 DataType::Varchar.type_len(),
268 )],
269 DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
270 }
271}
272
273pub async fn handle_describe_fragments(
274 handler_args: HandlerArgs,
275 object_name: ObjectName,
276) -> Result<RwPgResponse> {
277 let session = handler_args.session.clone();
278 let job_id = {
279 let mut binder = Binder::new_for_system(&session);
280
281 Binder::validate_cross_db_reference(&session.database(), &object_name)?;
282 let not_found_err = CatalogError::not_found("stream job", object_name.to_string());
283
284 if let Ok(relation) = binder.bind_catalog_relation_by_object_name(&object_name, true) {
285 match relation {
286 Relation::Source(s) => {
287 if s.is_shared() {
288 s.catalog.id.as_share_source_job_id()
289 } else {
290 bail!(ErrorCode::NotSupported(
291 "non shared source has no fragments to describe".to_owned(),
292 "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
293 ));
294 }
295 }
296 Relation::BaseTable(t) => t.table_catalog.id.as_job_id(),
297 Relation::SystemTable(_t) => {
298 bail!(ErrorCode::NotSupported(
299 "system table has no fragments to describe".to_owned(),
300 "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
301 ));
302 }
303 Relation::Share(_s) => {
304 bail!(ErrorCode::NotSupported(
305 "view has no fragments to describe".to_owned(),
306 "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
307 ));
308 }
309 _ => {
310 return Err(not_found_err.into());
312 }
313 }
314 } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
315 sink.sink_catalog.id.as_job_id()
316 } else {
317 return Err(not_found_err.into());
318 }
319 };
320
321 let meta_client = session.env().meta_client();
322 let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
323 let res = generate_fragments_string(fragments)?;
324 Ok(res)
325}
326
327fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
330 let mut config = PrettyConfig {
331 need_boundaries: false,
332 width: 80,
333 ..Default::default()
334 };
335
336 let mut max_width = 80;
337
338 let mut blocks = vec![];
339 for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
340 let mut res = String::new();
341 let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
342 res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
343 let node = &fragment.actors[0].node;
344 let node = explain_node(node.as_ref().unwrap(), true);
345 let width = config.unicode(&mut res, &node);
346 max_width = max(width, max_width);
347 config.width = max_width;
348 blocks.push(res);
349 blocks.push("".to_owned());
350 }
351
352 let rows = blocks.iter().map(|b| ExplainRow {
353 query_plan: b.into(),
354 });
355 Ok(PgResponse::builder(StatementType::DESCRIBE)
356 .rows(rows)
357 .into())
358}
359
360fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
361 let one_line_explain = node.identity.clone();
363
364 let mut fields = Vec::with_capacity(3);
365 if verbose {
366 fields.push((
367 "output",
368 Pretty::Array(
369 node.fields
370 .iter()
371 .map(|f| Pretty::display(f.get_name()))
372 .collect(),
373 ),
374 ));
375 fields.push((
376 "stream key",
377 Pretty::Array(
378 node.stream_key
379 .iter()
380 .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
381 .collect(),
382 ),
383 ));
384 }
385 let children = node
386 .input
387 .iter()
388 .map(|input| explain_node(input, verbose))
389 .collect();
390 Pretty::simple_record(one_line_explain, fields, children)
391}
392
393pub async fn handle_describe_fragment(
394 handler_args: HandlerArgs,
395 fragment_id: FragmentId,
396) -> Result<RwPgResponse> {
397 let session = handler_args.session.clone();
398 let meta_client = session.env().meta_client();
399 let distribution = &meta_client
400 .get_fragment_by_id(fragment_id)
401 .await?
402 .ok_or_else(|| CatalogError::not_found("fragment", fragment_id.to_string()))?;
403 let res: PgResponse<super::PgResponseStream> = generate_enhanced_fragment_string(distribution)?;
404 Ok(res)
405}
406
407fn generate_enhanced_fragment_string(fragment_dist: &FragmentDistribution) -> Result<RwPgResponse> {
408 let mut config = PrettyConfig {
409 need_boundaries: false,
410 width: 80,
411 ..Default::default()
412 };
413
414 let mut res = String::new();
415
416 res.push_str(&format!(
417 "Fragment {} (Table {})\n",
418 fragment_dist.fragment_id, fragment_dist.table_id
419 ));
420 let dist_type = fragment::FragmentDistributionType::try_from(fragment_dist.distribution_type)
421 .unwrap_or(fragment::FragmentDistributionType::Unspecified);
422 res.push_str(&format!("Distribution Type: {}\n", dist_type.as_str_name()));
423 res.push_str(&format!("Parallelism: {}\n", fragment_dist.parallelism));
424 res.push_str(&format!("VNode Count: {}\n", fragment_dist.vnode_count));
425
426 if !fragment_dist.state_table_ids.is_empty() {
427 res.push_str(&format!(
428 "State Tables: [{}]\n",
429 fragment_dist
430 .state_table_ids
431 .iter()
432 .map(|id| id.to_string())
433 .collect::<Vec<_>>()
434 .join(", ")
435 ));
436 }
437
438 if !fragment_dist.upstream_fragment_ids.is_empty() {
439 res.push_str(&format!(
440 "Upstream Fragments: [{}]\n",
441 fragment_dist
442 .upstream_fragment_ids
443 .iter()
444 .map(|id| id.to_string())
445 .collect::<Vec<_>>()
446 .join(", ")
447 ));
448 }
449
450 if let Some(node) = &fragment_dist.node {
451 res.push_str("Stream Plan:\n");
452 let node_pretty = explain_node(node, true);
453 config.unicode(&mut res, &node_pretty);
454 }
455
456 let rows = vec![ExplainRow { query_plan: res }];
457
458 Ok(PgResponse::builder(StatementType::DESCRIBE)
459 .rows(rows)
460 .into())
461}
462
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

    /// `DESCRIBE` on a table with a primary key and an index reports every column plus the
    /// primary key, distribution key, index and table description rows.
    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for setup_sql in [
            "create table t (v1 int, v2 int, v3 int primary key, v4 int);",
            "create index idx1 on t (v1 DESC, v2);",
        ] {
            frontend.run_sql(setup_sql).await.unwrap();
        }

        let mut pg_response = frontend.run_sql("describe t").await.unwrap();

        // Maps the first result column (name) to the second one (type).
        let mut columns = HashMap::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            for row in row_set.unwrap() {
                let name = std::str::from_utf8(row.index(0).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                let ty = std::str::from_utf8(row.index(1).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                columns.insert(name, ty);
            }
        }

        let expected_columns: HashMap<String, String> = [
            ("v1", "integer"),
            ("v2", "integer"),
            ("v3", "integer"),
            ("v4", "integer"),
            ("primary key", "v3"),
            ("distribution key", "v3"),
            ("_rw_timestamp", "timestamp with time zone"),
            (
                "idx1",
                "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)",
            ),
            ("table description", "t"),
        ]
        .into_iter()
        .map(|(name, ty)| (name.to_owned(), ty.to_owned()))
        .collect();

        assert_eq!(columns, expected_columns);
    }

    /// `DESCRIBE` on a sink created with `INTO <table>` additionally reports the name of the
    /// target table.
    #[tokio::test]
    async fn test_describe_handler_with_target_table() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for setup_sql in [
            "CREATE TABLE t(v int);",
            "CREATE TABLE tt(v int);",
            "CREATE SINK ssss INTO tt AS SELECT * FROM t WITH (type = 'append-only', force_append_only = 'true');",
        ] {
            frontend.run_sql(setup_sql).await.unwrap();
        }

        let mut pg_response = frontend.run_sql("describe ssss;").await.unwrap();

        // Maps the first result column (name) to the second one (type).
        let mut columns = HashMap::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            for row in row_set.unwrap() {
                let name = std::str::from_utf8(row.index(0).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                let ty = std::str::from_utf8(row.index(1).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                columns.insert(name, ty);
            }
        }

        let expected_columns: HashMap<String, String> = [
            ("v", "integer"),
            ("\"t._row_id\"", "serial"),
            ("distribution key", "t._row_id"),
            ("table description", "ssss"),
            ("target table name", "tt"),
        ]
        .into_iter()
        .map(|(name, ty)| (name.to_owned(), ty.to_owned()))
        .collect();

        assert_eq!(columns, expected_columns);
    }
}