Skip to main content

risingwave_frontend/handler/
describe.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::max;
16use std::fmt::Display;
17
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pretty_xmlish::{Pretty, PrettyConfig};
22use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
23use risingwave_common::types::{DataType, Fields};
24use risingwave_expr::bail;
25use risingwave_pb::meta::FragmentDistribution;
26use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
27use risingwave_pb::meta::table_fragments::fragment;
28use risingwave_pb::stream_plan::StreamNode;
29use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};
30
31use super::explain::ExplainRow;
32use super::show::ShowColumnRow;
33use super::{RwPgResponse, fields_to_descriptors};
34use crate::binder::{Binder, Relation};
35use crate::catalog::{CatalogError, FragmentId};
36use crate::error::{ErrorCode, Result};
37use crate::handler::show::ShowColumnName;
38use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
39
40pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
41    let session = handler_args.session;
42    let mut binder = Binder::new_for_system(&session);
43    let catalog_reader = session.env().catalog_reader().read_guard();
44
45    Binder::validate_cross_db_reference(&session.database(), &object_name)?;
46    let not_found_err =
47        CatalogError::not_found("table, source, sink or view", object_name.to_string());
48
49    // Vec<ColumnCatalog>, Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>, String, Option<String>
50    let (columns, pk_columns, dist_columns, indices, relname, description, target_table_name) =
51        if let Ok(relation) = binder.bind_relation_by_name(&object_name, None, None, false) {
52            match relation {
53                Relation::Source(s) => {
54                    let pk_column_catalogs = s
55                        .catalog
56                        .pk_col_ids
57                        .iter()
58                        .map(|&column_id| {
59                            Itertools::exactly_one(
60                                s.catalog
61                                    .columns
62                                    .iter()
63                                    .filter(|x| x.column_id() == column_id)
64                                    .map(|x| x.column_desc.clone()),
65                            )
66                            .unwrap()
67                        })
68                        .collect_vec();
69                    (
70                        s.catalog.columns,
71                        pk_column_catalogs,
72                        vec![],
73                        vec![],
74                        s.catalog.name,
75                        None, // Description
76                        None,
77                    )
78                }
79                Relation::BaseTable(t) => {
80                    let pk_column_catalogs = t
81                        .table_catalog
82                        .pk()
83                        .iter()
84                        .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
85                        .collect_vec();
86                    let dist_columns = t
87                        .table_catalog
88                        .distribution_key()
89                        .iter()
90                        .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
91                        .collect_vec();
92                    (
93                        t.table_catalog.columns.clone(),
94                        pk_column_catalogs,
95                        dist_columns,
96                        t.table_indexes,
97                        t.table_catalog.name.clone(),
98                        t.table_catalog.description.clone(),
99                        None,
100                    )
101                }
102                Relation::SystemTable(t) => {
103                    let pk_column_catalogs = t
104                        .sys_table_catalog
105                        .pk
106                        .iter()
107                        .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
108                        .collect_vec();
109                    (
110                        t.sys_table_catalog.columns.clone(),
111                        pk_column_catalogs,
112                        vec![],
113                        vec![],
114                        t.sys_table_catalog.name.clone(),
115                        None, // Description
116                        None,
117                    )
118                }
119                Relation::Share(_) => {
120                    if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
121                        let columns = view
122                            .view_catalog
123                            .columns
124                            .iter()
125                            .enumerate()
126                            .map(|(idx, field)| ColumnCatalog {
127                                column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
128                                is_hidden: false,
129                            })
130                            .collect();
131                        (
132                            columns,
133                            vec![],
134                            vec![],
135                            vec![],
136                            view.view_catalog.name.clone(),
137                            None,
138                            None,
139                        )
140                    } else {
141                        return Err(not_found_err.into());
142                    }
143                }
144                _ => {
145                    return Err(not_found_err.into());
146                }
147            }
148        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
149            let columns = sink.sink_catalog.full_columns().to_vec();
150            let pk_columns = (sink.sink_catalog.downstream_pk.clone().unwrap_or_default())
151                .into_iter()
152                .map(|idx| columns[idx].column_desc.clone())
153                .collect_vec();
154            let dist_columns = sink
155                .sink_catalog
156                .distribution_key
157                .iter()
158                .map(|idx| columns[*idx].column_desc.clone())
159                .collect_vec();
160            let target_table_name = sink
161                .sink_catalog
162                .target_table
163                .and_then(|table_id| catalog_reader.get_table_name_by_id(table_id).ok());
164            (
165                columns,
166                pk_columns,
167                dist_columns,
168                vec![],
169                sink.sink_catalog.name.clone(),
170                None,
171                target_table_name,
172            )
173        } else {
174            return Err(not_found_err.into());
175        };
176
177    // Convert all column descs to rows
178    let mut rows = columns
179        .into_iter()
180        .flat_map(ShowColumnRow::from_catalog)
181        .collect_vec();
182
183    fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
184    where
185        T: Display,
186    {
187        format!(
188            "{}",
189            display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
190        )
191    }
192
193    // Convert primary key to rows
194    if !pk_columns.is_empty() {
195        rows.push(ShowColumnRow {
196            name: ShowColumnName::special("primary key"),
197            r#type: concat(pk_columns.iter().map(|x| &x.name)),
198            is_hidden: None,
199            description: None,
200        });
201    }
202
203    // Convert distribution keys to rows
204    if !dist_columns.is_empty() {
205        rows.push(ShowColumnRow {
206            name: ShowColumnName::special("distribution key"),
207            r#type: concat(dist_columns.iter().map(|x| &x.name)),
208            is_hidden: None,
209            description: None,
210        });
211    }
212
213    // Convert all indexes to rows
214    rows.extend(indices.iter().map(|index| {
215        let index_display = index.display();
216
217        ShowColumnRow {
218            name: ShowColumnName::special(&index.name),
219            r#type: if index_display.include_columns.is_empty() {
220                format!(
221                    "index({}) distributed by({})",
222                    display_comma_separated(&index_display.index_columns_with_ordering),
223                    display_comma_separated(&index_display.distributed_by_columns),
224                )
225            } else {
226                format!(
227                    "index({}) include({}) distributed by({})",
228                    display_comma_separated(&index_display.index_columns_with_ordering),
229                    display_comma_separated(&index_display.include_columns),
230                    display_comma_separated(&index_display.distributed_by_columns),
231                )
232            },
233            is_hidden: None,
234            // TODO: index description
235            description: None,
236        }
237    }));
238
239    rows.push(ShowColumnRow {
240        name: ShowColumnName::special("table description"),
241        r#type: relname,
242        is_hidden: None,
243        description,
244    });
245
246    if let Some(target_table) = target_table_name {
247        rows.push(ShowColumnRow {
248            name: ShowColumnName::special("target table name"),
249            r#type: target_table,
250            is_hidden: None,
251            description: None,
252        });
253    };
254
255    // TODO: table name and description as title of response
256    // TODO: recover the original user statement
257    Ok(PgResponse::builder(StatementType::DESCRIBE)
258        .rows(rows)
259        .into())
260}
261
262pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
263    match kind {
264        DescribeKind::Fragments => vec![PgFieldDescriptor::new(
265            "Fragments".to_owned(),
266            DataType::Varchar.to_oid(),
267            DataType::Varchar.type_len(),
268        )],
269        DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
270    }
271}
272
273pub async fn handle_describe_fragments(
274    handler_args: HandlerArgs,
275    object_name: ObjectName,
276) -> Result<RwPgResponse> {
277    let session = handler_args.session.clone();
278    let job_id = {
279        let mut binder = Binder::new_for_system(&session);
280
281        Binder::validate_cross_db_reference(&session.database(), &object_name)?;
282        let not_found_err = CatalogError::not_found("stream job", object_name.to_string());
283
284        if let Ok(relation) = binder.bind_catalog_relation_by_object_name(&object_name, true) {
285            match relation {
286                Relation::Source(s) => {
287                    if s.is_shared() {
288                        s.catalog.id.as_share_source_job_id()
289                    } else {
290                        bail!(ErrorCode::NotSupported(
291                            "non shared source has no fragments to describe".to_owned(),
292                            "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
293                        ));
294                    }
295                }
296                Relation::BaseTable(t) => t.table_catalog.id.as_job_id(),
297                Relation::SystemTable(_t) => {
298                    bail!(ErrorCode::NotSupported(
299                        "system table has no fragments to describe".to_owned(),
300                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
301                    ));
302                }
303                Relation::Share(_s) => {
304                    bail!(ErrorCode::NotSupported(
305                        "view has no fragments to describe".to_owned(),
306                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
307                    ));
308                }
309                _ => {
310                    // Other relation types (Subquery, Join, etc.) are not directly describable.
311                    return Err(not_found_err.into());
312                }
313            }
314        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
315            sink.sink_catalog.id.as_job_id()
316        } else {
317            return Err(not_found_err.into());
318        }
319    };
320
321    let meta_client = session.env().meta_client();
322    let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
323    let res = generate_fragments_string(fragments)?;
324    Ok(res)
325}
326
327/// The implementation largely copied from `crate::utils::stream_graph_formatter::StreamGraphFormatter`.
328/// The input is different, so we need separate implementation.
329fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
330    let mut config = PrettyConfig {
331        need_boundaries: false,
332        width: 80,
333        ..Default::default()
334    };
335
336    let mut max_width = 80;
337
338    let mut blocks = vec![];
339    for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
340        let mut res = String::new();
341        let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
342        res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
343        let node = &fragment.actors[0].node;
344        let node = explain_node(node.as_ref().unwrap(), true);
345        let width = config.unicode(&mut res, &node);
346        max_width = max(width, max_width);
347        config.width = max_width;
348        blocks.push(res);
349        blocks.push("".to_owned());
350    }
351
352    let rows = blocks.iter().map(|b| ExplainRow {
353        query_plan: b.into(),
354    });
355    Ok(PgResponse::builder(StatementType::DESCRIBE)
356        .rows(rows)
357        .into())
358}
359
360fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
361    // TODO: we need extra edge information to get which fragment MergeExecutor connects to.
362    let one_line_explain = node.identity.clone();
363
364    let mut fields = Vec::with_capacity(3);
365    if verbose {
366        fields.push((
367            "output",
368            Pretty::Array(
369                node.fields
370                    .iter()
371                    .map(|f| Pretty::display(f.get_name()))
372                    .collect(),
373            ),
374        ));
375        fields.push((
376            "stream key",
377            Pretty::Array(
378                node.stream_key
379                    .iter()
380                    .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
381                    .collect(),
382            ),
383        ));
384    }
385    let children = node
386        .input
387        .iter()
388        .map(|input| explain_node(input, verbose))
389        .collect();
390    Pretty::simple_record(one_line_explain, fields, children)
391}
392
393pub async fn handle_describe_fragment(
394    handler_args: HandlerArgs,
395    fragment_id: FragmentId,
396) -> Result<RwPgResponse> {
397    let session = handler_args.session.clone();
398    let meta_client = session.env().meta_client();
399    let distribution = &meta_client
400        .get_fragment_by_id(fragment_id)
401        .await?
402        .ok_or_else(|| CatalogError::not_found("fragment", fragment_id.to_string()))?;
403    let res: PgResponse<super::PgResponseStream> = generate_enhanced_fragment_string(distribution)?;
404    Ok(res)
405}
406
407fn generate_enhanced_fragment_string(fragment_dist: &FragmentDistribution) -> Result<RwPgResponse> {
408    let mut config = PrettyConfig {
409        need_boundaries: false,
410        width: 80,
411        ..Default::default()
412    };
413
414    let mut res = String::new();
415
416    res.push_str(&format!(
417        "Fragment {} (Table {})\n",
418        fragment_dist.fragment_id, fragment_dist.table_id
419    ));
420    let dist_type = fragment::FragmentDistributionType::try_from(fragment_dist.distribution_type)
421        .unwrap_or(fragment::FragmentDistributionType::Unspecified);
422    res.push_str(&format!("Distribution Type: {}\n", dist_type.as_str_name()));
423    res.push_str(&format!("Parallelism: {}\n", fragment_dist.parallelism));
424    res.push_str(&format!("VNode Count: {}\n", fragment_dist.vnode_count));
425
426    if !fragment_dist.state_table_ids.is_empty() {
427        res.push_str(&format!(
428            "State Tables: [{}]\n",
429            fragment_dist
430                .state_table_ids
431                .iter()
432                .map(|id| id.to_string())
433                .collect::<Vec<_>>()
434                .join(", ")
435        ));
436    }
437
438    if !fragment_dist.upstream_fragment_ids.is_empty() {
439        res.push_str(&format!(
440            "Upstream Fragments: [{}]\n",
441            fragment_dist
442                .upstream_fragment_ids
443                .iter()
444                .map(|id| id.to_string())
445                .collect::<Vec<_>>()
446                .join(", ")
447        ));
448    }
449
450    if let Some(node) = &fragment_dist.node {
451        res.push_str("Stream Plan:\n");
452        let node_pretty = explain_node(node, true);
453        config.unicode(&mut res, &node_pretty);
454    }
455
456    let rows = vec![ExplainRow { query_plan: res }];
457
458    Ok(PgResponse::builder(StatementType::DESCRIBE)
459        .rows(rows)
460        .into())
461}
462
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

    /// Runs `sql` on `frontend` and maps the first cell of every returned row to its second
    /// cell, both decoded as UTF-8 text.
    async fn describe_to_map(frontend: &LocalFrontend, sql: &str) -> HashMap<String, String> {
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        let mut described = HashMap::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            for row in row_set.unwrap() {
                let cell = |i: usize| {
                    std::str::from_utf8(row.index(i).as_ref().unwrap())
                        .unwrap()
                        .to_owned()
                };
                described.insert(cell(0), cell(1));
            }
        }
        described
    }

    /// Builds the owned expectation map from borrowed `(name, type)` pairs.
    fn expected(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(name, ty)| ((*name).to_owned(), (*ty).to_owned()))
            .collect()
    }

    /// `DESCRIBE` on a table lists its columns, keys, indexes and the table description row.
    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for setup in [
            "create table t (v1 int, v2 int, v3 int primary key, v4 int);",
            "create index idx1 on t (v1 DESC, v2);",
        ] {
            frontend.run_sql(setup).await.unwrap();
        }

        let columns = describe_to_map(&frontend, "describe t").await;

        let expected_columns = expected(&[
            ("v1", "integer"),
            ("v2", "integer"),
            ("v3", "integer"),
            ("v4", "integer"),
            ("primary key", "v3"),
            ("distribution key", "v3"),
            ("_rw_timestamp", "timestamp with time zone"),
            (
                "idx1",
                "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)",
            ),
            ("table description", "t"),
        ]);

        assert_eq!(columns, expected_columns);
    }

    /// `DESCRIBE` on a sink into a table additionally reports the target table name.
    #[tokio::test]
    async fn test_describe_handler_with_target_table() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for setup in [
            "CREATE TABLE t(v int);",
            "CREATE TABLE tt(v int);",
            "CREATE SINK ssss INTO tt AS SELECT * FROM t WITH (type = 'append-only', force_append_only = 'true');",
        ] {
            frontend.run_sql(setup).await.unwrap();
        }

        let columns = describe_to_map(&frontend, "describe ssss;").await;

        let expected_columns = expected(&[
            ("v", "integer"),
            ("\"t._row_id\"", "serial"),
            ("distribution key", "t._row_id"),
            ("table description", "ssss"),
            ("target table name", "tt"),
        ]);

        assert_eq!(columns, expected_columns);
    }
}
557}