risingwave_frontend/handler/describe.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;
use std::fmt::Display;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pretty_xmlish::{Pretty, PrettyConfig};
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
use risingwave_common::types::{DataType, Fields};
use risingwave_expr::bail;
use risingwave_pb::meta::FragmentDistribution;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::table_fragments::fragment;
use risingwave_pb::stream_plan::StreamNode;
use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};

use super::explain::ExplainRow;
use super::show::ShowColumnRow;
use super::{RwPgResponse, fields_to_descriptors};
use crate::binder::{Binder, Relation};
use crate::catalog::CatalogError;
use crate::error::{ErrorCode, Result};
use crate::handler::show::ShowColumnName;
use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};

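/// Handles a plain `DESCRIBE <relation>` statement: resolves the relation (table, source,
/// system table, view, or sink) and returns its columns, primary key, distribution key,
/// indexes, and description as rows.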
pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
    let session = handler_args.session;
    let mut binder = Binder::new_for_system(&session);

    Binder::validate_cross_db_reference(&session.database(), &object_name)?;
    let not_found_err =
        CatalogError::NotFound("table, source, sink or view", object_name.to_string());

    // Vec<ColumnCatalog>, Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>, String, Option<String>
    let (columns, pk_columns, dist_columns, indices, relname, description) = if let Ok(relation) =
        binder.bind_relation_by_name(object_name.clone(), None, None, false)
    {
        match relation {
            Relation::Source(s) => {
                let pk_column_catalogs = s
                    .catalog
                    .pk_col_ids
                    .iter()
                    .map(|&column_id| {
                        s.catalog
                            .columns
                            .iter()
                            .filter(|x| x.column_id() == column_id)
                            .map(|x| x.column_desc.clone())
                            .exactly_one()
                            .unwrap()
                    })
                    .collect_vec();
                (
                    s.catalog.columns,
                    pk_column_catalogs,
                    vec![],
                    vec![],
                    s.catalog.name,
                    None, // Description
                )
            }
            Relation::BaseTable(t) => {
                let pk_column_catalogs = t
                    .table_catalog
                    .pk()
                    .iter()
                    .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
                    .collect_vec();
                let dist_columns = t
                    .table_catalog
                    .distribution_key()
                    .iter()
                    .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
                    .collect_vec();
                (
                    t.table_catalog.columns.clone(),
                    pk_column_catalogs,
                    dist_columns,
                    t.table_indexes,
                    t.table_catalog.name.clone(),
                    t.table_catalog.description.clone(),
                )
            }
            Relation::SystemTable(t) => {
                let pk_column_catalogs = t
                    .sys_table_catalog
                    .pk
                    .iter()
                    .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
                    .collect_vec();
                (
                    t.sys_table_catalog.columns.clone(),
                    pk_column_catalogs,
                    vec![],
                    vec![],
                    t.sys_table_catalog.name.clone(),
                    None, // Description
                )
            }
            Relation::Share(_) => {
                if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
                    let columns = view
                        .view_catalog
                        .columns
                        .iter()
                        .enumerate()
                        .map(|(idx, field)| ColumnCatalog {
                            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
                            is_hidden: false,
                        })
                        .collect();
                    (
                        columns,
                        vec![],
                        vec![],
                        vec![],
                        view.view_catalog.name.clone(),
                        None,
                    )
                } else {
                    return Err(not_found_err.into());
                }
            }
            _ => {
                return Err(not_found_err.into());
            }
        }
    } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
        let columns = sink.sink_catalog.full_columns().to_vec();
        let pk_columns = sink
            .sink_catalog
            .downstream_pk_indices()
            .into_iter()
            .map(|idx| columns[idx].column_desc.clone())
            .collect_vec();
        let dist_columns = sink
            .sink_catalog
            .distribution_key
            .iter()
            .map(|idx| columns[*idx].column_desc.clone())
            .collect_vec();
        (
            columns,
            pk_columns,
            dist_columns,
            vec![],
            sink.sink_catalog.name.clone(),
            None,
        )
    } else {
        return Err(not_found_err.into());
    };

    // Convert all column descs to rows
    let mut rows = columns
        .into_iter()
        .flat_map(ShowColumnRow::from_catalog)
        .collect_vec();

    fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
    where
        T: Display,
    {
        format!(
            "{}",
            display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
        )
    }

    // Convert primary key to rows
    if !pk_columns.is_empty() {
        rows.push(ShowColumnRow {
            name: ShowColumnName::special("primary key"),
            r#type: concat(pk_columns.iter().map(|x| &x.name)),
            is_hidden: None,
            description: None,
        });
    }

    // Convert distribution keys to rows
    if !dist_columns.is_empty() {
        rows.push(ShowColumnRow {
            name: ShowColumnName::special("distribution key"),
            r#type: concat(dist_columns.iter().map(|x| &x.name)),
            is_hidden: None,
            description: None,
        });
    }

    // Convert all indexes to rows
    rows.extend(indices.iter().map(|index| {
        let index_display = index.display();

        ShowColumnRow {
            name: ShowColumnName::special(&index.name),
            r#type: if index_display.include_columns.is_empty() {
                format!(
                    "index({}) distributed by({})",
                    display_comma_separated(&index_display.index_columns_with_ordering),
                    display_comma_separated(&index_display.distributed_by_columns),
                )
            } else {
                format!(
                    "index({}) include({}) distributed by({})",
                    display_comma_separated(&index_display.index_columns_with_ordering),
                    display_comma_separated(&index_display.include_columns),
                    display_comma_separated(&index_display.distributed_by_columns),
                )
            },
            is_hidden: None,
            // TODO: index description
            description: None,
        }
    }));

    rows.push(ShowColumnRow {
        name: ShowColumnName::special("table description"),
        r#type: relname,
        is_hidden: None,
        description,
    });

    // TODO: table name and description as title of response
    // TODO: recover the original user statement
    Ok(PgResponse::builder(StatementType::DESCRIBE)
        .rows(rows)
        .into())
}

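/// Infers the output field descriptors of a `DESCRIBE` statement from its kind:
/// a single `Fragments` varchar column for `DESCRIBE FRAGMENTS`, or the regular
/// `ShowColumnRow` fields for a plain `DESCRIBE`.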
pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
    match kind {
        DescribeKind::Fragments => vec![PgFieldDescriptor::new(
            "Fragments".to_owned(),
            DataType::Varchar.to_oid(),
            DataType::Varchar.type_len(),
        )],
        DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
    }
}

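/// Handles `DESCRIBE FRAGMENTS <job>`: resolves the streaming job (table, shared source,
/// or sink) to its job id, then renders the fragment graph fetched from the meta service.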
pub async fn handle_describe_fragments(
    handler_args: HandlerArgs,
    object_name: ObjectName,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let job_id = {
        let mut binder = Binder::new_for_system(&session);

        Binder::validate_cross_db_reference(&session.database(), &object_name)?;
        let not_found_err = CatalogError::NotFound("stream job", object_name.to_string());

        if let Ok(relation) = binder.bind_catalog_relation_by_object_name(object_name.clone(), true)
        {
            match relation {
                Relation::Source(s) => {
                    if s.is_shared() {
                        s.catalog.id
                    } else {
                        bail!(ErrorCode::NotSupported(
                            "non shared source has no fragments to describe".to_owned(),
                            "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
                        ));
                    }
                }
                Relation::BaseTable(t) => t.table_catalog.id.table_id,
                Relation::SystemTable(_t) => {
                    bail!(ErrorCode::NotSupported(
                        "system table has no fragments to describe".to_owned(),
                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
                    ));
                }
                Relation::Share(_s) => {
                    bail!(ErrorCode::NotSupported(
                        "view has no fragments to describe".to_owned(),
                        "Use `DESCRIBE` instead of `DESCRIBE FRAGMENTS`".to_owned(),
                    ));
                }
                _ => {
                    // Other relation types (Subquery, Join, etc.) are not directly describable.
                    return Err(not_found_err.into());
                }
            }
        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
            sink.sink_catalog.id.sink_id
        } else {
            return Err(not_found_err.into());
        }
    };

    let meta_client = session.env().meta_client();
    let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
    let res = generate_fragments_string(fragments)?;
    Ok(res)
}

/// The implementation is largely copied from `crate::utils::stream_graph_formatter::StreamGraphFormatter`.
/// The input is different, so we need a separate implementation.
fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
    let mut config = PrettyConfig {
        need_boundaries: false,
        width: 80,
        ..Default::default()
    };

    let mut max_width = 80;

    let mut blocks = vec![];
    for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
        let mut res = String::new();
        let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
        res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
        let node = &fragment.actors[0].node;
        let node = explain_node(node.as_ref().unwrap(), true);
        let width = config.unicode(&mut res, &node);
        max_width = max(width, max_width);
        config.width = max_width;
        blocks.push(res);
        blocks.push("".to_owned());
    }

    let rows = blocks.iter().map(|b| ExplainRow {
        query_plan: b.into(),
    });
    Ok(PgResponse::builder(StatementType::DESCRIBE)
        .rows(rows)
        .into())
}

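/// Recursively renders a `StreamNode` as a `Pretty` tree. When `verbose` is set, the
/// output fields and stream key of each node are included.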
fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
    // TODO: we need extra edge information to get which fragment MergeExecutor connects to.
    let one_line_explain = node.identity.clone();

    let mut fields = Vec::with_capacity(3);
    if verbose {
        fields.push((
            "output",
            Pretty::Array(
                node.fields
                    .iter()
                    .map(|f| Pretty::display(f.get_name()))
                    .collect(),
            ),
        ));
        fields.push((
            "stream key",
            Pretty::Array(
                node.stream_key
                    .iter()
                    .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
                    .collect(),
            ),
        ));
    }
    let children = node
        .input
        .iter()
        .map(|input| explain_node(input, verbose))
        .collect();
    Pretty::simple_record(one_line_explain, fields, children)
}

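/// Handles `DESCRIBE FRAGMENT <fragment_id>`: fetches the fragment's distribution info
/// from the meta service and renders it together with its stream plan.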
pub async fn handle_describe_fragment(
    handler_args: HandlerArgs,
    fragment_id: u32,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let meta_client = session.env().meta_client();
    let distribution = &meta_client
        .get_fragment_by_id(fragment_id)
        .await?
        .ok_or_else(|| CatalogError::NotFound("fragment", fragment_id.to_string()))?;
    let res: PgResponse<super::PgResponseStream> = generate_enhanced_fragment_string(distribution)?;
    Ok(res)
}

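/// Formats a single fragment's metadata (distribution type, parallelism, vnode count,
/// state tables, upstream fragments) followed by its stream plan into `EXPLAIN`-style rows.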
fn generate_enhanced_fragment_string(fragment_dist: &FragmentDistribution) -> Result<RwPgResponse> {
    let mut config = PrettyConfig {
        need_boundaries: false,
        width: 80,
        ..Default::default()
    };

    let mut res = String::new();

    res.push_str(&format!(
        "Fragment {} (Table {})\n",
        fragment_dist.fragment_id, fragment_dist.table_id
    ));
    let dist_type = fragment::FragmentDistributionType::try_from(fragment_dist.distribution_type)
        .unwrap_or(fragment::FragmentDistributionType::Unspecified);
    res.push_str(&format!("Distribution Type: {}\n", dist_type.as_str_name()));
    res.push_str(&format!("Parallelism: {}\n", fragment_dist.parallelism));
    res.push_str(&format!("VNode Count: {}\n", fragment_dist.vnode_count));

    if !fragment_dist.state_table_ids.is_empty() {
        res.push_str(&format!(
            "State Tables: [{}]\n",
            fragment_dist
                .state_table_ids
                .iter()
                .map(|id| id.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        ));
    }

    if !fragment_dist.upstream_fragment_ids.is_empty() {
        res.push_str(&format!(
            "Upstream Fragments: [{}]\n",
            fragment_dist
                .upstream_fragment_ids
                .iter()
                .map(|id| id.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        ));
    }

    if let Some(node) = &fragment_dist.node {
        res.push_str("Stream Plan:\n");
        let node_pretty = explain_node(node, true);
        config.unicode(&mut res, &node_pretty);
    }

    let rows = vec![ExplainRow { query_plan: res }];

    Ok(PgResponse::builder(StatementType::DESCRIBE)
        .rows(rows)
        .into())
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

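    /// End-to-end check that `DESCRIBE` reports columns, keys, indexes, and the table description.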
    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create table t (v1 int, v2 int, v3 int primary key, v4 int);")
            .await
            .unwrap();

        frontend
            .run_sql("create index idx1 on t (v1 DESC, v2);")
            .await
            .unwrap();

        let sql = "describe t";
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        let mut columns = HashMap::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            let row_set = row_set.unwrap();
            for row in row_set {
                columns.insert(
                    std::str::from_utf8(row.index(0).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                    std::str::from_utf8(row.index(1).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                );
            }
        }

        let expected_columns: HashMap<String, String> = maplit::hashmap! {
            "v1".into() => "integer".into(),
            "v2".into() => "integer".into(),
            "v3".into() => "integer".into(),
            "v4".into() => "integer".into(),
            "primary key".into() => "v3".into(),
            "distribution key".into() => "v3".into(),
            "_rw_timestamp".into() => "timestamp with time zone".into(),
            "idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)".into(),
            "table description".into() => "t".into(),
        };

        assert_eq!(columns, expected_columns);
    }
}