risingwave_frontend/handler/
describe.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::max;
16use std::fmt::Display;
17
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pretty_xmlish::{Pretty, PrettyConfig};
22use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
23use risingwave_common::types::{DataType, Fields};
24use risingwave_expr::bail;
25use risingwave_pb::meta::FragmentDistribution;
26use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
27use risingwave_pb::meta::table_fragments::fragment;
28use risingwave_pb::stream_plan::StreamNode;
29use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};
30
31use super::explain::ExplainRow;
32use super::show::ShowColumnRow;
33use super::{RwPgResponse, fields_to_descriptors};
34use crate::binder::{Binder, Relation};
35use crate::catalog::CatalogError;
36use crate::error::{ErrorCode, Result};
37use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
38
39pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
40    let session = handler_args.session;
41    let mut binder = Binder::new_for_system(&session);
42
43    Binder::validate_cross_db_reference(&session.database(), &object_name)?;
44    let not_found_err =
45        CatalogError::NotFound("table, source, sink or view", object_name.to_string());
46
47    // Vec<ColumnCatalog>, Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>, String, Option<String>
48    let (columns, pk_columns, dist_columns, indices, relname, description) = if let Ok(relation) =
49        binder.bind_relation_by_name(object_name.clone(), None, None, false)
50    {
51        match relation {
52            Relation::Source(s) => {
53                let pk_column_catalogs = s
54                    .catalog
55                    .pk_col_ids
56                    .iter()
57                    .map(|&column_id| {
58                        s.catalog
59                            .columns
60                            .iter()
61                            .filter(|x| x.column_id() == column_id)
62                            .map(|x| x.column_desc.clone())
63                            .exactly_one()
64                            .unwrap()
65                    })
66                    .collect_vec();
67                (
68                    s.catalog.columns,
69                    pk_column_catalogs,
70                    vec![],
71                    vec![],
72                    s.catalog.name,
73                    None, // Description
74                )
75            }
76            Relation::BaseTable(t) => {
77                let pk_column_catalogs = t
78                    .table_catalog
79                    .pk()
80                    .iter()
81                    .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
82                    .collect_vec();
83                let dist_columns = t
84                    .table_catalog
85                    .distribution_key()
86                    .iter()
87                    .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
88                    .collect_vec();
89                (
90                    t.table_catalog.columns.clone(),
91                    pk_column_catalogs,
92                    dist_columns,
93                    t.table_indexes,
94                    t.table_catalog.name.clone(),
95                    t.table_catalog.description.clone(),
96                )
97            }
98            Relation::SystemTable(t) => {
99                let pk_column_catalogs = t
100                    .sys_table_catalog
101                    .pk
102                    .iter()
103                    .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
104                    .collect_vec();
105                (
106                    t.sys_table_catalog.columns.clone(),
107                    pk_column_catalogs,
108                    vec![],
109                    vec![],
110                    t.sys_table_catalog.name.clone(),
111                    None, // Description
112                )
113            }
114            Relation::Share(_) => {
115                if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
116                    let columns = view
117                        .view_catalog
118                        .columns
119                        .iter()
120                        .enumerate()
121                        .map(|(idx, field)| ColumnCatalog {
122                            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
123                            is_hidden: false,
124                        })
125                        .collect();
126                    (
127                        columns,
128                        vec![],
129                        vec![],
130                        vec![],
131                        view.view_catalog.name.clone(),
132                        None,
133                    )
134                } else {
135                    return Err(not_found_err.into());
136                }
137            }
138            _ => {
139                return Err(not_found_err.into());
140            }
141        }
142    } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
143        let columns = sink.sink_catalog.full_columns().to_vec();
144        let pk_columns = sink
145            .sink_catalog
146            .downstream_pk_indices()
147            .into_iter()
148            .map(|idx| columns[idx].column_desc.clone())
149            .collect_vec();
150        let dist_columns = sink
151            .sink_catalog
152            .distribution_key
153            .iter()
154            .map(|idx| columns[*idx].column_desc.clone())
155            .collect_vec();
156        (
157            columns,
158            pk_columns,
159            dist_columns,
160            vec![],
161            sink.sink_catalog.name.clone(),
162            None,
163        )
164    } else {
165        return Err(not_found_err.into());
166    };
167
168    // Convert all column descs to rows
169    let mut rows = columns
170        .into_iter()
171        .flat_map(ShowColumnRow::from_catalog)
172        .collect_vec();
173
174    fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
175    where
176        T: Display,
177    {
178        format!(
179            "{}",
180            display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
181        )
182    }
183
184    // Convert primary key to rows
185    if !pk_columns.is_empty() {
186        rows.push(ShowColumnRow {
187            name: "primary key".into(),
188            r#type: concat(pk_columns.iter().map(|x| &x.name)),
189            is_hidden: None,
190            description: None,
191        });
192    }
193
194    // Convert distribution keys to rows
195    if !dist_columns.is_empty() {
196        rows.push(ShowColumnRow {
197            name: "distribution key".into(),
198            r#type: concat(dist_columns.iter().map(|x| &x.name)),
199            is_hidden: None,
200            description: None,
201        });
202    }
203
204    // Convert all indexes to rows
205    rows.extend(indices.iter().map(|index| {
206        let index_display = index.display();
207
208        ShowColumnRow {
209            name: index.name.clone(),
210            r#type: if index_display.include_columns.is_empty() {
211                format!(
212                    "index({}) distributed by({})",
213                    display_comma_separated(&index_display.index_columns_with_ordering),
214                    display_comma_separated(&index_display.distributed_by_columns),
215                )
216            } else {
217                format!(
218                    "index({}) include({}) distributed by({})",
219                    display_comma_separated(&index_display.index_columns_with_ordering),
220                    display_comma_separated(&index_display.include_columns),
221                    display_comma_separated(&index_display.distributed_by_columns),
222                )
223            },
224            is_hidden: None,
225            // TODO: index description
226            description: None,
227        }
228    }));
229
230    rows.push(ShowColumnRow {
231        name: "table description".into(),
232        r#type: relname,
233        is_hidden: None,
234        description,
235    });
236
237    // TODO: table name and description as title of response
238    // TODO: recover the original user statement
239    Ok(PgResponse::builder(StatementType::DESCRIBE)
240        .rows(rows)
241        .into())
242}
243
244pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
245    match kind {
246        DescribeKind::Fragments => vec![PgFieldDescriptor::new(
247            "Fragments".to_owned(),
248            DataType::Varchar.to_oid(),
249            DataType::Varchar.type_len(),
250        )],
251        DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
252    }
253}
254
255pub async fn handle_describe_fragments(
256    handler_args: HandlerArgs,
257    object_name: ObjectName,
258) -> Result<RwPgResponse> {
259    let session = handler_args.session.clone();
260    let job_id = {
261        let mut binder = Binder::new_for_system(&session);
262
263        Binder::validate_cross_db_reference(&session.database(), &object_name)?;
264        let not_found_err = CatalogError::NotFound("stream job", object_name.to_string());
265
266        if let Ok(relation) = binder.bind_relation_by_name(object_name.clone(), None, None, false) {
267            match relation {
268                Relation::Source(s) => {
269                    if s.is_shared() {
270                        s.catalog.id
271                    } else {
272                        bail!(ErrorCode::NotSupported(
273                            "non shared source has no fragments to describe".to_owned(),
274                            "Use `DESCRIBE` instead.".to_owned(),
275                        ));
276                    }
277                }
278                Relation::BaseTable(t) => t.table_catalog.id.table_id,
279                Relation::SystemTable(_t) => {
280                    bail!(ErrorCode::NotSupported(
281                        "system table has no fragments to describe".to_owned(),
282                        "Use `DESCRIBE` instead.".to_owned(),
283                    ));
284                }
285                Relation::Share(_s) => {
286                    bail!(ErrorCode::NotSupported(
287                        "view has no fragments to describe".to_owned(),
288                        "Use `DESCRIBE` instead.".to_owned(),
289                    ));
290                }
291                _ => {
292                    // Other relation types (Subquery, Join, etc.) are not directly describable.
293                    return Err(not_found_err.into());
294                }
295            }
296        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
297            sink.sink_catalog.id.sink_id
298        } else {
299            return Err(not_found_err.into());
300        }
301    };
302
303    let meta_client = session.env().meta_client();
304    let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
305    let res = generate_fragments_string(fragments)?;
306    Ok(res)
307}
308
309/// The implementation largely copied from `crate::utils::stream_graph_formatter::StreamGraphFormatter`.
310/// The input is different, so we need separate implementation.
311fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
312    let mut config = PrettyConfig {
313        need_boundaries: false,
314        width: 80,
315        ..Default::default()
316    };
317
318    let mut max_width = 80;
319
320    let mut blocks = vec![];
321    for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
322        let mut res = String::new();
323        let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
324        res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
325        let node = &fragment.actors[0].node;
326        let node = explain_node(node.as_ref().unwrap(), true);
327        let width = config.unicode(&mut res, &node);
328        max_width = max(width, max_width);
329        config.width = max_width;
330        blocks.push(res);
331        blocks.push("".to_owned());
332    }
333
334    let rows = blocks.iter().map(|b| ExplainRow {
335        query_plan: b.into(),
336    });
337    Ok(PgResponse::builder(StatementType::DESCRIBE)
338        .rows(rows)
339        .into())
340}
341
342fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
343    // TODO: we need extra edge information to get which fragment MergeExecutor connects to.
344    let one_line_explain = node.identity.clone();
345
346    let mut fields = Vec::with_capacity(3);
347    if verbose {
348        fields.push((
349            "output",
350            Pretty::Array(
351                node.fields
352                    .iter()
353                    .map(|f| Pretty::display(f.get_name()))
354                    .collect(),
355            ),
356        ));
357        fields.push((
358            "stream key",
359            Pretty::Array(
360                node.stream_key
361                    .iter()
362                    .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
363                    .collect(),
364            ),
365        ));
366    }
367    let children = node
368        .input
369        .iter()
370        .map(|input| explain_node(input, verbose))
371        .collect();
372    Pretty::simple_record(one_line_explain, fields, children)
373}
374
375pub async fn handle_describe_fragment(
376    handler_args: HandlerArgs,
377    fragment_id: u32,
378) -> Result<RwPgResponse> {
379    let session = handler_args.session.clone();
380    let meta_client = session.env().meta_client();
381    let distribution = &meta_client
382        .get_fragment_by_id(fragment_id)
383        .await?
384        .ok_or_else(|| CatalogError::NotFound("fragment", fragment_id.to_string()))?;
385    let res: PgResponse<super::PgResponseStream> = generate_enhanced_fragment_string(distribution)?;
386    Ok(res)
387}
388
389fn generate_enhanced_fragment_string(fragment_dist: &FragmentDistribution) -> Result<RwPgResponse> {
390    let mut config = PrettyConfig {
391        need_boundaries: false,
392        width: 80,
393        ..Default::default()
394    };
395
396    let mut res = String::new();
397
398    res.push_str(&format!(
399        "Fragment {} (Table {})\n",
400        fragment_dist.fragment_id, fragment_dist.table_id
401    ));
402    let dist_type = fragment::FragmentDistributionType::try_from(fragment_dist.distribution_type)
403        .unwrap_or(fragment::FragmentDistributionType::Unspecified);
404    res.push_str(&format!("Distribution Type: {}\n", dist_type.as_str_name()));
405    res.push_str(&format!("Parallelism: {}\n", fragment_dist.parallelism));
406    res.push_str(&format!("VNode Count: {}\n", fragment_dist.vnode_count));
407
408    if !fragment_dist.state_table_ids.is_empty() {
409        res.push_str(&format!(
410            "State Tables: [{}]\n",
411            fragment_dist
412                .state_table_ids
413                .iter()
414                .map(|id| id.to_string())
415                .collect::<Vec<_>>()
416                .join(", ")
417        ));
418    }
419
420    if !fragment_dist.upstream_fragment_ids.is_empty() {
421        res.push_str(&format!(
422            "Upstream Fragments: [{}]\n",
423            fragment_dist
424                .upstream_fragment_ids
425                .iter()
426                .map(|id| id.to_string())
427                .collect::<Vec<_>>()
428                .join(", ")
429        ));
430    }
431
432    if let Some(node) = &fragment_dist.node {
433        res.push_str("Stream Plan:\n");
434        let node_pretty = explain_node(node, true);
435        config.unicode(&mut res, &node_pretty);
436    }
437
438    let rows = vec![ExplainRow { query_plan: res }];
439
440    Ok(PgResponse::builder(StatementType::DESCRIBE)
441        .rows(rows)
442        .into())
443}
444
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

    /// Creates a table with a primary key and one index, runs `describe t`, and checks the
    /// first two cells of every returned row against the expected name -> type mapping.
    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for setup_sql in [
            "create table t (v1 int, v2 int, v3 int primary key, v4 int);",
            "create index idx1 on t (v1 DESC, v2);",
        ] {
            frontend.run_sql(setup_sql).await.unwrap();
        }

        let mut pg_response = frontend.run_sql("describe t").await.unwrap();

        // Collect (first cell -> second cell) of every row in the response.
        let mut described = HashMap::new();
        #[for_await]
        for batch in pg_response.values_stream() {
            for row in batch.unwrap() {
                let name = std::str::from_utf8(row.index(0).as_ref().unwrap()).unwrap();
                let ty = std::str::from_utf8(row.index(1).as_ref().unwrap()).unwrap();
                described.insert(name.to_owned(), ty.to_owned());
            }
        }

        let expected: HashMap<String, String> = [
            ("v1", "integer"),
            ("v2", "integer"),
            ("v3", "integer"),
            ("v4", "integer"),
            ("primary key", "v3"),
            ("distribution key", "v3"),
            ("_rw_timestamp", "timestamp with time zone"),
            (
                "idx1",
                "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)",
            ),
            ("table description", "t"),
        ]
        .into_iter()
        .map(|(name, ty)| (name.to_owned(), ty.to_owned()))
        .collect();

        assert_eq!(described, expected);
    }
}