risingwave_frontend/handler/
describe.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::max;
16use std::fmt::Display;
17
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pretty_xmlish::{Pretty, PrettyConfig};
22use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
23use risingwave_common::types::{DataType, Fields};
24use risingwave_expr::bail;
25use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
26use risingwave_pb::stream_plan::StreamNode;
27use risingwave_sqlparser::ast::{DescribeKind, ObjectName, display_comma_separated};
28
29use super::explain::ExplainRow;
30use super::show::ShowColumnRow;
31use super::{RwPgResponse, fields_to_descriptors};
32use crate::binder::{Binder, Relation};
33use crate::catalog::CatalogError;
34use crate::error::{ErrorCode, Result};
35use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
36
37pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
38    let session = handler_args.session;
39    let mut binder = Binder::new_for_system(&session);
40
41    Binder::validate_cross_db_reference(&session.database(), &object_name)?;
42    let not_found_err =
43        CatalogError::NotFound("table, source, sink or view", object_name.to_string());
44
45    // Vec<ColumnCatalog>, Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>, String, Option<String>
46    let (columns, pk_columns, dist_columns, indices, relname, description) = if let Ok(relation) =
47        binder.bind_relation_by_name(object_name.clone(), None, None, false)
48    {
49        match relation {
50            Relation::Source(s) => {
51                let pk_column_catalogs = s
52                    .catalog
53                    .pk_col_ids
54                    .iter()
55                    .map(|&column_id| {
56                        s.catalog
57                            .columns
58                            .iter()
59                            .filter(|x| x.column_id() == column_id)
60                            .map(|x| x.column_desc.clone())
61                            .exactly_one()
62                            .unwrap()
63                    })
64                    .collect_vec();
65                (
66                    s.catalog.columns,
67                    pk_column_catalogs,
68                    vec![],
69                    vec![],
70                    s.catalog.name,
71                    None, // Description
72                )
73            }
74            Relation::BaseTable(t) => {
75                let pk_column_catalogs = t
76                    .table_catalog
77                    .pk()
78                    .iter()
79                    .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
80                    .collect_vec();
81                let dist_columns = t
82                    .table_catalog
83                    .distribution_key()
84                    .iter()
85                    .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
86                    .collect_vec();
87                (
88                    t.table_catalog.columns.clone(),
89                    pk_column_catalogs,
90                    dist_columns,
91                    t.table_indexes,
92                    t.table_catalog.name.clone(),
93                    t.table_catalog.description.clone(),
94                )
95            }
96            Relation::SystemTable(t) => {
97                let pk_column_catalogs = t
98                    .sys_table_catalog
99                    .pk
100                    .iter()
101                    .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
102                    .collect_vec();
103                (
104                    t.sys_table_catalog.columns.clone(),
105                    pk_column_catalogs,
106                    vec![],
107                    vec![],
108                    t.sys_table_catalog.name.clone(),
109                    None, // Description
110                )
111            }
112            Relation::Share(_) => {
113                if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
114                    let columns = view
115                        .view_catalog
116                        .columns
117                        .iter()
118                        .enumerate()
119                        .map(|(idx, field)| ColumnCatalog {
120                            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
121                            is_hidden: false,
122                        })
123                        .collect();
124                    (
125                        columns,
126                        vec![],
127                        vec![],
128                        vec![],
129                        view.view_catalog.name.clone(),
130                        None,
131                    )
132                } else {
133                    return Err(not_found_err.into());
134                }
135            }
136            _ => {
137                return Err(not_found_err.into());
138            }
139        }
140    } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
141        let columns = sink.sink_catalog.full_columns().to_vec();
142        let pk_columns = sink
143            .sink_catalog
144            .downstream_pk_indices()
145            .into_iter()
146            .map(|idx| columns[idx].column_desc.clone())
147            .collect_vec();
148        let dist_columns = sink
149            .sink_catalog
150            .distribution_key
151            .iter()
152            .map(|idx| columns[*idx].column_desc.clone())
153            .collect_vec();
154        (
155            columns,
156            pk_columns,
157            dist_columns,
158            vec![],
159            sink.sink_catalog.name.clone(),
160            None,
161        )
162    } else {
163        return Err(not_found_err.into());
164    };
165
166    // Convert all column descs to rows
167    let mut rows = columns
168        .into_iter()
169        .flat_map(ShowColumnRow::from_catalog)
170        .collect_vec();
171
172    fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
173    where
174        T: Display,
175    {
176        format!(
177            "{}",
178            display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
179        )
180    }
181
182    // Convert primary key to rows
183    if !pk_columns.is_empty() {
184        rows.push(ShowColumnRow {
185            name: "primary key".into(),
186            r#type: concat(pk_columns.iter().map(|x| &x.name)),
187            is_hidden: None,
188            description: None,
189        });
190    }
191
192    // Convert distribution keys to rows
193    if !dist_columns.is_empty() {
194        rows.push(ShowColumnRow {
195            name: "distribution key".into(),
196            r#type: concat(dist_columns.iter().map(|x| &x.name)),
197            is_hidden: None,
198            description: None,
199        });
200    }
201
202    // Convert all indexes to rows
203    rows.extend(indices.iter().map(|index| {
204        let index_display = index.display();
205
206        ShowColumnRow {
207            name: index.name.clone(),
208            r#type: if index_display.include_columns.is_empty() {
209                format!(
210                    "index({}) distributed by({})",
211                    display_comma_separated(&index_display.index_columns_with_ordering),
212                    display_comma_separated(&index_display.distributed_by_columns),
213                )
214            } else {
215                format!(
216                    "index({}) include({}) distributed by({})",
217                    display_comma_separated(&index_display.index_columns_with_ordering),
218                    display_comma_separated(&index_display.include_columns),
219                    display_comma_separated(&index_display.distributed_by_columns),
220                )
221            },
222            is_hidden: None,
223            // TODO: index description
224            description: None,
225        }
226    }));
227
228    rows.push(ShowColumnRow {
229        name: "table description".into(),
230        r#type: relname,
231        is_hidden: None,
232        description,
233    });
234
235    // TODO: table name and description as title of response
236    // TODO: recover the original user statement
237    Ok(PgResponse::builder(StatementType::DESCRIBE)
238        .rows(rows)
239        .into())
240}
241
242pub fn infer_describe(kind: &DescribeKind) -> Vec<PgFieldDescriptor> {
243    match kind {
244        DescribeKind::Fragments => vec![PgFieldDescriptor::new(
245            "Fragments".to_owned(),
246            DataType::Varchar.to_oid(),
247            DataType::Varchar.type_len(),
248        )],
249        DescribeKind::Plain => fields_to_descriptors(ShowColumnRow::fields()),
250    }
251}
252pub async fn handle_describe_fragments(
253    handler_args: HandlerArgs,
254    object_name: ObjectName,
255) -> Result<RwPgResponse> {
256    let session = handler_args.session.clone();
257    let job_id = {
258        let mut binder = Binder::new_for_system(&session);
259
260        Binder::validate_cross_db_reference(&session.database(), &object_name)?;
261        let not_found_err = CatalogError::NotFound("stream job", object_name.to_string());
262
263        if let Ok(relation) = binder.bind_relation_by_name(object_name.clone(), None, None, false) {
264            match relation {
265                Relation::Source(s) => {
266                    if s.is_shared() {
267                        s.catalog.id
268                    } else {
269                        bail!(ErrorCode::NotSupported(
270                            "non shared source has no fragments to describe".to_owned(),
271                            "Use `DESCRIBE` instead.".to_owned(),
272                        ));
273                    }
274                }
275                Relation::BaseTable(t) => t.table_catalog.id.table_id,
276                Relation::SystemTable(_t) => {
277                    bail!(ErrorCode::NotSupported(
278                        "system table has no fragments to describe".to_owned(),
279                        "Use `DESCRIBE` instead.".to_owned(),
280                    ));
281                }
282                Relation::Share(_s) => {
283                    bail!(ErrorCode::NotSupported(
284                        "view has no fragments to describe".to_owned(),
285                        "Use `DESCRIBE` instead.".to_owned(),
286                    ));
287                }
288                _ => {
289                    // Other relation types (Subquery, Join, etc.) are not directly describable.
290                    return Err(not_found_err.into());
291                }
292            }
293        } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
294            sink.sink_catalog.id.sink_id
295        } else {
296            return Err(not_found_err.into());
297        }
298    };
299
300    let meta_client = session.env().meta_client();
301    let fragments = &meta_client.list_table_fragments(&[job_id]).await?[&job_id];
302    let res = generate_fragments_string(fragments)?;
303    Ok(res)
304}
305
306/// The implementation largely copied from `crate::utils::stream_graph_formatter::StreamGraphFormatter`.
307/// The input is different, so we need separate implementation.
308fn generate_fragments_string(fragments: &TableFragmentInfo) -> Result<RwPgResponse> {
309    let mut config = PrettyConfig {
310        need_boundaries: false,
311        width: 80,
312        ..Default::default()
313    };
314
315    let mut max_width = 80;
316
317    let mut blocks = vec![];
318    for fragment in fragments.fragments.iter().sorted_by_key(|f| f.id) {
319        let mut res = String::new();
320        let actor_ids = fragment.actors.iter().map(|a| a.id).format(",");
321        res.push_str(&format!("Fragment {} (Actor {})\n", fragment.id, actor_ids));
322        let node = &fragment.actors[0].node;
323        let node = explain_node(node.as_ref().unwrap(), true);
324        let width = config.unicode(&mut res, &node);
325        max_width = max(width, max_width);
326        config.width = max_width;
327        blocks.push(res);
328        blocks.push("".to_owned());
329    }
330
331    let rows = blocks.iter().map(|b| ExplainRow {
332        query_plan: b.into(),
333    });
334    Ok(PgResponse::builder(StatementType::DESCRIBE)
335        .rows(rows)
336        .into())
337}
338
339fn explain_node<'a>(node: &StreamNode, verbose: bool) -> Pretty<'a> {
340    // TODO: we need extra edge information to get which fragment MergeExecutor connects to.
341    let one_line_explain = node.identity.clone();
342
343    let mut fields = Vec::with_capacity(3);
344    if verbose {
345        fields.push((
346            "output",
347            Pretty::Array(
348                node.fields
349                    .iter()
350                    .map(|f| Pretty::display(f.get_name()))
351                    .collect(),
352            ),
353        ));
354        fields.push((
355            "stream key",
356            Pretty::Array(
357                node.stream_key
358                    .iter()
359                    .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
360                    .collect(),
361            ),
362        ));
363    }
364    let children = node
365        .input
366        .iter()
367        .map(|input| explain_node(input, verbose))
368        .collect();
369    Pretty::simple_record(one_line_explain, fields, children)
370}
371
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

    /// Creates a table with a primary key and a secondary index, runs `describe` on it and
    /// checks the (name, type) pairs of every returned row.
    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let setup_statements = [
            "create table t (v1 int, v2 int, v3 int primary key, v4 int);",
            "create index idx1 on t (v1 DESC, v2);",
        ];
        for statement in setup_statements {
            frontend.run_sql(statement).await.unwrap();
        }

        let mut pg_response = frontend.run_sql("describe t").await.unwrap();

        // Collect the first two cells of every row as a name -> type map.
        let mut described = HashMap::new();
        #[for_await]
        for batch in pg_response.values_stream() {
            for row in batch.unwrap() {
                let name = std::str::from_utf8(row.index(0).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                let ty = std::str::from_utf8(row.index(1).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                described.insert(name, ty);
            }
        }

        let expected: HashMap<String, String> = [
            ("v1", "integer"),
            ("v2", "integer"),
            ("v3", "integer"),
            ("v4", "integer"),
            ("primary key", "v3"),
            ("distribution key", "v3"),
            ("_rw_timestamp", "timestamp with time zone"),
            (
                "idx1",
                "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)",
            ),
            ("table description", "t"),
        ]
        .iter()
        .map(|&(name, ty)| (name.to_owned(), ty.to_owned()))
        .collect();

        assert_eq!(described, expected);
    }
}