risingwave_frontend/handler/
describe.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::max;
16use std::fmt::Display;
17
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pretty_xmlish::{Pretty, PrettyConfig};
22use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
23use risingwave_common::types::{DataType, Fields};
24use risingwave_expr::bail;
25use risingwave_pb::meta::FragmentDistribution;
26use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
27use risingwave_pb::meta::table_fragments::fragment;
28use risingwave_pb::stream_plan::StreamNode;
29use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};
30
31use super::explain::ExplainRow;
32use super::show::ShowColumnRow;
33use super::{RwPgResponse, fields_to_descriptors};
34use crate::binder::{Binder, Relation};
35use crate::catalog::CatalogError;
36use crate::error::{ErrorCode, Result};
37use crate::handler::show::ShowColumnName;
38use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
39
40pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
41    let session = handler_args.session;
42    let mut binder = Binder::new_for_system(&session);
43
44    Binder::validate_cross_db_reference(&session.database(), &object_name)?;
45    let not_found_err =
46        CatalogError::NotFound("table, source, sink or view", object_name.to_string());
47
48    // Vec<ColumnCatalog>, Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>, String, Option<String>
49    let (columns, pk_columns, dist_columns, indices, relname, description) =
50        if let Ok(relation) = binder.bind_relation_by_name(&object_name, None, None, false) {
51            match relation {
52                Relation::Source(s) => {
53                    let pk_column_catalogs = s
54                        .catalog
55                        .pk_col_ids
56                        .iter()
57                        .map(|&column_id| {
58                            s.catalog
59                                .columns
60                                .iter()
61                                .filter(|x| x.column_id() == column_id)
62                                .map(|x| x.column_desc.clone())
63                                .exactly_one()
64                                .unwrap()
65                        })
66                        .collect_vec();
67                    (
68                        s.catalog.columns,
69                        pk_column_catalogs,
70                        vec![],
71                        vec![],
72                        s.catalog.name,
73                        None, // Description
74                    )
75                }
76                Relation::BaseTable(t) => {
77                    let pk_column_catalogs = t
78                        .table_catalog
79                        .pk()
80                        .iter()
81                        .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
82                        .collect_vec();
83                    let dist_columns = t
84                        .table_catalog
85                        .distribution_key()
86                        .iter()
87                        .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
88                        .collect_vec();
89                    (
90                        t.table_catalog.columns.clone(),
91                        pk_column_catalogs,
92                        dist_columns,
93                        t.table_indexes,
94                        t.table_catalog.name.clone(),
95                        t.table_catalog.description.clone(),
96                    )
97                }
98                Relation::SystemTable(t) => {
99                    let pk_column_catalogs = t
100                        .sys_table_catalog
101                        .pk
102                        .iter()
103                        .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
104                        .collect_vec();
105                    (
106                        t.sys_table_catalog.columns.clone(),
107                        pk_column_catalogs,
108                        vec![],
109                        vec![],
110                        t.sys_table_catalog.name.clone(),
111                        None, // Description
112                    )
113                }
114                Relation::Share(_) => {
115                    if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
116                        let columns = view
117                            .view_catalog
118                            .columns
119                            .iter()
120                            .enumerate()
121                            .map(|(idx, field)| ColumnCatalog {
122                                column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
123                                is_hidden: false,
124                            })
125                            .collect();
126                        (
127                            columns,
128                            vec![],
129                            vec![],
130                            vec![],
131                            view.view_catalog.name.clone(),
132                            None,
133                        )
134                    } else {
135                        return Err(not_found_err.into());
136                    }
137                }
138                _ => {
139                    return Err(not_found_err.into());
140                }
141            }
142        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
143            let columns = sink.sink_catalog.full_columns().to_vec();
144            let pk_columns = (sink.sink_catalog.downstream_pk.clone().unwrap_or_default())
145                .into_iter()
146                .map(|idx| columns[idx].column_desc.clone())
147                .collect_vec();
148            let dist_columns = sink
149                .sink_catalog
150                .distribution_key
151                .iter()
152                .map(|idx| columns[*idx].column_desc.clone())
153                .collect_vec();
154            (
155                columns,
156                pk_columns,
157                dist_columns,
158                vec![],
159                sink.sink_catalog.name.clone(),
160                None,
161            )
162        } else {
163            return Err(not_found_err.into());
164        };
165
166    // Convert all column descs to rows
167    let mut rows = columns
168        .into_iter()
169        .flat_map(ShowColumnRow::from_catalog)
170        .collect_vec();
171
172    fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
173    where
174        T: Display,
175    {
176        format!(
177            "{}",
178            display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
179        )
180    }
181
182    // Convert primary key to rows
183    if !pk_columns.is_empty() {
184        rows.push(ShowColumnRow {
185            name: ShowColumnName::special("primary key"),
186            r#type: concat(pk_columns.iter().map(|x| &x.name)),
187            is_hidden: None,
188            description: None,
189        });
190    }
191
192    // Convert distribution keys to rows
193    if !dist_columns.is_empty() {
194        rows.push(ShowColumnRow {
195            name: ShowColumnName::special("distribution key"),
196            r#type: concat(dist_columns.iter().map(|x| &x.name)),
197            is_hidden: None,
198            description: None,
199        });
200    }
201
202    // Convert all indexes to rows
203    rows.extend(indices.iter().map(|index| {
204        let index_display = index.display();
205
206        ShowColumnRow {
207            name: ShowColumnName::special(&index.name),
208            r#type: if index_display.include_columns.is_empty() {
209                format!(
210                    "index({}) distributed by({})",
211                    display_comma_separated(&index_display.index_columns_with_ordering),
212                    display_comma_separated(&index_display.distributed_by_columns),
213                )
214            } else {
215                format!(
216                    "index({}) include({}) distributed by({})",
217                    display_comma_separated(&index_display.index_columns_with_ordering),
218                    display_comma_separated(&index_display.include_columns),
219                    display_comma_separated(&index_display.distributed_by_columns),
220                )
221            },
222            is_hidden: None,
223            // TODO: index description
224            description: None,
225        }
226    }));
227
228    rows.push(ShowColumnRow {
229        name: ShowColumnName::special("table description"),
230        r#type: relname,
231        is_hidden: None,
232        description,
233    });
234
235    // TODO: table name and description as title of response
236    // TODO: recover the original user statement
237    Ok(PgResponse::builder(StatementType::DESCRIBE)
238        .rows(rows)
239        .into())
240}
241
242pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
243    match kind {
244        DescribeKind::Fragments => vec![PgFieldDescriptor::new(
245            "Fragments".to_owned(),
246            DataType::Varchar.to_oid(),
247            DataType::Varchar.type_len(),
248        )],
249        DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
250    }
251}
252
253pub async fn handle_describe_fragments(
254    handler_args: HandlerArgs,
255    object_name: ObjectName,
256) -> Result<RwPgResponse> {
257    let session = handler_args.session.clone();
258    let job_id = {
259        let mut binder = Binder::new_for_system(&session);
260
261        Binder::validate_cross_db_reference(&session.database(), &object_name)?;
262        let not_found_err = CatalogError::NotFound("stream job", object_name.to_string());
263
264        if let Ok(relation) = binder.bind_catalog_relation_by_object_name(&object_name, true) {
265            match relation {
266                Relation::Source(s) => {
267                    if s.is_shared() {
268                        s.catalog.id
269                    } else {
270                        bail!(ErrorCode::NotSupported(
271                            "non shared source has no fragments to describe".to_owned(),
272                            "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
273                        ));
274                    }
275                }
276                Relation::BaseTable(t) => t.table_catalog.id.as_raw_id(),
277                Relation::SystemTable(_t) => {
278                    bail!(ErrorCode::NotSupported(
279                        "system table has no fragments to describe".to_owned(),
280                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
281                    ));
282                }
283                Relation::Share(_s) => {
284                    bail!(ErrorCode::NotSupported(
285                        "view has no fragments to describe".to_owned(),
286                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
287                    ));
288                }
289                _ => {
290                    // Other relation types (Subquery, Join, etc.) are not directly describable.
291                    return Err(not_found_err.into());
292                }
293            }
294        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
295            sink.sink_catalog.id.sink_id
296        } else {
297            return Err(not_found_err.into());
298        }
299    };
300
301    let meta_client = session.env().meta_client();
302    let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
303    let res = generate_fragments_string(fragments)?;
304    Ok(res)
305}
306
307/// The implementation largely copied from `crate::utils::stream_graph_formatter::StreamGraphFormatter`.
308/// The input is different, so we need separate implementation.
309fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
310    let mut config = PrettyConfig {
311        need_boundaries: false,
312        width: 80,
313        ..Default::default()
314    };
315
316    let mut max_width = 80;
317
318    let mut blocks = vec![];
319    for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
320        let mut res = String::new();
321        let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
322        res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
323        let node = &fragment.actors[0].node;
324        let node = explain_node(node.as_ref().unwrap(), true);
325        let width = config.unicode(&mut res, &node);
326        max_width = max(width, max_width);
327        config.width = max_width;
328        blocks.push(res);
329        blocks.push("".to_owned());
330    }
331
332    let rows = blocks.iter().map(|b| ExplainRow {
333        query_plan: b.into(),
334    });
335    Ok(PgResponse::builder(StatementType::DESCRIBE)
336        .rows(rows)
337        .into())
338}
339
340fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
341    // TODO: we need extra edge information to get which fragment MergeExecutor connects to.
342    let one_line_explain = node.identity.clone();
343
344    let mut fields = Vec::with_capacity(3);
345    if verbose {
346        fields.push((
347            "output",
348            Pretty::Array(
349                node.fields
350                    .iter()
351                    .map(|f| Pretty::display(f.get_name()))
352                    .collect(),
353            ),
354        ));
355        fields.push((
356            "stream key",
357            Pretty::Array(
358                node.stream_key
359                    .iter()
360                    .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
361                    .collect(),
362            ),
363        ));
364    }
365    let children = node
366        .input
367        .iter()
368        .map(|input| explain_node(input, verbose))
369        .collect();
370    Pretty::simple_record(one_line_explain, fields, children)
371}
372
373pub async fn handle_describe_fragment(
374    handler_args: HandlerArgs,
375    fragment_id: u32,
376) -> Result<RwPgResponse> {
377    let session = handler_args.session.clone();
378    let meta_client = session.env().meta_client();
379    let distribution = &meta_client
380        .get_fragment_by_id(fragment_id)
381        .await?
382        .ok_or_else(|| CatalogError::NotFound("fragment", fragment_id.to_string()))?;
383    let res: PgResponse<super::PgResponseStream> = generate_enhanced_fragment_string(distribution)?;
384    Ok(res)
385}
386
387fn generate_enhanced_fragment_string(fragment_dist: &FragmentDistribution) -> Result<RwPgResponse> {
388    let mut config = PrettyConfig {
389        need_boundaries: false,
390        width: 80,
391        ..Default::default()
392    };
393
394    let mut res = String::new();
395
396    res.push_str(&format!(
397        "Fragment {} (Table {})\n",
398        fragment_dist.fragment_id, fragment_dist.table_id
399    ));
400    let dist_type = fragment::FragmentDistributionType::try_from(fragment_dist.distribution_type)
401        .unwrap_or(fragment::FragmentDistributionType::Unspecified);
402    res.push_str(&format!("Distribution Type: {}\n", dist_type.as_str_name()));
403    res.push_str(&format!("Parallelism: {}\n", fragment_dist.parallelism));
404    res.push_str(&format!("VNode Count: {}\n", fragment_dist.vnode_count));
405
406    if !fragment_dist.state_table_ids.is_empty() {
407        res.push_str(&format!(
408            "State Tables: [{}]\n",
409            fragment_dist
410                .state_table_ids
411                .iter()
412                .map(|id| id.to_string())
413                .collect::<Vec<_>>()
414                .join(", ")
415        ));
416    }
417
418    if !fragment_dist.upstream_fragment_ids.is_empty() {
419        res.push_str(&format!(
420            "Upstream Fragments: [{}]\n",
421            fragment_dist
422                .upstream_fragment_ids
423                .iter()
424                .map(|id| id.to_string())
425                .collect::<Vec<_>>()
426                .join(", ")
427        ));
428    }
429
430    if let Some(node) = &fragment_dist.node {
431        res.push_str("Stream Plan:\n");
432        let node_pretty = explain_node(node, true);
433        config.unicode(&mut res, &node_pretty);
434    }
435
436    let rows = vec![ExplainRow { query_plan: res }];
437
438    Ok(PgResponse::builder(StatementType::DESCRIBE)
439        .rows(rows)
440        .into())
441}
442
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

    /// Creates a table with a primary key and an index, runs `describe t`, and checks the
    /// first two columns of every returned row against the expected name/type pairs.
    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for ddl in [
            "create table t (v1 int, v2 int, v3 int primary key, v4 int);",
            "create index idx1 on t (v1 DESC, v2);",
        ] {
            frontend.run_sql(ddl).await.unwrap();
        }

        let mut pg_response = frontend.run_sql("describe t").await.unwrap();

        // Maps the first cell of each row (name) to its second cell (type).
        let mut described = HashMap::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            for row in row_set.unwrap() {
                let cell_text = |col: usize| {
                    std::str::from_utf8(row.index(col).as_ref().unwrap())
                        .unwrap()
                        .to_owned()
                };
                described.insert(cell_text(0), cell_text(1));
            }
        }

        let expected: HashMap<String, String> = [
            ("v1", "integer"),
            ("v2", "integer"),
            ("v3", "integer"),
            ("v4", "integer"),
            ("primary key", "v3"),
            ("distribution key", "v3"),
            ("_rw_timestamp", "timestamp with time zone"),
            (
                "idx1",
                "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)",
            ),
            ("table description", "t"),
        ]
        .into_iter()
        .map(|(name, ty)| (name.to_owned(), ty.to_owned()))
        .collect();

        assert_eq!(described, expected);
    }
}