risingwave_frontend/handler/
describe.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use itertools::Itertools;
18use pgwire::pg_field_descriptor::PgFieldDescriptor;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
21use risingwave_common::types::Fields;
22use risingwave_sqlparser::ast::{ObjectName, display_comma_separated};
23
24use super::show::ShowColumnRow;
25use super::{RwPgResponse, fields_to_descriptors};
26use crate::binder::{Binder, Relation};
27use crate::catalog::CatalogError;
28use crate::error::Result;
29use crate::handler::{HandlerArgs, RwPgResponseBuilderExt};
30
31pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result<RwPgResponse> {
32    let session = handler_args.session;
33    let mut binder = Binder::new_for_system(&session);
34
35    Binder::validate_cross_db_reference(&session.database(), &object_name)?;
36    let not_found_err =
37        CatalogError::NotFound("table, source, sink or view", object_name.to_string());
38
39    // Vec<ColumnCatalog>, Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>, String, Option<String>
40    let (columns, pk_columns, dist_columns, indices, relname, description) = if let Ok(relation) =
41        binder.bind_relation_by_name(object_name.clone(), None, None, false)
42    {
43        match relation {
44            Relation::Source(s) => {
45                let pk_column_catalogs = s
46                    .catalog
47                    .pk_col_ids
48                    .iter()
49                    .map(|&column_id| {
50                        s.catalog
51                            .columns
52                            .iter()
53                            .filter(|x| x.column_id() == column_id)
54                            .map(|x| x.column_desc.clone())
55                            .exactly_one()
56                            .unwrap()
57                    })
58                    .collect_vec();
59                (
60                    s.catalog.columns,
61                    pk_column_catalogs,
62                    vec![],
63                    vec![],
64                    s.catalog.name,
65                    None, // Description
66                )
67            }
68            Relation::BaseTable(t) => {
69                let pk_column_catalogs = t
70                    .table_catalog
71                    .pk()
72                    .iter()
73                    .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone())
74                    .collect_vec();
75                let dist_columns = t
76                    .table_catalog
77                    .distribution_key()
78                    .iter()
79                    .map(|idx| t.table_catalog.columns[*idx].column_desc.clone())
80                    .collect_vec();
81                (
82                    t.table_catalog.columns.clone(),
83                    pk_column_catalogs,
84                    dist_columns,
85                    t.table_indexes,
86                    t.table_catalog.name.clone(),
87                    t.table_catalog.description.clone(),
88                )
89            }
90            Relation::SystemTable(t) => {
91                let pk_column_catalogs = t
92                    .sys_table_catalog
93                    .pk
94                    .iter()
95                    .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone())
96                    .collect_vec();
97                (
98                    t.sys_table_catalog.columns.clone(),
99                    pk_column_catalogs,
100                    vec![],
101                    vec![],
102                    t.sys_table_catalog.name.clone(),
103                    None, // Description
104                )
105            }
106            Relation::Share(_) => {
107                if let Ok(view) = binder.bind_view_by_name(object_name.clone()) {
108                    let columns = view
109                        .view_catalog
110                        .columns
111                        .iter()
112                        .enumerate()
113                        .map(|(idx, field)| ColumnCatalog {
114                            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
115                            is_hidden: false,
116                        })
117                        .collect();
118                    (
119                        columns,
120                        vec![],
121                        vec![],
122                        vec![],
123                        view.view_catalog.name.clone(),
124                        None,
125                    )
126                } else {
127                    return Err(not_found_err.into());
128                }
129            }
130            _ => {
131                return Err(not_found_err.into());
132            }
133        }
134    } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) {
135        let columns = sink.sink_catalog.full_columns().to_vec();
136        let pk_columns = sink
137            .sink_catalog
138            .downstream_pk_indices()
139            .into_iter()
140            .map(|idx| columns[idx].column_desc.clone())
141            .collect_vec();
142        let dist_columns = sink
143            .sink_catalog
144            .distribution_key
145            .iter()
146            .map(|idx| columns[*idx].column_desc.clone())
147            .collect_vec();
148        (
149            columns,
150            pk_columns,
151            dist_columns,
152            vec![],
153            sink.sink_catalog.name.clone(),
154            None,
155        )
156    } else {
157        return Err(not_found_err.into());
158    };
159
160    // Convert all column descs to rows
161    let mut rows = columns
162        .into_iter()
163        .flat_map(ShowColumnRow::from_catalog)
164        .collect_vec();
165
166    fn concat<T>(display_elems: impl IntoIterator<Item = T>) -> String
167    where
168        T: Display,
169    {
170        format!(
171            "{}",
172            display_comma_separated(&display_elems.into_iter().collect::<Vec<_>>())
173        )
174    }
175
176    // Convert primary key to rows
177    if !pk_columns.is_empty() {
178        rows.push(ShowColumnRow {
179            name: "primary key".into(),
180            r#type: concat(pk_columns.iter().map(|x| &x.name)),
181            is_hidden: None,
182            description: None,
183        });
184    }
185
186    // Convert distribution keys to rows
187    if !dist_columns.is_empty() {
188        rows.push(ShowColumnRow {
189            name: "distribution key".into(),
190            r#type: concat(dist_columns.iter().map(|x| &x.name)),
191            is_hidden: None,
192            description: None,
193        });
194    }
195
196    // Convert all indexes to rows
197    rows.extend(indices.iter().map(|index| {
198        let index_display = index.display();
199
200        ShowColumnRow {
201            name: index.name.clone(),
202            r#type: if index_display.include_columns.is_empty() {
203                format!(
204                    "index({}) distributed by({})",
205                    display_comma_separated(&index_display.index_columns_with_ordering),
206                    display_comma_separated(&index_display.distributed_by_columns),
207                )
208            } else {
209                format!(
210                    "index({}) include({}) distributed by({})",
211                    display_comma_separated(&index_display.index_columns_with_ordering),
212                    display_comma_separated(&index_display.include_columns),
213                    display_comma_separated(&index_display.distributed_by_columns),
214                )
215            },
216            is_hidden: None,
217            // TODO: index description
218            description: None,
219        }
220    }));
221
222    rows.push(ShowColumnRow {
223        name: "table description".into(),
224        r#type: relname,
225        is_hidden: None,
226        description,
227    });
228
229    // TODO: table name and description as title of response
230    // TODO: recover the original user statement
231    Ok(PgResponse::builder(StatementType::DESCRIBE)
232        .rows(rows)
233        .into())
234}
235
236pub fn infer_describe() -> Vec<PgFieldDescriptor> {
237    fields_to_descriptors(ShowColumnRow::fields())
238}
239
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::LocalFrontend;

    /// Creates a table with an index, runs `describe t`, and checks the first two cells of
    /// every returned row (name and type) against the expected mapping.
    #[tokio::test]
    async fn test_describe_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;

        // Set up the table first, then the index on top of it.
        for setup_sql in [
            "create table t (v1 int, v2 int, v3 int primary key, v4 int);",
            "create index idx1 on t (v1 DESC, v2);",
        ] {
            frontend.run_sql(setup_sql).await.unwrap();
        }

        let mut response = frontend.run_sql("describe t").await.unwrap();

        // Collect `name -> type` from the first two cells of each response row.
        let mut described = HashMap::new();
        #[for_await]
        for batch in response.values_stream() {
            for row in batch.unwrap() {
                let name = std::str::from_utf8(row.index(0).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                let ty = std::str::from_utf8(row.index(1).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                described.insert(name, ty);
            }
        }

        let expected: HashMap<String, String> = [
            ("v1", "integer"),
            ("v2", "integer"),
            ("v3", "integer"),
            ("v4", "integer"),
            ("primary key", "v3"),
            ("distribution key", "v3"),
            ("_rw_timestamp", "timestamp with time zone"),
            (
                "idx1",
                "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)",
            ),
            ("table description", "t"),
        ]
        .into_iter()
        .map(|(name, ty)| (name.to_owned(), ty.to_owned()))
        .collect();

        assert_eq!(described, expected);
    }
}