risingwave_frontend/handler/
describe.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::max;
16use std::fmt::Display;
17
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pretty_xmlish::{Pretty, PrettyConfig};
22use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
23use risingwave_common::types::{DataType, Fields};
24use risingwave_expr::bail;
25use risingwave_pb::meta::FragmentDistribution;
26use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
27use risingwave_pb::meta::table_fragments::fragment;
28use risingwave_pb::stream_plan::StreamNode;
29use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};
30
31use super::explain::ExplainRow;
32use super::show::ShowColumnRow;
33use super::{RwPgResponse, fields_to_descriptors};
34use crate::binder::{Binder, Relation};
35use crate::catalog::{CatalogError, FragmentId};
36use crate::error::{ErrorCode, Result};
37use crate::handler::show::ShowColumnName;
38use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
39
40pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
41    let session = handler_args.session;
42    let mut binder = Binder::new_for_system(&session);
43    let catalog_reader = session.env().catalog_reader().read_guard();
44
45    Binder::validate_cross_db_reference(&session.database(), &object_name)?;
46    let not_found_err =
47        CatalogError::not_found("table, source, sink or view", object_name.to_string());
48
49    // Vec<ColumnCatalog>, Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>, String, Option<String>
50    let (columns, pk_columns, dist_columns, indices, relname, description, target_table_name) =
51        if let Ok(relation) = binder.bind_relation_by_name(&object_name, None, None, false) {
52            match relation {
53                Relation::Source(s) => {
54                    let pk_column_catalogs = s
55                        .catalog
56                        .pk_col_ids
57                        .iter()
58                        .map(|&column_id| {
59                            s.catalog
60                                .columns
61                                .iter()
62                                .filter(|x| x.column_id() == column_id)
63                                .map(|x| x.column_desc.clone())
64                                .exactly_one()
65                                .unwrap()
66                        })
67                        .collect_vec();
68                    (
69                        s.catalog.columns,
70                        pk_column_catalogs,
71                        vec![],
72                        vec![],
73                        s.catalog.name,
74                        None, // Description
75                        None,
76                    )
77                }
78                Relation::BaseTable(t) => {
79                    let pk_column_catalogs = t
80                        .table_catalog
81                        .pk()
82                        .iter()
83                        .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
84                        .collect_vec();
85                    let dist_columns = t
86                        .table_catalog
87                        .distribution_key()
88                        .iter()
89                        .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
90                        .collect_vec();
91                    (
92                        t.table_catalog.columns.clone(),
93                        pk_column_catalogs,
94                        dist_columns,
95                        t.table_indexes,
96                        t.table_catalog.name.clone(),
97                        t.table_catalog.description.clone(),
98                        None,
99                    )
100                }
101                Relation::SystemTable(t) => {
102                    let pk_column_catalogs = t
103                        .sys_table_catalog
104                        .pk
105                        .iter()
106                        .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
107                        .collect_vec();
108                    (
109                        t.sys_table_catalog.columns.clone(),
110                        pk_column_catalogs,
111                        vec![],
112                        vec![],
113                        t.sys_table_catalog.name.clone(),
114                        None, // Description
115                        None,
116                    )
117                }
118                Relation::Share(_) => {
119                    if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
120                        let columns = view
121                            .view_catalog
122                            .columns
123                            .iter()
124                            .enumerate()
125                            .map(|(idx, field)| ColumnCatalog {
126                                column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
127                                is_hidden: false,
128                            })
129                            .collect();
130                        (
131                            columns,
132                            vec![],
133                            vec![],
134                            vec![],
135                            view.view_catalog.name.clone(),
136                            None,
137                            None,
138                        )
139                    } else {
140                        return Err(not_found_err.into());
141                    }
142                }
143                _ => {
144                    return Err(not_found_err.into());
145                }
146            }
147        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
148            let columns = sink.sink_catalog.full_columns().to_vec();
149            let pk_columns = (sink.sink_catalog.downstream_pk.clone().unwrap_or_default())
150                .into_iter()
151                .map(|idx| columns[idx].column_desc.clone())
152                .collect_vec();
153            let dist_columns = sink
154                .sink_catalog
155                .distribution_key
156                .iter()
157                .map(|idx| columns[*idx].column_desc.clone())
158                .collect_vec();
159            let target_table_name = sink
160                .sink_catalog
161                .target_table
162                .and_then(|table_id| catalog_reader.get_table_name_by_id(table_id).ok());
163            (
164                columns,
165                pk_columns,
166                dist_columns,
167                vec![],
168                sink.sink_catalog.name.clone(),
169                None,
170                target_table_name,
171            )
172        } else {
173            return Err(not_found_err.into());
174        };
175
176    // Convert all column descs to rows
177    let mut rows = columns
178        .into_iter()
179        .flat_map(ShowColumnRow::from_catalog)
180        .collect_vec();
181
182    fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
183    where
184        T: Display,
185    {
186        format!(
187            "{}",
188            display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
189        )
190    }
191
192    // Convert primary key to rows
193    if !pk_columns.is_empty() {
194        rows.push(ShowColumnRow {
195            name: ShowColumnName::special("primary key"),
196            r#type: concat(pk_columns.iter().map(|x| &x.name)),
197            is_hidden: None,
198            description: None,
199        });
200    }
201
202    // Convert distribution keys to rows
203    if !dist_columns.is_empty() {
204        rows.push(ShowColumnRow {
205            name: ShowColumnName::special("distribution key"),
206            r#type: concat(dist_columns.iter().map(|x| &x.name)),
207            is_hidden: None,
208            description: None,
209        });
210    }
211
212    // Convert all indexes to rows
213    rows.extend(indices.iter().map(|index| {
214        let index_display = index.display();
215
216        ShowColumnRow {
217            name: ShowColumnName::special(&index.name),
218            r#type: if index_display.include_columns.is_empty() {
219                format!(
220                    "index({}) distributed by({})",
221                    display_comma_separated(&index_display.index_columns_with_ordering),
222                    display_comma_separated(&index_display.distributed_by_columns),
223                )
224            } else {
225                format!(
226                    "index({}) include({}) distributed by({})",
227                    display_comma_separated(&index_display.index_columns_with_ordering),
228                    display_comma_separated(&index_display.include_columns),
229                    display_comma_separated(&index_display.distributed_by_columns),
230                )
231            },
232            is_hidden: None,
233            // TODO: index description
234            description: None,
235        }
236    }));
237
238    rows.push(ShowColumnRow {
239        name: ShowColumnName::special("table description"),
240        r#type: relname,
241        is_hidden: None,
242        description,
243    });
244
245    if let Some(target_table) = target_table_name {
246        rows.push(ShowColumnRow {
247            name: ShowColumnName::special("target table name"),
248            r#type: target_table,
249            is_hidden: None,
250            description: None,
251        });
252    };
253
254    // TODO: table name and description as title of response
255    // TODO: recover the original user statement
256    Ok(PgResponse::builder(StatementType::DESCRIBE)
257        .rows(rows)
258        .into())
259}
260
261pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
262    match kind {
263        DescribeKind::Fragments => vec![PgFieldDescriptor::new(
264            "Fragments".to_owned(),
265            DataType::Varchar.to_oid(),
266            DataType::Varchar.type_len(),
267        )],
268        DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
269    }
270}
271
272pub async fn handle_describe_fragments(
273    handler_args: HandlerArgs,
274    object_name: ObjectName,
275) -> Result<RwPgResponse> {
276    let session = handler_args.session.clone();
277    let job_id = {
278        let mut binder = Binder::new_for_system(&session);
279
280        Binder::validate_cross_db_reference(&session.database(), &object_name)?;
281        let not_found_err = CatalogError::not_found("stream job", object_name.to_string());
282
283        if let Ok(relation) = binder.bind_catalog_relation_by_object_name(&object_name, true) {
284            match relation {
285                Relation::Source(s) => {
286                    if s.is_shared() {
287                        s.catalog.id.as_share_source_job_id()
288                    } else {
289                        bail!(ErrorCode::NotSupported(
290                            "non shared source has no fragments to describe".to_owned(),
291                            "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
292                        ));
293                    }
294                }
295                Relation::BaseTable(t) => t.table_catalog.id.as_job_id(),
296                Relation::SystemTable(_t) => {
297                    bail!(ErrorCode::NotSupported(
298                        "system table has no fragments to describe".to_owned(),
299                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
300                    ));
301                }
302                Relation::Share(_s) => {
303                    bail!(ErrorCode::NotSupported(
304                        "view has no fragments to describe".to_owned(),
305                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
306                    ));
307                }
308                _ => {
309                    // Other relation types (Subquery, Join, etc.) are not directly describable.
310                    return Err(not_found_err.into());
311                }
312            }
313        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
314            sink.sink_catalog.id.as_job_id()
315        } else {
316            return Err(not_found_err.into());
317        }
318    };
319
320    let meta_client = session.env().meta_client();
321    let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
322    let res = generate_fragments_string(fragments)?;
323    Ok(res)
324}
325
326/// The implementation largely copied from `crate::utils::stream_graph_formatter::StreamGraphFormatter`.
327/// The input is different, so we need separate implementation.
328fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
329    let mut config = PrettyConfig {
330        need_boundaries: false,
331        width: 80,
332        ..Default::default()
333    };
334
335    let mut max_width = 80;
336
337    let mut blocks = vec![];
338    for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
339        let mut res = String::new();
340        let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
341        res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
342        let node = &fragment.actors[0].node;
343        let node = explain_node(node.as_ref().unwrap(), true);
344        let width = config.unicode(&mut res, &node);
345        max_width = max(width, max_width);
346        config.width = max_width;
347        blocks.push(res);
348        blocks.push("".to_owned());
349    }
350
351    let rows = blocks.iter().map(|b| ExplainRow {
352        query_plan: b.into(),
353    });
354    Ok(PgResponse::builder(StatementType::DESCRIBE)
355        .rows(rows)
356        .into())
357}
358
359fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
360    // TODO: we need extra edge information to get which fragment MergeExecutor connects to.
361    let one_line_explain = node.identity.clone();
362
363    let mut fields = Vec::with_capacity(3);
364    if verbose {
365        fields.push((
366            "output",
367            Pretty::Array(
368                node.fields
369                    .iter()
370                    .map(|f| Pretty::display(f.get_name()))
371                    .collect(),
372            ),
373        ));
374        fields.push((
375            "stream key",
376            Pretty::Array(
377                node.stream_key
378                    .iter()
379                    .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
380                    .collect(),
381            ),
382        ));
383    }
384    let children = node
385        .input
386        .iter()
387        .map(|input| explain_node(input, verbose))
388        .collect();
389    Pretty::simple_record(one_line_explain, fields, children)
390}
391
392pub async fn handle_describe_fragment(
393    handler_args: HandlerArgs,
394    fragment_id: FragmentId,
395) -> Result<RwPgResponse> {
396    let session = handler_args.session.clone();
397    let meta_client = session.env().meta_client();
398    let distribution = &meta_client
399        .get_fragment_by_id(fragment_id)
400        .await?
401        .ok_or_else(|| CatalogError::not_found("fragment", fragment_id.to_string()))?;
402    let res: PgResponse<super::PgResponseStream> = generate_enhanced_fragment_string(distribution)?;
403    Ok(res)
404}
405
406fn generate_enhanced_fragment_string(fragment_dist: &FragmentDistribution) -> Result<RwPgResponse> {
407    let mut config = PrettyConfig {
408        need_boundaries: false,
409        width: 80,
410        ..Default::default()
411    };
412
413    let mut res = String::new();
414
415    res.push_str(&format!(
416        "Fragment {} (Table {})\n",
417        fragment_dist.fragment_id, fragment_dist.table_id
418    ));
419    let dist_type = fragment::FragmentDistributionType::try_from(fragment_dist.distribution_type)
420        .unwrap_or(fragment::FragmentDistributionType::Unspecified);
421    res.push_str(&format!("Distribution Type: {}\n", dist_type.as_str_name()));
422    res.push_str(&format!("Parallelism: {}\n", fragment_dist.parallelism));
423    res.push_str(&format!("VNode Count: {}\n", fragment_dist.vnode_count));
424
425    if !fragment_dist.state_table_ids.is_empty() {
426        res.push_str(&format!(
427            "State Tables: [{}]\n",
428            fragment_dist
429                .state_table_ids
430                .iter()
431                .map(|id| id.to_string())
432                .collect::<Vec<_>>()
433                .join(", ")
434        ));
435    }
436
437    if !fragment_dist.upstream_fragment_ids.is_empty() {
438        res.push_str(&format!(
439            "Upstream Fragments: [{}]\n",
440            fragment_dist
441                .upstream_fragment_ids
442                .iter()
443                .map(|id| id.to_string())
444                .collect::<Vec<_>>()
445                .join(", ")
446        ));
447    }
448
449    if let Some(node) = &fragment_dist.node {
450        res.push_str("Stream Plan:\n");
451        let node_pretty = explain_node(node, true);
452        config.unicode(&mut res, &node_pretty);
453    }
454
455    let rows = vec![ExplainRow { query_plan: res }];
456
457    Ok(PgResponse::builder(StatementType::DESCRIBE)
458        .rows(rows)
459        .into())
460}
461
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

    /// Runs `sql` on `frontend` and collects every returned row into a map keyed by the row's
    /// first column, with the second column as value. Later rows with the same key overwrite
    /// earlier ones.
    async fn collect_describe_rows(frontend: &LocalFrontend, sql: &str) -> HashMap<String, String> {
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        let mut columns = HashMap::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            let row_set = row_set.unwrap();
            for row in row_set {
                columns.insert(
                    std::str::from_utf8(row.index(0).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                    std::str::from_utf8(row.index(1).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                );
            }
        }
        columns
    }

    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create table t (v1 int, v2 int, v3 int primary key, v4 int);")
            .await
            .unwrap();

        frontend
            .run_sql("create index idx1 on t (v1 DESC, v2);")
            .await
            .unwrap();

        let columns = collect_describe_rows(&frontend, "describe t").await;

        let expected_columns: HashMap<String, String> = maplit::hashmap! {
            "v1".into() => "integer".into(),
            "v2".into() => "integer".into(),
            "v3".into() => "integer".into(),
            "v4".into() => "integer".into(),
            "primary key".into() => "v3".into(),
            "distribution key".into() => "v3".into(),
            "_rw_timestamp".into() => "timestamp with time zone".into(),
            "idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)".into(),
            "table description".into() => "t".into(),
        };

        assert_eq!(columns, expected_columns);
    }

    #[tokio::test]
    async fn test_describe_handler_with_target_table() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql("CREATE TABLE t(v int);").await.unwrap();

        frontend.run_sql("CREATE TABLE tt(v int);").await.unwrap();

        frontend.run_sql("CREATE SINK ssss INTO tt AS SELECT * FROM t WITH (type = 'append-only', force_append_only = 'true');").await.unwrap();

        let columns = collect_describe_rows(&frontend, "describe ssss;").await;

        let expected_columns: HashMap<String, String> = maplit::hashmap! {
            "v".into() => "integer".into(),
            "\"t._row_id\"".into() => "serial".into(),
            "distribution key".into() => "t._row_id".into(),
            "table description".into() => "ssss".into(),
            "target table name".into() => "tt".into(),
        };

        assert_eq!(columns, expected_columns);
    }
}