risingwave_frontend/handler/
show.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
23use risingwave_common::bail_not_implemented;
24use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
25use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
26use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
27use risingwave_common::util::addr::HostAddr;
28use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
29use risingwave_expr::scalar::like::{i_like_default, like_default};
30use risingwave_pb::catalog::connection;
31use risingwave_pb::frontend_service::{
32    GetAllCursorsRequest, GetAllSubCursorsRequest, GetRunningSqlsRequest,
33};
34use risingwave_pb::id::WorkerId;
35use risingwave_rpc_client::FrontendClientPoolRef;
36use risingwave_sqlparser::ast::{
37    Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
38};
39use thiserror_ext::AsReport;
40
41use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
42use crate::binder::{Binder, Relation};
43use crate::catalog::catalog_service::CatalogReadGuard;
44use crate::catalog::root_catalog::SchemaPath;
45use crate::catalog::schema_catalog::SchemaCatalog;
46use crate::catalog::{CatalogError, IndexCatalog};
47use crate::error::{Result, RwError};
48use crate::handler::HandlerArgs;
49use crate::handler::create_connection::print_connection_params;
50use crate::session::{SessionImpl, WorkerProcessId};
51use crate::user::has_access_to_object;
52use crate::user::user_catalog::UserCatalog;
53
54pub fn get_columns_from_table(
55    session: &SessionImpl,
56    table_name: ObjectName,
57) -> Result<Vec<ColumnCatalog>> {
58    let mut binder = Binder::new_for_system(session);
59    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
60    let column_catalogs = match relation {
61        Relation::Source(s) => s.catalog.columns,
62        Relation::BaseTable(t) => t.table_catalog.columns.clone(),
63        Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
64        _ => {
65            return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
66        }
67    };
68
69    Ok(column_catalogs)
70}
71
72pub fn get_columns_from_sink(
73    session: &SessionImpl,
74    sink_name: ObjectName,
75) -> Result<Vec<ColumnCatalog>> {
76    let binder = Binder::new_for_system(session);
77    let sink = binder.bind_sink_by_name(sink_name)?;
78    Ok(sink.sink_catalog.full_columns().to_vec())
79}
80
81pub fn get_columns_from_view(
82    session: &SessionImpl,
83    view_name: ObjectName,
84) -> Result<Vec<ColumnCatalog>> {
85    let binder = Binder::new_for_system(session);
86    let view = binder.bind_view_by_name(view_name)?;
87
88    Ok(view
89        .view_catalog
90        .columns
91        .iter()
92        .enumerate()
93        .map(|(idx, field)| ColumnCatalog {
94            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
95            is_hidden: false,
96        })
97        .collect())
98}
99
100pub fn get_indexes_from_table(
101    session: &SessionImpl,
102    table_name: ObjectName,
103) -> Result<Vec<Arc<IndexCatalog>>> {
104    let mut binder = Binder::new_for_system(session);
105    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
106    let indexes = match relation {
107        Relation::BaseTable(t) => t.table_indexes,
108        _ => {
109            return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
110        }
111    };
112
113    Ok(indexes)
114}
115
116fn schema_or_search_path(
117    session: &Arc<SessionImpl>,
118    schema: &Option<Ident>,
119    search_path: &SearchPath,
120) -> Vec<String> {
121    if let Some(s) = schema {
122        vec![s.real_value()]
123    } else {
124        search_path
125            .real_path()
126            .iter()
127            .map(|s| {
128                if s.eq(USER_NAME_WILD_CARD) {
129                    session.user_name()
130                } else {
131                    s.clone()
132                }
133            })
134            .collect()
135    }
136}
137
138fn iter_schema_items<F, T>(
139    session: &Arc<SessionImpl>,
140    schema: &Option<Ident>,
141    reader: &CatalogReadGuard,
142    current_user: &UserCatalog,
143    mut f: F,
144) -> Vec<T>
145where
146    F: FnMut(&SchemaCatalog) -> Vec<T>,
147{
148    let search_path = session.config().search_path();
149
150    schema_or_search_path(session, schema, &search_path)
151        .into_iter()
152        .filter_map(|schema| {
153            if let Ok(schema_catalog) =
154                reader.get_schema_by_name(&session.database(), schema.as_ref())
155                && (current_user.is_super
156                    || current_user.has_schema_usage_privilege(schema_catalog.id()))
157            {
158                Some(schema_catalog)
159            } else {
160                None
161            }
162        })
163        .flat_map(|s| f(s).into_iter())
164        .collect()
165}
166
/// Wrapper for `ObjectName` to be used as a field in a row.
///
/// It is exposed as a varchar column whose value is the `Display` rendering of the wrapped
/// `ObjectName` (see the `WithDataType` and `ToOwnedDatum` impls in this file).
// TODO: replace remaining `name: String` with `name: ObjectNameField`
struct ObjectNameField(ObjectName);
170
171fn with_schema_name(schema_name: &str, name: &str) -> ObjectNameField {
172    if schema_name.is_empty() {
173        ObjectNameField(ObjectName(vec![name.into()]))
174    } else {
175        ObjectNameField(ObjectName(vec![schema_name.into(), name.into()]))
176    }
177}
178
impl WithDataType for ObjectNameField {
    /// Object names are exposed as varchar columns.
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
184
impl ToOwnedDatum for ObjectNameField {
    /// Converts the wrapped `ObjectName` into a non-null datum holding its `Display` rendering.
    fn to_owned_datum(self) -> Datum {
        Some(self.0.to_string().into())
    }
}
190
/// Single-column row shared by the `SHOW` commands that only list object names
/// (`ShowObject::Table`, `InternalTable`, `View`, `MaterializedView`, `Source`, `Sink` and
/// `Secret`, see `infer_show_object`).
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowObjectRow {
    // Name of the object; schema-qualified when built by `with_schema_name` with a non-empty schema.
    name: ObjectNameField,
}
196
197impl ShowObjectRow {
198    fn base_name(&self) -> String {
199        self.name.0.base_name()
200    }
201}
202
/// Row returned for `ShowObject::Database`: one row per database name.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowDatabaseRow {
    // Database name as returned by the catalog reader.
    name: String,
}
208
/// Row returned for `ShowObject::Schema`: one row per schema name of the session's database.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSchemaRow {
    // Schema name as returned by the catalog reader.
    name: String,
}
214
/// Row returned for `ShowObject::Columns`: one row per column, plus one row per nested field
/// when the column type is a struct or a list of structs (see `ShowColumnRow::flatten`).
#[derive(Fields)]
#[fields(style = "Title Case")]
pub struct ShowColumnRow {
    // Possibly nested column name, e.g. `a.b` for struct fields or `a[1].b` under a list.
    pub name: ShowColumnName,
    // Textual data type; struct types are shown as `struct` / `struct[]`.
    pub r#type: String,
    // Stringified boolean ("true" / "false") when produced by `flatten`.
    pub is_hidden: Option<String>, // XXX: why not bool?
    // Column description taken from the catalog; `None` for flattened nested fields.
    pub description: Option<String>,
}
223
/// One step in the path of a (possibly nested) column name shown by `ShowColumnName`.
#[derive(Clone, Debug)]
enum ShowColumnNameSegment {
    /// A named column or struct field.
    Field(Ident),
    /// Descends into the element type of a list; rendered as the literal `[1]`.
    ListElement,
}
229
230impl ShowColumnNameSegment {
231    pub fn field(name: &str) -> Self {
232        ShowColumnNameSegment::Field(Ident::from_real_value(name))
233    }
234}
235
/// The name of a column in the output of `SHOW COLUMNS` or `DESCRIBE`.
///
/// Stored as a path of segments so that nested struct fields and list elements can be
/// rendered as a single string such as `a.b` or `a[1].b`.
#[derive(Clone, Debug)]
pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
239
240impl ShowColumnName {
241    /// Create a special column name without quoting. Used only for extra information like `primary key`
242    /// in the output of `DESCRIBE`.
243    pub fn special(name: &str) -> Self {
244        ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
245            name,
246        ))])
247    }
248}
249
impl WithDataType for ShowColumnName {
    /// Column names are exposed as varchar columns.
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
255
256impl ToOwnedDatum for ShowColumnName {
257    fn to_owned_datum(self) -> Datum {
258        use std::fmt::Write;
259
260        let mut s = String::new();
261        for segment in self.0 {
262            match segment {
263                ShowColumnNameSegment::Field(ident) => {
264                    if !s.is_empty() {
265                        // TODO: shall we add parentheses, so that it's valid field access SQL?
266                        s.push('.');
267                    }
268                    write!(s, "{ident}").unwrap();
269                }
270                ShowColumnNameSegment::ListElement => {
271                    s.push_str("[1]");
272                }
273            }
274        }
275        s.to_owned_datum()
276    }
277}
278
279impl ShowColumnRow {
280    /// Create a row with the given information. If the data type is a struct or list,
281    /// flatten the data type to also generate rows for its fields.
282    fn flatten(
283        name: ShowColumnName,
284        data_type: DataType,
285        is_hidden: bool,
286        description: Option<String>,
287    ) -> Vec<Self> {
288        // TODO(struct): use struct's type name once supported.
289        let r#type = match &data_type {
290            DataType::Struct(_) => "struct".to_owned(),
291            DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
292            d => d.to_string(),
293        };
294
295        let mut rows = vec![ShowColumnRow {
296            name: name.clone(),
297            r#type,
298            is_hidden: Some(is_hidden.to_string()),
299            description,
300        }];
301
302        match data_type {
303            DataType::Struct(st) => {
304                rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
305                    let mut name = name.clone();
306                    name.0.push(ShowColumnNameSegment::field(field_name));
307                    Self::flatten(name, field_data_type.clone(), is_hidden, None)
308                }));
309            }
310
311            DataType::List(list) if let DataType::Struct(_) = list.elem() => {
312                let mut name = name.clone();
313                name.0.push(ShowColumnNameSegment::ListElement);
314                rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
315            }
316
317            _ => {}
318        }
319
320        rows
321    }
322
323    pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
324        Self::flatten(
325            ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
326            col.column_desc.data_type,
327            col.is_hidden,
328            col.column_desc.description,
329        )
330    }
331}
332
/// Row returned for `ShowObject::Connection`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowConnectionRow {
    // Schema-qualified connection name.
    name: ObjectNameField,
    // Connection kind: the private-link constant or the connection type's string name.
    r#type: String,
    // Human-readable, multi-line summary of the connection's properties.
    properties: String,
}
340
/// Row returned for `ShowObject::Function`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowFunctionRow {
    // Schema-qualified function name.
    name: ObjectNameField,
    // Argument types joined with ", ".
    arguments: String,
    // Textual return type.
    return_type: String,
    // Language recorded in the function catalog.
    language: String,
    // Optional link recorded in the function catalog.
    link: Option<String>,
}
350
/// Row returned for `ShowObject::Indexes`; built from an `IndexCatalog` via `From`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowIndexRow {
    // Index name.
    name: String,
    // Name of the primary table the index is defined on.
    on: String,
    // Comma-separated index columns including their ordering.
    key: String,
    // Comma-separated include columns.
    include: String,
    // Comma-separated distribution columns.
    distributed_by: String,
}
360
361impl From<Arc<IndexCatalog>> for ShowIndexRow {
362    fn from(index: Arc<IndexCatalog>) -> Self {
363        let index_display = index.display();
364        ShowIndexRow {
365            name: index.name.clone(),
366            on: index.primary_table.name.clone(),
367            key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
368            include: display_comma_separated(&index_display.include_columns).to_string(),
369            distributed_by: display_comma_separated(&index_display.distributed_by_columns)
370                .to_string(),
371        }
372    }
373}
374
/// Row returned for `ShowObject::Cluster`: one row per worker node reported by the meta client.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowClusterRow {
    // Worker id.
    id: WorkerId,
    // Host address of the worker, rendered as a string.
    addr: String,
    // String name of the worker type.
    r#type: String,
    // String name of the worker state.
    state: String,
    // Worker parallelism, when the worker reports one.
    parallelism: Option<i32>,
    // The three flags below are `None` when the worker has no property record.
    is_streaming: Option<bool>,
    is_serving: Option<bool>,
    is_unschedulable: Option<bool>,
    // Start time, converted from the worker's `started_at` seconds value when present.
    started_at: Option<Timestamptz>,
}
388
/// Row returned for `ShowObject::Jobs`: one row per entry of the meta client's DDL progress.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowJobRow {
    // Job id, widened to `i64`.
    id: i64,
    // Statement text reported for the job.
    statement: String,
    // Create type reported for the job.
    create_type: String,
    // Progress string reported for the job.
    progress: String,
}
397
/// Row returned for `ShowObject::ProcessList`.
///
/// The rows are produced by `show_process_list_impl`, which is not visible in this part of the
/// file; field meanings below are taken from the field names only.
// NOTE(review): confirm field semantics against `show_process_list_impl`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowProcessListRow {
    worker_id: String,
    id: String,
    user: String,
    host: String,
    database: String,
    time: Option<String>,
    info: Option<String>,
}
409
/// Row shape of the `SHOW CREATE ...` result (see `infer_show_create_object`).
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCreateObjectRow {
    // Name of the object.
    name: String,
    // The SQL statement that creates the object.
    create_sql: String,
}
416
/// Row returned for `ShowObject::Subscription`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionRow {
    // Schema-qualified subscription name.
    name: ObjectNameField,
    // Retention of the subscription in seconds, cast to `i64` from the catalog value.
    retention_seconds: i64,
}
423
/// Row returned for `ShowObject::Cursor`.
///
/// The rows are produced by `show_all_cursors_impl`, which is not visible in this part of the
/// file.
// NOTE(review): confirm field semantics against `show_all_cursors_impl`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCursorRow {
    worker_id: String,
    session_id: String,
    user: String,
    host: String,
    database: String,
    cursor_name: String,
    info: Option<String>,
}
435
/// Row returned for `ShowObject::SubscriptionCursor`.
///
/// The rows are produced by `show_all_sub_cursors_impl`, which is not visible in this part of
/// the file.
// NOTE(review): confirm field semantics against `show_all_sub_cursors_impl`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionCursorRow {
    worker_id: String,
    session_id: String,
    user: String,
    host: String,
    database: String,
    cursor_name: String,
    subscription_name: String,
    state: String,
    // Presumably how long the cursor has been idle, in milliseconds (from the field name).
    idle_duration_ms: i64,
    info: Option<String>,
}
450
451/// Infer the row description for different show objects.
452pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
453    fields_to_descriptors(match objects {
454        ShowObject::Database => ShowDatabaseRow::fields(),
455        ShowObject::Schema => ShowSchemaRow::fields(),
456        ShowObject::Columns { .. } => ShowColumnRow::fields(),
457        ShowObject::Connection { .. } => ShowConnectionRow::fields(),
458        ShowObject::Function { .. } => ShowFunctionRow::fields(),
459        ShowObject::Indexes { .. } => ShowIndexRow::fields(),
460        ShowObject::Cluster => ShowClusterRow::fields(),
461        ShowObject::Jobs => ShowJobRow::fields(),
462        ShowObject::ProcessList => ShowProcessListRow::fields(),
463        ShowObject::Cursor => ShowCursorRow::fields(),
464        ShowObject::SubscriptionCursor => ShowSubscriptionCursorRow::fields(),
465        ShowObject::Subscription { .. } => ShowSubscriptionRow::fields(),
466
467        ShowObject::Table { .. }
468        | ShowObject::InternalTable { .. }
469        | ShowObject::View { .. }
470        | ShowObject::MaterializedView { .. }
471        | ShowObject::Source { .. }
472        | ShowObject::Sink { .. }
473        | ShowObject::Secret { .. } => ShowObjectRow::fields(),
474    })
475}
476
477pub async fn handle_show_object(
478    handler_args: HandlerArgs,
479    command: ShowObject,
480    filter: Option<ShowStatementFilter>,
481) -> Result<RwPgResponse> {
482    let session = handler_args.session;
483
484    if let Some(ShowStatementFilter::Where(..)) = filter {
485        bail_not_implemented!("WHERE clause in SHOW statement");
486    }
487
488    let catalog_reader = session.env().catalog_reader();
489    let user_reader = session.env().user_info_reader();
490    let get_catalog_reader = || {
491        let reader = catalog_reader.read_guard();
492        let user_reader = user_reader.read_guard();
493        let current_user = user_reader
494            .get_user_by_name(&session.user_name())
495            .expect("user not found")
496            .clone();
497        (reader, current_user)
498    };
499
500    let rows: Vec<ShowObjectRow> = match command {
501        ShowObject::Table { schema } => {
502            let (reader, current_user) = get_catalog_reader();
503            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
504                schema
505                    .iter_user_table()
506                    .map(|t| with_schema_name(&schema.name, &t.name))
507                    .map(|name| ShowObjectRow { name })
508                    .collect()
509            })
510        }
511        ShowObject::InternalTable { schema } => {
512            let (reader, current_user) = get_catalog_reader();
513            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
514                schema
515                    .iter_internal_table()
516                    .map(|t| with_schema_name(&schema.name, &t.name))
517                    .map(|name| ShowObjectRow { name })
518                    .collect()
519            })
520        }
521        ShowObject::Database => {
522            let reader = catalog_reader.read_guard();
523            let rows = reader
524                .get_all_database_names()
525                .into_iter()
526                .map(|name| ShowDatabaseRow { name });
527            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
528                .rows(rows)
529                .into());
530        }
531        ShowObject::Schema => {
532            let reader = catalog_reader.read_guard();
533            let rows = reader
534                .get_all_schema_names(&session.database())?
535                .into_iter()
536                .map(|name| ShowSchemaRow { name });
537            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
538                .rows(rows)
539                .into());
540        }
541        ShowObject::View { schema } => {
542            let (reader, current_user) = get_catalog_reader();
543            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
544                schema
545                    .iter_view()
546                    .map(|t| with_schema_name(&schema.name, &t.name))
547                    .map(|name| ShowObjectRow { name })
548                    .collect()
549            })
550        }
551        ShowObject::MaterializedView { schema } => {
552            let (reader, current_user) = get_catalog_reader();
553            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
554                schema
555                    .iter_created_mvs()
556                    .map(|t| with_schema_name(&schema.name, &t.name))
557                    .map(|name| ShowObjectRow { name })
558                    .collect()
559            })
560        }
561        ShowObject::Source { schema } => {
562            let (reader, current_user) = get_catalog_reader();
563            let mut sources =
564                iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
565                    schema
566                        .iter_source()
567                        .map(|t| with_schema_name(&schema.name, &t.name))
568                        .map(|name| ShowObjectRow { name })
569                        .collect()
570                });
571            sources.extend(
572                session
573                    .temporary_source_manager()
574                    .keys()
575                    .into_iter()
576                    .map(|t| with_schema_name("", &t))
577                    .map(|name| ShowObjectRow { name }),
578            );
579            sources
580        }
581        ShowObject::Sink { schema } => {
582            let (reader, current_user) = get_catalog_reader();
583            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
584                schema
585                    .iter_sink()
586                    .map(|t| with_schema_name(&schema.name, &t.name))
587                    .map(|name| ShowObjectRow { name })
588                    .collect()
589            })
590        }
591        ShowObject::Subscription { schema } => {
592            let (reader, current_user) = get_catalog_reader();
593            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
594                schema
595                    .iter_subscription()
596                    .map(|t| ShowSubscriptionRow {
597                        name: with_schema_name(&schema.name, &t.name),
598                        retention_seconds: t.retention_seconds as i64,
599                    })
600                    .collect()
601            });
602            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
603                .rows(rows)
604                .into());
605        }
606        ShowObject::Secret { schema } => {
607            let (reader, current_user) = get_catalog_reader();
608            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
609                schema
610                    .iter_secret()
611                    .map(|t| with_schema_name(&schema.name, &t.name))
612                    .map(|name| ShowObjectRow { name })
613                    .collect()
614            })
615        }
616        ShowObject::Columns { table } => {
617            let Ok(columns) = get_columns_from_table(&session, table.clone())
618                .or(get_columns_from_sink(&session, table.clone()))
619                .or(get_columns_from_view(&session, table.clone()))
620            else {
621                return Err(CatalogError::not_found(
622                    "table, source, sink or view",
623                    table.to_string(),
624                )
625                .into());
626            };
627
628            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
629                .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
630                .into());
631        }
632        ShowObject::Indexes { table } => {
633            let indexes = get_indexes_from_table(&session, table)?;
634
635            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
636                .rows(indexes.into_iter().map(ShowIndexRow::from))
637                .into());
638        }
639        ShowObject::Connection { schema } => {
640            let (reader, current_user) = get_catalog_reader();
641            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
642                schema.iter_connections()
643                .map(|c| {
644                    let name = c.name.clone();
645                    let r#type = match &c.info {
646                        connection::Info::PrivateLinkService(_) => {
647                            PRIVATELINK_CONNECTION.to_owned()
648                        },
649                        connection::Info::ConnectionParams(params) => {
650                            params.get_connection_type().unwrap().as_str_name().to_owned()
651                        }
652                    };
653                    let source_names = schema
654                        .get_source_ids_by_connection(c.id)
655                        .unwrap_or_default()
656                        .into_iter()
657                        .filter_map(|sid| schema.get_source_by_id(sid).map(|catalog| catalog.name.as_str()))
658                        .collect_vec();
659                    let sink_names = schema
660                        .get_sink_ids_by_connection(c.id)
661                        .unwrap_or_default()
662                        .into_iter()
663                        .filter_map(|sid| schema.get_sink_by_id(sid).map(|catalog| catalog.name.as_str()))
664                        .collect_vec();
665                    let properties = match &c.info {
666                        connection::Info::PrivateLinkService(i) => {
667                            format!(
668                                "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
669                                i.get_provider().unwrap().as_str_name(),
670                                i.service_name,
671                                i.endpoint_id,
672                                serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
673                                serde_json::to_string(&source_names).unwrap(),
674                                serde_json::to_string(&sink_names).unwrap(),
675                            )
676                        }
677                        connection::Info::ConnectionParams(params) => {
678                            // todo: show dep relations
679                            print_connection_params(&session.database(), params, &reader)
680                        }
681                    };
682                    ShowConnectionRow {
683                        name: with_schema_name(&schema.name, &name),
684                        r#type,
685                        properties,
686                    }
687                }).collect_vec()
688            });
689            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
690                .rows(rows)
691                .into());
692        }
693        ShowObject::Function { schema } => {
694            let (reader, current_user) = get_catalog_reader();
695            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
696                schema
697                    .iter_function()
698                    .map(|t| ShowFunctionRow {
699                        name: with_schema_name(&schema.name, &t.name),
700                        arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
701                        return_type: t.return_type.to_string(),
702                        language: t.language.clone(),
703                        link: t.link.clone(),
704                    })
705                    .collect()
706            });
707            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
708                .rows(rows)
709                .into());
710        }
711        ShowObject::Cluster => {
712            let workers = session.env().meta_client().list_all_nodes().await?;
713            let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
714                let addr: HostAddr = worker.host.as_ref().unwrap().into();
715                let property = worker.property.as_ref();
716                ShowClusterRow {
717                    id: worker.id,
718                    addr: addr.to_string(),
719                    r#type: worker.get_type().unwrap().as_str_name().into(),
720                    state: worker.get_state().unwrap().as_str_name().to_owned(),
721                    parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
722                    is_streaming: property.map(|p| p.is_streaming),
723                    is_serving: property.map(|p| p.is_serving),
724                    is_unschedulable: property.map(|p| p.is_unschedulable),
725                    started_at: worker
726                        .started_at
727                        .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
728                }
729            });
730            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
731                .rows(rows)
732                .into());
733        }
734        ShowObject::Jobs => {
735            let resp = session.env().meta_client().get_ddl_progress().await?;
736            let rows = resp.into_iter().map(|job| ShowJobRow {
737                id: job.id as i64,
738                statement: job.statement,
739                create_type: job.create_type,
740                progress: job.progress,
741            });
742            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
743                .rows(rows)
744                .into());
745        }
746        ShowObject::ProcessList => {
747            let rows = show_process_list_impl(
748                session.env().frontend_client_pool(),
749                session.env().worker_node_manager_ref(),
750            )
751            .await;
752            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
753                .rows(rows)
754                .into());
755        }
756        ShowObject::Cursor => {
757            let rows = show_all_cursors_impl(
758                session.env().frontend_client_pool(),
759                session.env().worker_node_manager_ref(),
760            )
761            .await;
762            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
763                .rows(rows)
764                .into());
765        }
766        ShowObject::SubscriptionCursor => {
767            let rows = show_all_sub_cursors_impl(
768                session.env().frontend_client_pool(),
769                session.env().worker_node_manager_ref(),
770            )
771            .await;
772
773            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
774                .rows(rows)
775                .into());
776        }
777    };
778
779    // Apply filters.
780    let rows = rows.into_iter().filter(|row| match &filter {
781        Some(ShowStatementFilter::Like(pattern)) => like_default(&row.base_name(), pattern),
782        Some(ShowStatementFilter::ILike(pattern)) => i_like_default(&row.base_name(), pattern),
783        Some(ShowStatementFilter::Where(..)) => unreachable!(),
784        None => true,
785    });
786
787    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
788        .rows(rows)
789        .into())
790}
791
792pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
793    fields_to_descriptors(ShowCreateObjectRow::fields())
794}
795
796pub fn handle_show_create_object(
797    handle_args: HandlerArgs,
798    show_create_type: ShowCreateType,
799    name: ObjectName,
800) -> Result<RwPgResponse> {
801    let session = handle_args.session;
802    let catalog_reader = session.env().catalog_reader().read_guard();
803    let database = session.database();
804    let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
805    let search_path = session.config().search_path();
806    let user_name = &session.user_name();
807    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
808    let user_reader = session.env().user_info_reader().read_guard();
809    let current_user = user_reader
810        .get_user_by_name(user_name)
811        .expect("user not found");
812
813    let (sql, schema_name) = match show_create_type {
814        ShowCreateType::MaterializedView => {
815            let (mv, schema) = schema_path
816                .try_find(|schema_name| {
817                    Ok::<_, RwError>(
818                        catalog_reader
819                            .get_schema_by_name(&database, schema_name)?
820                            .get_created_table_by_name(&object_name)
821                            .filter(|t| {
822                                t.is_mview() && has_access_to_object(current_user, t.id, t.owner)
823                            }),
824                    )
825                })?
826                .ok_or_else(|| CatalogError::not_found("materialized view", name.to_string()))?;
827            (mv.create_sql(), schema)
828        }
829        ShowCreateType::View => {
830            let (view, schema) =
831                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
832            if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
833                return Err(CatalogError::not_found("view", name.to_string()).into());
834            }
835            (view.create_sql(schema.to_owned()), schema)
836        }
837        ShowCreateType::Table => {
838            let (table, schema) = schema_path
839                .try_find(|schema_name| {
840                    Ok::<_, RwError>(
841                        catalog_reader
842                            .get_schema_by_name(&database, schema_name)?
843                            .get_created_table_by_name(&object_name)
844                            .filter(|t| {
845                                t.is_user_table()
846                                    && has_access_to_object(current_user, t.id, t.owner)
847                            }),
848                    )
849                })?
850                .ok_or_else(|| CatalogError::not_found("table", name.to_string()))?;
851
852            (table.create_sql_purified(), schema)
853        }
854        ShowCreateType::Sink => {
855            let (sink, schema) =
856                catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
857            if !has_access_to_object(current_user, sink.id, sink.owner) {
858                return Err(CatalogError::not_found("sink", name.to_string()).into());
859            }
860            (sink.create_sql(), schema)
861        }
862        ShowCreateType::Source => {
863            let (source, schema) = schema_path
864                .try_find(|schema_name| {
865                    Ok::<_, RwError>(
866                        catalog_reader
867                            .get_schema_by_name(&database, schema_name)?
868                            .get_source_by_name(&object_name)
869                            .filter(|s| {
870                                s.associated_table_id.is_none()
871                                    && has_access_to_object(current_user, s.id, s.owner)
872                            }),
873                    )
874                })?
875                .ok_or_else(|| CatalogError::not_found("source", name.to_string()))?;
876            (source.create_sql_purified(), schema)
877        }
878        ShowCreateType::Index => {
879            let (index, schema) = schema_path
880                .try_find(|schema_name| {
881                    Ok::<_, RwError>(
882                        catalog_reader
883                            .get_schema_by_name(&database, schema_name)?
884                            .get_created_table_by_name(&object_name)
885                            .filter(|t| {
886                                t.is_index() && has_access_to_object(current_user, t.id, t.owner)
887                            }),
888                    )
889                })?
890                .ok_or_else(|| CatalogError::not_found("index", name.to_string()))?;
891            (index.create_sql(), schema)
892        }
893        ShowCreateType::Function => {
894            bail_not_implemented!("show create on: {}", show_create_type);
895        }
896        ShowCreateType::Subscription => {
897            let (subscription, schema) =
898                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
899            if !has_access_to_object(current_user, subscription.id, subscription.owner) {
900                return Err(CatalogError::not_found("subscription", name.to_string()).into());
901            }
902            (subscription.create_sql(), schema)
903        }
904    };
905    let name = format!("{}.{}", schema_name, object_name);
906
907    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
908        .rows([ShowCreateObjectRow {
909            name,
910            create_sql: sql,
911        }])
912        .into())
913}
914
915async fn show_all_sub_cursors_impl(
916    frontend_client_pool: FrontendClientPoolRef,
917    worker_node_manager: WorkerNodeManagerRef,
918) -> Vec<ShowSubscriptionCursorRow> {
919    fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowSubscriptionCursorRow> {
920        vec![ShowSubscriptionCursorRow {
921            worker_id: format!("{}", worker_id),
922            session_id: "".to_owned(),
923            user: "".to_owned(),
924            host: "".to_owned(),
925            database: "".to_owned(),
926            cursor_name: "".to_owned(),
927            subscription_name: "".to_owned(),
928            state: "".to_owned(),
929            idle_duration_ms: 0,
930            info: Some(format!(
931                "Failed to show subscription cursor from worker {worker_id} due to: {err_msg}"
932            )),
933        }]
934    }
935
936    let futures = worker_node_manager
937        .list_frontend_nodes()
938        .into_iter()
939        .map(|worker| {
940            let frontend_client_pool_ = frontend_client_pool.clone();
941            async move {
942                let client = match frontend_client_pool_.get(&worker).await {
943                    Ok(client) => client,
944                    Err(e) => {
945                        return on_error(worker.id, format!("{}", e.as_report()));
946                    }
947                };
948
949                let resp = match client.get_all_sub_cursors(GetAllSubCursorsRequest {}).await {
950                    Ok(resp) => resp,
951                    Err(e) => {
952                        return on_error(worker.id, format!("{}", e.as_report()));
953                    }
954                };
955
956                resp.into_inner()
957                    .subscription_cursors
958                    .into_iter()
959                    .flat_map(|sub_cursor| {
960                        let worker_id = worker.id.to_string();
961                        let session_id = sub_cursor.session_id.to_string();
962
963                        let user = sub_cursor.user_name;
964                        let host = sub_cursor.peer_addr;
965                        let database = sub_cursor.database;
966
967                        let size = sub_cursor.states.len();
968                        let mut sub_cursors = vec![];
969                        for index in 0..size {
970                            sub_cursors.push(ShowSubscriptionCursorRow {
971                                worker_id: worker_id.clone(),
972                                session_id: session_id.clone(),
973                                user: user.clone(),
974                                host: host.clone(),
975                                database: database.clone(),
976                                cursor_name: sub_cursor.cursor_names[index].clone(),
977                                subscription_name: sub_cursor.subscription_names[index].clone(),
978                                state: sub_cursor.states[index].clone(),
979                                idle_duration_ms: sub_cursor.idle_durations[index] as i64,
980                                info: None,
981                            })
982                        }
983                        sub_cursors
984                    })
985                    .collect_vec()
986            }
987        })
988        .collect_vec();
989    join_all(futures).await.into_iter().flatten().collect()
990}
991
992async fn show_all_cursors_impl(
993    frontend_client_pool: FrontendClientPoolRef,
994    worker_node_manager: WorkerNodeManagerRef,
995) -> Vec<ShowCursorRow> {
996    fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowCursorRow> {
997        vec![ShowCursorRow {
998            worker_id: format!("{}", worker_id),
999            session_id: "".to_owned(),
1000            user: "".to_owned(),
1001            host: "".to_owned(),
1002            database: "".to_owned(),
1003            cursor_name: "".to_owned(),
1004            info: Some(format!(
1005                "Failed to show cursor from worker {worker_id} due to: {err_msg}"
1006            )),
1007        }]
1008    }
1009    let futures = worker_node_manager
1010        .list_frontend_nodes()
1011        .into_iter()
1012        .map(|worker| {
1013            let frontend_client_pool_ = frontend_client_pool.clone();
1014            async move {
1015                let client = match frontend_client_pool_.get(&worker).await {
1016                    Ok(client) => client,
1017                    Err(e) => {
1018                        return on_error(worker.id, format!("{}", e.as_report()));
1019                    }
1020                };
1021
1022                let resp = match client.get_all_cursors(GetAllCursorsRequest {}).await {
1023                    Ok(resp) => resp,
1024                    Err(e) => {
1025                        return on_error(worker.id, format!("{}", e.as_report()));
1026                    }
1027                };
1028
1029                resp.into_inner()
1030                    .all_cursors
1031                    .into_iter()
1032                    .flat_map(|cursors| {
1033                        let worker_id = worker.id.to_string();
1034                        let session_id = cursors.session_id.to_string();
1035
1036                        let user = cursors.user_name;
1037                        let host = cursors.peer_addr;
1038                        let database = cursors.database;
1039
1040                        cursors
1041                            .cursor_names
1042                            .into_iter()
1043                            .map(|cursor_name| ShowCursorRow {
1044                                worker_id: worker_id.clone(),
1045                                session_id: session_id.clone(),
1046                                user: user.clone(),
1047                                host: host.clone(),
1048                                database: database.clone(),
1049                                cursor_name,
1050                                info: None,
1051                            })
1052                            .collect_vec()
1053                    })
1054                    .collect_vec()
1055            }
1056        })
1057        .collect_vec();
1058    join_all(futures).await.into_iter().flatten().collect()
1059}
1060
1061async fn show_process_list_impl(
1062    frontend_client_pool: FrontendClientPoolRef,
1063    worker_node_manager: WorkerNodeManagerRef,
1064) -> Vec<ShowProcessListRow> {
1065    // Create a placeholder row for the worker in case of any errors while fetching its running SQLs.
1066    fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowProcessListRow> {
1067        vec![ShowProcessListRow {
1068            worker_id: format!("{}", worker_id),
1069            id: "".to_owned(),
1070            user: "".to_owned(),
1071            host: "".to_owned(),
1072            database: "".to_owned(),
1073            time: None,
1074            info: Some(format!(
1075                "Failed to show process list from worker {worker_id} due to: {err_msg}"
1076            )),
1077        }]
1078    }
1079    let futures = worker_node_manager
1080        .list_frontend_nodes()
1081        .into_iter()
1082        .map(|worker| {
1083            let frontend_client_pool_ = frontend_client_pool.clone();
1084            async move {
1085                let client = match frontend_client_pool_.get(&worker).await {
1086                    Ok(client) => client,
1087                    Err(e) => {
1088                        return on_error(worker.id, format!("{}", e.as_report()));
1089                    }
1090                };
1091                let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
1092                    Ok(resp) => resp,
1093                    Err(e) => {
1094                        return on_error(worker.id, format!("{}", e.as_report()));
1095                    }
1096                };
1097                resp.into_inner()
1098                    .running_sqls
1099                    .into_iter()
1100                    .map(|sql| ShowProcessListRow {
1101                        worker_id: format!("{}", worker.id),
1102                        id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
1103                        user: sql.user_name,
1104                        host: sql.peer_addr,
1105                        database: sql.database,
1106                        time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
1107                        info: sql
1108                            .sql
1109                            .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
1110                    })
1111                    .collect_vec()
1112            }
1113        })
1114        .collect_vec();
1115    join_all(futures).await.into_iter().flatten().collect()
1116}
1117
#[cfg(test)]
mod tests {
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// A newly created source shows up in `SHOW SOURCES` as `public.t1`.
    #[tokio::test]
    async fn test_show_source() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let create_source = r#"CREATE SOURCE t1 (column1 varchar)
        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
        FORMAT PLAIN ENCODE JSON"#;
        frontend.run_sql(create_source).await.unwrap();

