// risingwave_frontend/handler/describe.rs

use std::cmp::max;
use std::fmt::Display;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pretty_xmlish::{Pretty, PrettyConfig};
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
use risingwave_common::types::{DataType, Fields};
use risingwave_expr::bail;
use risingwave_pb::meta::FragmentDistribution;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::table_fragments::fragment;
use risingwave_pb::stream_plan::StreamNode;
use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};

use super::explain::ExplainRow;
use super::show::ShowColumnRow;
use super::{RwPgResponse, fields_to_descriptors};
use crate::binder::{Binder, Relation};
use crate::catalog::CatalogError;
use crate::error::{ErrorCode, Result};
use crate::handler::show::ShowColumnName;
use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};

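/// Handles a plain `DESCRIBE <object>` statement: lists the object's columns and appends
/// synthetic rows for the primary key, distribution key, indexes, and table description.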
pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
    let session = handler_args.session;
    let mut binder = Binder::new_for_system(&session);

    Binder::validate_cross_db_reference(&session.database(), &object_name)?;
    let not_found_err =
        CatalogError::NotFound("table, source, sink or view", object_name.to_string());

    let (columns, pk_columns, dist_columns, indices, relname, description) =
        if let Ok(relation) = binder.bind_relation_by_name(&object_name, None, None, false) {
            match relation {
                Relation::Source(s) => {
                    let pk_column_catalogs = s
                        .catalog
                        .pk_col_ids
                        .iter()
                        .map(|&column_id| {
                            s.catalog
                                .columns
                                .iter()
                                .filter(|x| x.column_id() == column_id)
                                .map(|x| x.column_desc.clone())
                                .exactly_one()
                                .unwrap()
                        })
                        .collect_vec();
                    (
                        s.catalog.columns,
                        pk_column_catalogs,
                        vec![],
                        vec![],
                        s.catalog.name,
                        None,
                    )
                }
                Relation::BaseTable(t) => {
                    let pk_column_catalogs = t
                        .table_catalog
                        .pk()
                        .iter()
                        .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
                        .collect_vec();
                    let dist_columns = t
                        .table_catalog
                        .distribution_key()
                        .iter()
                        .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
                        .collect_vec();
                    (
                        t.table_catalog.columns.clone(),
                        pk_column_catalogs,
                        dist_columns,
                        t.table_indexes,
                        t.table_catalog.name.clone(),
                        t.table_catalog.description.clone(),
                    )
                }
                Relation::SystemTable(t) => {
                    let pk_column_catalogs = t
                        .sys_table_catalog
                        .pk
                        .iter()
                        .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
                        .collect_vec();
                    (
                        t.sys_table_catalog.columns.clone(),
                        pk_column_catalogs,
                        vec![],
                        vec![],
                        t.sys_table_catalog.name.clone(),
                        None,
                    )
                }
                Relation::Share(_) => {
                    if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
                        let columns = view
                            .view_catalog
                            .columns
                            .iter()
                            .enumerate()
                            .map(|(idx, field)| ColumnCatalog {
                                column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
                                is_hidden: false,
                            })
                            .collect();
                        (
                            columns,
                            vec![],
                            vec![],
                            vec![],
                            view.view_catalog.name.clone(),
                            None,
                        )
                    } else {
                        return Err(not_found_err.into());
                    }
                }
                _ => {
                    return Err(not_found_err.into());
                }
            }
        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
            let columns = sink.sink_catalog.full_columns().to_vec();
            let pk_columns = (sink.sink_catalog.downstream_pk.clone().unwrap_or_default())
                .into_iter()
                .map(|idx| columns[idx].column_desc.clone())
                .collect_vec();
            let dist_columns = sink
                .sink_catalog
                .distribution_key
                .iter()
                .map(|idx| columns[*idx].column_desc.clone())
                .collect_vec();
            (
                columns,
                pk_columns,
                dist_columns,
                vec![],
                sink.sink_catalog.name.clone(),
                None,
            )
        } else {
            return Err(not_found_err.into());
        };

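    // Turn each column catalog into DESCRIBE output rows.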
    let mut rows = columns
        .into_iter()
        .flat_map(ShowColumnRow::from_catalog)
        .collect_vec();

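    // Helper: join displayable items into a comma-separated string, e.g. "v1, v2".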
    fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
    where
        T: Display,
    {
        format!(
            "{}",
            display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
        )
    }

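    // Append synthetic rows for the primary key and the distribution key, if present.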
    if !pk_columns.is_empty() {
        rows.push(ShowColumnRow {
            name: ShowColumnName::special("primary key"),
            r#type: concat(pk_columns.iter().map(|x| &x.name)),
            is_hidden: None,
            description: None,
        });
    }

    if !dist_columns.is_empty() {
        rows.push(ShowColumnRow {
            name: ShowColumnName::special("distribution key"),
            r#type: concat(dist_columns.iter().map(|x| &x.name)),
            is_hidden: None,
            description: None,
        });
    }

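    // One synthetic row per index, summarizing its index, include, and distributed-by columns.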
    rows.extend(indices.iter().map(|index| {
        let index_display = index.display();

        ShowColumnRow {
            name: ShowColumnName::special(&index.name),
            r#type: if index_display.include_columns.is_empty() {
                format!(
                    "index({}) distributed by({})",
                    display_comma_separated(&index_display.index_columns_with_ordering),
                    display_comma_separated(&index_display.distributed_by_columns),
                )
            } else {
                format!(
                    "index({}) include({}) distributed by({})",
                    display_comma_separated(&index_display.index_columns_with_ordering),
                    display_comma_separated(&index_display.include_columns),
                    display_comma_separated(&index_display.distributed_by_columns),
                )
            },
            is_hidden: None,
            description: None,
        }
    }));

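    // Finally, a row that carries the relation name and its optional description.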
    rows.push(ShowColumnRow {
        name: ShowColumnName::special("table description"),
        r#type: relname,
        is_hidden: None,
        description,
    });

    Ok(PgResponse::builder(StatementType::DESCRIBE)
        .rows(rows)
        .into())
}

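/// Infers the output schema of a `DESCRIBE` statement: `DESCRIBE FRAGMENTS` returns a single
/// `Fragments` varchar column, while a plain `DESCRIBE` uses the `ShowColumnRow` field layout.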
pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
    match kind {
        DescribeKind::Fragments => vec![PgFieldDescriptor::new(
            "Fragments".to_owned(),
            DataType::Varchar.to_oid(),
            DataType::Varchar.type_len(),
        )],
        DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
    }
}

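/// Handles `DESCRIBE FRAGMENTS <object>`: resolves the object to a streaming job id, then
/// fetches and renders the plans of all fragments of that job.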
pub async fn handle_describe_fragments(
    handler_args: HandlerArgs,
    object_name: ObjectName,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let job_id = {
        let mut binder = Binder::new_for_system(&session);

        Binder::validate_cross_db_reference(&session.database(), &object_name)?;
        let not_found_err = CatalogError::NotFound("stream job", object_name.to_string());

        if let Ok(relation) = binder.bind_catalog_relation_by_object_name(&object_name, true) {
            match relation {
                Relation::Source(s) => {
                    if s.is_shared() {
                        s.catalog.id
                    } else {
                        bail!(ErrorCode::NotSupported(
                            "non shared source has no fragments to describe".to_owned(),
                            "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
                        ));
                    }
                }
                Relation::BaseTable(t) => t.table_catalog.id.as_raw_id(),
                Relation::SystemTable(_t) => {
                    bail!(ErrorCode::NotSupported(
                        "system table has no fragments to describe".to_owned(),
                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
                    ));
                }
                Relation::Share(_s) => {
                    bail!(ErrorCode::NotSupported(
                        "view has no fragments to describe".to_owned(),
                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
                    ));
                }
                _ => {
                    return Err(not_found_err.into());
                }
            }
        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
            sink.sink_catalog.id.sink_id
        } else {
            return Err(not_found_err.into());
        }
    };

    let meta_client = session.env().meta_client();
    let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
    let res = generate_fragments_string(fragments)?;
    Ok(res)
}

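/// Renders every fragment of a streaming job. Each block shows the fragment id, its actor ids,
/// and the pretty-printed stream plan of the fragment's first actor.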
fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
    let mut config = PrettyConfig {
        need_boundaries: false,
        width: 80,
        ..Default::default()
    };

    let mut max_width = 80;

    let mut blocks = vec![];
    for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
        let mut res = String::new();
        let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
        res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
        let node = &fragment.actors[0].node;
        let node = explain_node(node.as_ref().unwrap(), true);
        let width = config.unicode(&mut res, &node);
        max_width = max(width, max_width);
        config.width = max_width;
        blocks.push(res);
        blocks.push("".to_owned());
    }

    let rows = blocks.iter().map(|b| ExplainRow {
        query_plan: b.into(),
    });
    Ok(PgResponse::builder(StatementType::DESCRIBE)
        .rows(rows)
        .into())
}

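/// Recursively pretty-prints a `StreamNode`. In verbose mode, each node's output fields and
/// stream key are shown alongside its identity.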
fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
    let one_line_explain = node.identity.clone();

    let mut fields = Vec::with_capacity(3);
    if verbose {
        fields.push((
            "output",
            Pretty::Array(
                node.fields
                    .iter()
                    .map(|f| Pretty::display(f.get_name()))
                    .collect(),
            ),
        ));
        fields.push((
            "stream key",
            Pretty::Array(
                node.stream_key
                    .iter()
                    .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
                    .collect(),
            ),
        ));
    }
    let children = node
        .input
        .iter()
        .map(|input| explain_node(input, verbose))
        .collect();
    Pretty::simple_record(one_line_explain, fields, children)
}

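/// Handles `DESCRIBE FRAGMENT <id>` by fetching a single fragment's distribution info from the
/// meta service and rendering it.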
pub async fn handle_describe_fragment(
    handler_args: HandlerArgs,
    fragment_id: u32,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let meta_client = session.env().meta_client();
    let distribution = &meta_client
        .get_fragment_by_id(fragment_id)
        .await?
        .ok_or_else(|| CatalogError::NotFound("fragment", fragment_id.to_string()))?;
    let res: PgResponse<super::PgResponseStream> = generate_enhanced_fragment_string(distribution)?;
    Ok(res)
}

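/// Renders one fragment: its distribution type, parallelism, vnode count, state tables,
/// upstream fragments, and (if available) its stream plan.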
fn generate_enhanced_fragment_string(fragment_dist: &FragmentDistribution) -> Result<RwPgResponse> {
    let mut config = PrettyConfig {
        need_boundaries: false,
        width: 80,
        ..Default::default()
    };

    let mut res = String::new();

    res.push_str(&format!(
        "Fragment {} (Table {})\n",
        fragment_dist.fragment_id, fragment_dist.table_id
    ));
    let dist_type = fragment::FragmentDistributionType::try_from(fragment_dist.distribution_type)
        .unwrap_or(fragment::FragmentDistributionType::Unspecified);
    res.push_str(&format!("Distribution Type: {}\n", dist_type.as_str_name()));
    res.push_str(&format!("Parallelism: {}\n", fragment_dist.parallelism));
    res.push_str(&format!("VNode Count: {}\n", fragment_dist.vnode_count));

    if !fragment_dist.state_table_ids.is_empty() {
        res.push_str(&format!(
            "State Tables: [{}]\n",
            fragment_dist
                .state_table_ids
                .iter()
                .map(|id| id.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        ));
    }

    if !fragment_dist.upstream_fragment_ids.is_empty() {
        res.push_str(&format!(
            "Upstream Fragments: [{}]\n",
            fragment_dist
                .upstream_fragment_ids
                .iter()
                .map(|id| id.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        ));
    }

    if let Some(node) = &fragment_dist.node {
        res.push_str("Stream Plan:\n");
        let node_pretty = explain_node(node, true);
        config.unicode(&mut res, &node_pretty);
    }

    let rows = vec![ExplainRow { query_plan: res }];

    Ok(PgResponse::builder(StatementType::DESCRIBE)
        .rows(rows)
        .into())
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

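    /// Smoke test: `DESCRIBE t` should list the columns together with the synthetic rows for
    /// the primary key, the distribution key, the index summary, and the table description.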
    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create table t (v1 int, v2 int, v3 int primary key, v4 int);")
            .await
            .unwrap();

        frontend
            .run_sql("create index idx1 on t (v1 DESC, v2);")
            .await
            .unwrap();

        let sql = "describe t";
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        let mut columns = HashMap::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            let row_set = row_set.unwrap();
            for row in row_set {
                columns.insert(
                    std::str::from_utf8(row.index(0).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                    std::str::from_utf8(row.index(1).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                );
            }
        }

        let expected_columns: HashMap<String, String> = maplit::hashmap! {
            "v1".into() => "integer".into(),
            "v2".into() => "integer".into(),
            "v3".into() => "integer".into(),
            "v4".into() => "integer".into(),
            "primary key".into() => "v3".into(),
            "distribution key".into() => "v3".into(),
            "_rw_timestamp".into() => "timestamp with time zone".into(),
            "idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)".into(),
            "table description".into() => "t".into(),
        };

        assert_eq!(columns, expected_columns);
    }
}