        let mut listed = frontend.query_formatted_result("SHOW SOURCES").await;
        listed.sort();

        let expected = vec!["Row([Some(b\"public.t1\")])".to_owned()];
        assert_eq!(listed, expected);
    }

    /// `show columns` on a protobuf-encoded source returns the first two cells of each
    /// row as the (name, type) pairs pinned in the snapshot below.
    #[tokio::test]
    async fn test_show_column() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();

        let mut pg_response = frontend.run_sql("show columns from t").await.unwrap();

        // Collect the first two cells of every returned row as UTF-8 strings.
        let mut columns = Vec::new();
        #[for_await]
        for batch in pg_response.values_stream() {
            let batch = batch.unwrap();
            for row in batch {
                let column_name = std::str::from_utf8(row.index(0).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                let column_type = std::str::from_utf8(row.index(1).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                columns.push((column_name, column_type));
            }
        }

        expect_test::expect![[r#"
            [
                (
                    "id",
                    "integer",
                ),
                (
                    "country",
                    "struct",
                ),
                (
                    "country.address",
                    "character varying",
                ),
                (
                    "country.city",
                    "struct",
                ),
                (
                    "country.city.address",
                    "character varying",
                ),
                (
                    "country.city.zipcode",
                    "character varying",
                ),
                (
                    "country.zipcode",
                    "character varying",
                ),
                (
                    "zipcode",
                    "bigint",
                ),
                (
                    "rate",
                    "real",
                ),
                (
                    "_rw_kafka_timestamp",
                    "timestamp with time zone",
                ),
                (
                    "_rw_kafka_partition",
                    "character varying",
                ),
                (
                    "_rw_kafka_offset",
                    "character varying",
                ),
                (
                    "_row_id",
                    "serial",
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }
}