risingwave_frontend/handler/
show.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use pgwire::pg_server::Session;
23use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::GetRunningSqlsRequest;
33use risingwave_rpc_client::FrontendClientPoolRef;
34use risingwave_sqlparser::ast::{
35    Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
36};
37use thiserror_ext::AsReport;
38
39use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
40use crate::binder::{Binder, Relation};
41use crate::catalog::catalog_service::CatalogReadGuard;
42use crate::catalog::root_catalog::SchemaPath;
43use crate::catalog::schema_catalog::SchemaCatalog;
44use crate::catalog::{CatalogError, IndexCatalog};
45use crate::error::{Result, RwError};
46use crate::handler::HandlerArgs;
47use crate::handler::create_connection::print_connection_params;
48use crate::session::cursor_manager::SubscriptionCursor;
49use crate::session::{SessionImpl, WorkerProcessId};
50use crate::user::has_access_to_object;
51use crate::user::user_catalog::UserCatalog;
52
53pub fn get_columns_from_table(
54    session: &SessionImpl,
55    table_name: ObjectName,
56) -> Result<Vec<ColumnCatalog>> {
57    let mut binder = Binder::new_for_system(session);
58    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
59    let column_catalogs = match relation {
60        Relation::Source(s) => s.catalog.columns,
61        Relation::BaseTable(t) => t.table_catalog.columns.clone(),
62        Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
63        _ => {
64            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
65        }
66    };
67
68    Ok(column_catalogs)
69}
70
71pub fn get_columns_from_sink(
72    session: &SessionImpl,
73    sink_name: ObjectName,
74) -> Result<Vec<ColumnCatalog>> {
75    let binder = Binder::new_for_system(session);
76    let sink = binder.bind_sink_by_name(sink_name)?;
77    Ok(sink.sink_catalog.full_columns().to_vec())
78}
79
80pub fn get_columns_from_view(
81    session: &SessionImpl,
82    view_name: ObjectName,
83) -> Result<Vec<ColumnCatalog>> {
84    let binder = Binder::new_for_system(session);
85    let view = binder.bind_view_by_name(view_name)?;
86
87    Ok(view
88        .view_catalog
89        .columns
90        .iter()
91        .enumerate()
92        .map(|(idx, field)| ColumnCatalog {
93            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
94            is_hidden: false,
95        })
96        .collect())
97}
98
99pub fn get_indexes_from_table(
100    session: &SessionImpl,
101    table_name: ObjectName,
102) -> Result<Vec<Arc<IndexCatalog>>> {
103    let mut binder = Binder::new_for_system(session);
104    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
105    let indexes = match relation {
106        Relation::BaseTable(t) => t.table_indexes,
107        _ => {
108            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
109        }
110    };
111
112    Ok(indexes)
113}
114
115fn schema_or_search_path(
116    session: &Arc<SessionImpl>,
117    schema: &Option<Ident>,
118    search_path: &SearchPath,
119) -> Vec<String> {
120    if let Some(s) = schema {
121        vec![s.real_value()]
122    } else {
123        search_path
124            .real_path()
125            .iter()
126            .map(|s| {
127                if s.eq(USER_NAME_WILD_CARD) {
128                    session.user_name()
129                } else {
130                    s.clone()
131                }
132            })
133            .collect()
134    }
135}
136
137fn iter_schema_items<F, T>(
138    session: &Arc<SessionImpl>,
139    schema: &Option<Ident>,
140    reader: &CatalogReadGuard,
141    current_user: &UserCatalog,
142    mut f: F,
143) -> Vec<T>
144where
145    F: FnMut(&SchemaCatalog) -> Vec<T>,
146{
147    let search_path = session.config().search_path();
148
149    schema_or_search_path(session, schema, &search_path)
150        .into_iter()
151        .filter_map(|schema| {
152            if let Ok(schema_catalog) =
153                reader.get_schema_by_name(&session.database(), schema.as_ref())
154                && (current_user.is_super
155                    || current_user.has_schema_usage_privilege(schema_catalog.id()))
156            {
157                Some(schema_catalog)
158            } else {
159                None
160            }
161        })
162        .flat_map(|s| f(s).into_iter())
163        .collect()
164}
165
166/// Wrapper for `ObjectName` to be used as a field in a row.
167// TODO: replace remaining `name: String` with `name: ObjectNameField`
168struct ObjectNameField(ObjectName);
169
170fn with_schema_name(schema_name: &str, name: &str) -> ObjectNameField {
171    if schema_name.is_empty() {
172        ObjectNameField(ObjectName(vec![name.into()]))
173    } else {
174        ObjectNameField(ObjectName(vec![schema_name.into(), name.into()]))
175    }
176}
177
178impl WithDataType for ObjectNameField {
179    fn default_data_type() -> DataType {
180        DataType::Varchar
181    }
182}
183
184impl ToOwnedDatum for ObjectNameField {
185    fn to_owned_datum(self) -> Datum {
186        Some(self.0.to_string().into())
187    }
188}
189
190#[derive(Fields)]
191#[fields(style = "Title Case")]
192struct ShowObjectRow {
193    name: ObjectNameField,
194}
195
196impl ShowObjectRow {
197    fn base_name(&self) -> String {
198        self.name.0.base_name()
199    }
200}
201
202#[derive(Fields)]
203#[fields(style = "Title Case")]
204struct ShowDatabaseRow {
205    name: String,
206}
207
208#[derive(Fields)]
209#[fields(style = "Title Case")]
210struct ShowSchemaRow {
211    name: String,
212}
213
214#[derive(Fields)]
215#[fields(style = "Title Case")]
216pub struct ShowColumnRow {
217    pub name: ShowColumnName,
218    pub r#type: String,
219    pub is_hidden: Option<String>, // XXX: why not bool?
220    pub description: Option<String>,
221}
222
223#[derive(Clone, Debug)]
224enum ShowColumnNameSegment {
225    Field(Ident),
226    ListElement,
227}
228
229impl ShowColumnNameSegment {
230    pub fn field(name: &str) -> Self {
231        ShowColumnNameSegment::Field(Ident::from_real_value(name))
232    }
233}
234
235/// The name of a column in the output of `SHOW COLUMNS` or `DESCRIBE`.
236#[derive(Clone, Debug)]
237pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
238
239impl ShowColumnName {
240    /// Create a special column name without quoting. Used only for extra information like `primary key`
241    /// in the output of `DESCRIBE`.
242    pub fn special(name: &str) -> Self {
243        ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
244            name,
245        ))])
246    }
247}
248
249impl WithDataType for ShowColumnName {
250    fn default_data_type() -> DataType {
251        DataType::Varchar
252    }
253}
254
255impl ToOwnedDatum for ShowColumnName {
256    fn to_owned_datum(self) -> Datum {
257        use std::fmt::Write;
258
259        let mut s = String::new();
260        for segment in self.0 {
261            match segment {
262                ShowColumnNameSegment::Field(ident) => {
263                    if !s.is_empty() {
264                        // TODO: shall we add parentheses, so that it's valid field access SQL?
265                        s.push('.');
266                    }
267                    write!(s, "{ident}").unwrap();
268                }
269                ShowColumnNameSegment::ListElement => {
270                    s.push_str("[1]");
271                }
272            }
273        }
274        s.to_owned_datum()
275    }
276}
277
278impl ShowColumnRow {
279    /// Create a row with the given information. If the data type is a struct or list,
280    /// flatten the data type to also generate rows for its fields.
281    fn flatten(
282        name: ShowColumnName,
283        data_type: DataType,
284        is_hidden: bool,
285        description: Option<String>,
286    ) -> Vec<Self> {
287        // TODO(struct): use struct's type name once supported.
288        let r#type = match &data_type {
289            DataType::Struct(_) => "struct".to_owned(),
290            DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
291            d => d.to_string(),
292        };
293
294        let mut rows = vec![ShowColumnRow {
295            name: name.clone(),
296            r#type,
297            is_hidden: Some(is_hidden.to_string()),
298            description,
299        }];
300
301        match data_type {
302            DataType::Struct(st) => {
303                rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
304                    let mut name = name.clone();
305                    name.0.push(ShowColumnNameSegment::field(field_name));
306                    Self::flatten(name, field_data_type.clone(), is_hidden, None)
307                }));
308            }
309
310            DataType::List(list) if let DataType::Struct(_) = list.elem() => {
311                let mut name = name.clone();
312                name.0.push(ShowColumnNameSegment::ListElement);
313                rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
314            }
315
316            _ => {}
317        }
318
319        rows
320    }
321
322    pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
323        Self::flatten(
324            ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
325            col.column_desc.data_type,
326            col.is_hidden,
327            col.column_desc.description,
328        )
329    }
330}
331
332#[derive(Fields)]
333#[fields(style = "Title Case")]
334struct ShowConnectionRow {
335    name: ObjectNameField,
336    r#type: String,
337    properties: String,
338}
339
340#[derive(Fields)]
341#[fields(style = "Title Case")]
342struct ShowFunctionRow {
343    name: ObjectNameField,
344    arguments: String,
345    return_type: String,
346    language: String,
347    link: Option<String>,
348}
349
350#[derive(Fields)]
351#[fields(style = "Title Case")]
352struct ShowIndexRow {
353    name: String,
354    on: String,
355    key: String,
356    include: String,
357    distributed_by: String,
358}
359
360impl From<Arc<IndexCatalog>> for ShowIndexRow {
361    fn from(index: Arc<IndexCatalog>) -> Self {
362        let index_display = index.display();
363        ShowIndexRow {
364            name: index.name.clone(),
365            on: index.primary_table.name.clone(),
366            key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
367            include: display_comma_separated(&index_display.include_columns).to_string(),
368            distributed_by: display_comma_separated(&index_display.distributed_by_columns)
369                .to_string(),
370        }
371    }
372}
373
374#[derive(Fields)]
375#[fields(style = "Title Case")]
376struct ShowClusterRow {
377    id: i32,
378    addr: String,
379    r#type: String,
380    state: String,
381    parallelism: Option<i32>,
382    is_streaming: Option<bool>,
383    is_serving: Option<bool>,
384    is_unschedulable: Option<bool>,
385    started_at: Option<Timestamptz>,
386}
387
388#[derive(Fields)]
389#[fields(style = "Title Case")]
390struct ShowJobRow {
391    id: i64,
392    statement: String,
393    create_type: String,
394    progress: String,
395}
396
397#[derive(Fields)]
398#[fields(style = "Title Case")]
399struct ShowProcessListRow {
400    worker_id: String,
401    id: String,
402    user: String,
403    host: String,
404    database: String,
405    time: Option<String>,
406    info: Option<String>,
407}
408
409#[derive(Fields)]
410#[fields(style = "Title Case")]
411struct ShowCreateObjectRow {
412    name: String,
413    create_sql: String,
414}
415
416#[derive(Fields)]
417#[fields(style = "Title Case")]
418struct ShowSubscriptionRow {
419    name: ObjectNameField,
420    retention_seconds: i64,
421}
422
423#[derive(Fields)]
424#[fields(style = "Title Case")]
425struct ShowCursorRow {
426    session_id: String,
427    user: String,
428    host: String,
429    database: String,
430    cursor_name: String,
431}
432
433#[derive(Fields)]
434#[fields(style = "Title Case")]
435struct ShowSubscriptionCursorRow {
436    session_id: String,
437    user: String,
438    host: String,
439    database: String,
440    cursor_name: String,
441    subscription_name: String,
442    state: String,
443    idle_duration_ms: i64,
444}
445
446/// Infer the row description for different show objects.
447pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
448    fields_to_descriptors(match objects {
449        ShowObject::Database => ShowDatabaseRow::fields(),
450        ShowObject::Schema => ShowSchemaRow::fields(),
451        ShowObject::Columns { .. } => ShowColumnRow::fields(),
452        ShowObject::Connection { .. } => ShowConnectionRow::fields(),
453        ShowObject::Function { .. } => ShowFunctionRow::fields(),
454        ShowObject::Indexes { .. } => ShowIndexRow::fields(),
455        ShowObject::Cluster => ShowClusterRow::fields(),
456        ShowObject::Jobs => ShowJobRow::fields(),
457        ShowObject::ProcessList => ShowProcessListRow::fields(),
458        ShowObject::Cursor => ShowCursorRow::fields(),
459        ShowObject::SubscriptionCursor => ShowSubscriptionCursorRow::fields(),
460        ShowObject::Subscription { .. } => ShowSubscriptionRow::fields(),
461
462        ShowObject::Table { .. }
463        | ShowObject::InternalTable { .. }
464        | ShowObject::View { .. }
465        | ShowObject::MaterializedView { .. }
466        | ShowObject::Source { .. }
467        | ShowObject::Sink { .. }
468        | ShowObject::Secret { .. } => ShowObjectRow::fields(),
469    })
470}
471
472pub async fn handle_show_object(
473    handler_args: HandlerArgs,
474    command: ShowObject,
475    filter: Option<ShowStatementFilter>,
476) -> Result<RwPgResponse> {
477    let session = handler_args.session;
478
479    if let Some(ShowStatementFilter::Where(..)) = filter {
480        bail_not_implemented!("WHERE clause in SHOW statement");
481    }
482
483    let catalog_reader = session.env().catalog_reader();
484    let user_reader = session.env().user_info_reader();
485    let get_catalog_reader = || {
486        let reader = catalog_reader.read_guard();
487        let user_reader = user_reader.read_guard();
488        let current_user = user_reader
489            .get_user_by_name(&session.user_name())
490            .expect("user not found")
491            .clone();
492        (reader, current_user)
493    };
494
495    let rows: Vec<ShowObjectRow> = match command {
496        ShowObject::Table { schema } => {
497            let (reader, current_user) = get_catalog_reader();
498            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
499                schema
500                    .iter_user_table()
501                    .map(|t| with_schema_name(&schema.name, &t.name))
502                    .map(|name| ShowObjectRow { name })
503                    .collect()
504            })
505        }
506        ShowObject::InternalTable { schema } => {
507            let (reader, current_user) = get_catalog_reader();
508            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
509                schema
510                    .iter_internal_table()
511                    .map(|t| with_schema_name(&schema.name, &t.name))
512                    .map(|name| ShowObjectRow { name })
513                    .collect()
514            })
515        }
516        ShowObject::Database => {
517            let reader = catalog_reader.read_guard();
518            let rows = reader
519                .get_all_database_names()
520                .into_iter()
521                .map(|name| ShowDatabaseRow { name });
522            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
523                .rows(rows)
524                .into());
525        }
526        ShowObject::Schema => {
527            let reader = catalog_reader.read_guard();
528            let rows = reader
529                .get_all_schema_names(&session.database())?
530                .into_iter()
531                .map(|name| ShowSchemaRow { name });
532            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
533                .rows(rows)
534                .into());
535        }
536        ShowObject::View { schema } => {
537            let (reader, current_user) = get_catalog_reader();
538            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
539                schema
540                    .iter_view()
541                    .map(|t| with_schema_name(&schema.name, &t.name))
542                    .map(|name| ShowObjectRow { name })
543                    .collect()
544            })
545        }
546        ShowObject::MaterializedView { schema } => {
547            let (reader, current_user) = get_catalog_reader();
548            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
549                schema
550                    .iter_created_mvs()
551                    .map(|t| with_schema_name(&schema.name, &t.name))
552                    .map(|name| ShowObjectRow { name })
553                    .collect()
554            })
555        }
556        ShowObject::Source { schema } => {
557            let (reader, current_user) = get_catalog_reader();
558            let mut sources =
559                iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
560                    schema
561                        .iter_source()
562                        .map(|t| with_schema_name(&schema.name, &t.name))
563                        .map(|name| ShowObjectRow { name })
564                        .collect()
565                });
566            sources.extend(
567                session
568                    .temporary_source_manager()
569                    .keys()
570                    .into_iter()
571                    .map(|t| with_schema_name("", &t))
572                    .map(|name| ShowObjectRow { name }),
573            );
574            sources
575        }
576        ShowObject::Sink { schema } => {
577            let (reader, current_user) = get_catalog_reader();
578            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
579                schema
580                    .iter_sink()
581                    .map(|t| with_schema_name(&schema.name, &t.name))
582                    .map(|name| ShowObjectRow { name })
583                    .collect()
584            })
585        }
586        ShowObject::Subscription { schema } => {
587            let (reader, current_user) = get_catalog_reader();
588            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
589                schema
590                    .iter_subscription()
591                    .map(|t| ShowSubscriptionRow {
592                        name: with_schema_name(&schema.name, &t.name),
593                        retention_seconds: t.retention_seconds as i64,
594                    })
595                    .collect()
596            });
597            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
598                .rows(rows)
599                .into());
600        }
601        ShowObject::Secret { schema } => {
602            let (reader, current_user) = get_catalog_reader();
603            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
604                schema
605                    .iter_secret()
606                    .map(|t| with_schema_name(&schema.name, &t.name))
607                    .map(|name| ShowObjectRow { name })
608                    .collect()
609            })
610        }
611        ShowObject::Columns { table } => {
612            let Ok(columns) = get_columns_from_table(&session, table.clone())
613                .or(get_columns_from_sink(&session, table.clone()))
614                .or(get_columns_from_view(&session, table.clone()))
615            else {
616                return Err(CatalogError::NotFound(
617                    "table, source, sink or view",
618                    table.to_string(),
619                )
620                .into());
621            };
622
623            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
624                .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
625                .into());
626        }
627        ShowObject::Indexes { table } => {
628            let indexes = get_indexes_from_table(&session, table)?;
629
630            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
631                .rows(indexes.into_iter().map(ShowIndexRow::from))
632                .into());
633        }
634        ShowObject::Connection { schema } => {
635            let (reader, current_user) = get_catalog_reader();
636            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
637                schema.iter_connections()
638                .map(|c| {
639                    let name = c.name.clone();
640                    let r#type = match &c.info {
641                        connection::Info::PrivateLinkService(_) => {
642                            PRIVATELINK_CONNECTION.to_owned()
643                        },
644                        connection::Info::ConnectionParams(params) => {
645                            params.get_connection_type().unwrap().as_str_name().to_owned()
646                        }
647                    };
648                    let source_names = schema
649                        .get_source_ids_by_connection(c.id)
650                        .unwrap_or_default()
651                        .into_iter()
652                        .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
653                        .collect_vec();
654                    let sink_names = schema
655                        .get_sink_ids_by_connection(c.id)
656                        .unwrap_or_default()
657                        .into_iter()
658                        .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
659                        .collect_vec();
660                    let properties = match &c.info {
661                        connection::Info::PrivateLinkService(i) => {
662                            format!(
663                                "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
664                                i.get_provider().unwrap().as_str_name(),
665                                i.service_name,
666                                i.endpoint_id,
667                                serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
668                                serde_json::to_string(&source_names).unwrap(),
669                                serde_json::to_string(&sink_names).unwrap(),
670                            )
671                        }
672                        connection::Info::ConnectionParams(params) => {
673                            // todo: show dep relations
674                            print_connection_params(&session.database(), params, &reader)
675                        }
676                    };
677                    ShowConnectionRow {
678                        name: with_schema_name(&schema.name, &name),
679                        r#type,
680                        properties,
681                    }
682                }).collect_vec()
683            });
684            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
685                .rows(rows)
686                .into());
687        }
688        ShowObject::Function { schema } => {
689            let (reader, current_user) = get_catalog_reader();
690            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
691                schema
692                    .iter_function()
693                    .map(|t| ShowFunctionRow {
694                        name: with_schema_name(&schema.name, &t.name),
695                        arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
696                        return_type: t.return_type.to_string(),
697                        language: t.language.clone(),
698                        link: t.link.clone(),
699                    })
700                    .collect()
701            });
702            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
703                .rows(rows)
704                .into());
705        }
706        ShowObject::Cluster => {
707            let workers = session.env().meta_client().list_all_nodes().await?;
708            let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
709                let addr: HostAddr = worker.host.as_ref().unwrap().into();
710                let property = worker.property.as_ref();
711                ShowClusterRow {
712                    id: worker.id as _,
713                    addr: addr.to_string(),
714                    r#type: worker.get_type().unwrap().as_str_name().into(),
715                    state: worker.get_state().unwrap().as_str_name().to_owned(),
716                    parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
717                    is_streaming: property.map(|p| p.is_streaming),
718                    is_serving: property.map(|p| p.is_serving),
719                    is_unschedulable: property.map(|p| p.is_unschedulable),
720                    started_at: worker
721                        .started_at
722                        .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
723                }
724            });
725            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
726                .rows(rows)
727                .into());
728        }
729        ShowObject::Jobs => {
730            let resp = session.env().meta_client().get_ddl_progress().await?;
731            let rows = resp.into_iter().map(|job| ShowJobRow {
732                id: job.id as i64,
733                statement: job.statement,
734                create_type: job.create_type,
735                progress: job.progress,
736            });
737            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
738                .rows(rows)
739                .into());
740        }
741        ShowObject::ProcessList => {
742            let rows = show_process_list_impl(
743                session.env().frontend_client_pool(),
744                session.env().worker_node_manager_ref(),
745            )
746            .await;
747            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
748                .rows(rows)
749                .into());
750        }
751        ShowObject::Cursor => {
752            let sessions = session
753                .env()
754                .sessions_map()
755                .read()
756                .values()
757                .cloned()
758                .collect_vec();
759            let mut rows = vec![];
760            for s in sessions {
761                let session_id = format!("{}", s.id().0);
762                let user = s.user_name();
763                let host = format!("{}", s.peer_addr());
764                let database = s.database();
765
766                s.get_cursor_manager()
767                    .iter_query_cursors(|cursor_name: &String, _| {
768                        rows.push(ShowCursorRow {
769                            session_id: session_id.clone(),
770                            user: user.clone(),
771                            host: host.clone(),
772                            database: database.clone(),
773                            cursor_name: cursor_name.to_owned(),
774                        });
775                    })
776                    .await;
777            }
778            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
779                .rows(rows)
780                .into());
781        }
782        ShowObject::SubscriptionCursor => {
783            let sessions = session
784                .env()
785                .sessions_map()
786                .read()
787                .values()
788                .cloned()
789                .collect_vec();
790            let mut rows = vec![];
791            for s in sessions {
792                let session_id = format!("{}", s.id().0);
793                let user = s.user_name();
794                let host = format!("{}", s.peer_addr());
795                let database = s.database().clone();
796
797                s.get_cursor_manager()
798                    .iter_subscription_cursors(
799                        |cursor_name: &String, cursor: &SubscriptionCursor| {
800                            rows.push(ShowSubscriptionCursorRow {
801                                session_id: session_id.clone(),
802                                user: user.clone(),
803                                host: host.clone(),
804                                database: database.clone(),
805                                cursor_name: cursor_name.to_owned(),
806                                subscription_name: cursor.subscription_name().to_owned(),
807                                state: cursor.state_info_string(),
808                                idle_duration_ms: cursor.idle_duration().as_millis() as i64,
809                            });
810                        },
811                    )
812                    .await;
813            }
814
815            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
816                .rows(rows)
817                .into());
818        }
819    };
820
821    // Apply filters.
822    let rows = rows.into_iter().filter(|row| match &filter {
823        Some(ShowStatementFilter::Like(pattern)) => like_default(&row.base_name(), pattern),
824        Some(ShowStatementFilter::ILike(pattern)) => i_like_default(&row.base_name(), pattern),
825        Some(ShowStatementFilter::Where(..)) => unreachable!(),
826        None => true,
827    });
828
829    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
830        .rows(rows)
831        .into())
832}
833
834pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
835    fields_to_descriptors(ShowCreateObjectRow::fields())
836}
837
838pub fn handle_show_create_object(
839    handle_args: HandlerArgs,
840    show_create_type: ShowCreateType,
841    name: ObjectName,
842) -> Result<RwPgResponse> {
843    let session = handle_args.session;
844    let catalog_reader = session.env().catalog_reader().read_guard();
845    let database = session.database();
846    let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
847    let search_path = session.config().search_path();
848    let user_name = &session.user_name();
849    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
850    let user_reader = session.env().user_info_reader().read_guard();
851    let current_user = user_reader
852        .get_user_by_name(user_name)
853        .expect("user not found");
854
855    let (sql, schema_name) = match show_create_type {
856        ShowCreateType::MaterializedView => {
857            let (mv, schema) = schema_path
858                .try_find(|schema_name| {
859                    Ok::<_, RwError>(
860                        catalog_reader
861                            .get_schema_by_name(&database, schema_name)?
862                            .get_created_table_by_name(&object_name)
863                            .filter(|t| {
864                                t.is_mview()
865                                    && has_access_to_object(current_user, t.id.table_id, t.owner)
866                            }),
867                    )
868                })?
869                .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
870            (mv.create_sql(), schema)
871        }
872        ShowCreateType::View => {
873            let (view, schema) =
874                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
875            if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
876                return Err(CatalogError::NotFound("view", name.to_string()).into());
877            }
878            (view.create_sql(schema.to_owned()), schema)
879        }
880        ShowCreateType::Table => {
881            let (table, schema) = schema_path
882                .try_find(|schema_name| {
883                    Ok::<_, RwError>(
884                        catalog_reader
885                            .get_schema_by_name(&database, schema_name)?
886                            .get_created_table_by_name(&object_name)
887                            .filter(|t| {
888                                t.is_user_table()
889                                    && has_access_to_object(current_user, t.id.table_id, t.owner)
890                            }),
891                    )
892                })?
893                .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
894
895            (table.create_sql_purified(), schema)
896        }
897        ShowCreateType::Sink => {
898            let (sink, schema) =
899                catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
900            if !has_access_to_object(current_user, sink.id.sink_id, sink.owner.user_id) {
901                return Err(CatalogError::NotFound("sink", name.to_string()).into());
902            }
903            (sink.create_sql(), schema)
904        }
905        ShowCreateType::Source => {
906            let (source, schema) = schema_path
907                .try_find(|schema_name| {
908                    Ok::<_, RwError>(
909                        catalog_reader
910                            .get_schema_by_name(&database, schema_name)?
911                            .get_source_by_name(&object_name)
912                            .filter(|s| {
913                                s.associated_table_id.is_none()
914                                    && has_access_to_object(current_user, s.id, s.owner)
915                            }),
916                    )
917                })?
918                .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
919            (source.create_sql_purified(), schema)
920        }
921        ShowCreateType::Index => {
922            let (index, schema) = schema_path
923                .try_find(|schema_name| {
924                    Ok::<_, RwError>(
925                        catalog_reader
926                            .get_schema_by_name(&database, schema_name)?
927                            .get_created_table_by_name(&object_name)
928                            .filter(|t| {
929                                t.is_index()
930                                    && has_access_to_object(current_user, t.id.table_id, t.owner)
931                            }),
932                    )
933                })?
934                .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
935            (index.create_sql(), schema)
936        }
937        ShowCreateType::Function => {
938            bail_not_implemented!("show create on: {}", show_create_type);
939        }
940        ShowCreateType::Subscription => {
941            let (subscription, schema) =
942                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
943            if !has_access_to_object(
944                current_user,
945                subscription.id.subscription_id,
946                subscription.owner.user_id,
947            ) {
948                return Err(CatalogError::NotFound("subscription", name.to_string()).into());
949            }
950            (subscription.create_sql(), schema)
951        }
952    };
953    let name = format!("{}.{}", schema_name, object_name);
954
955    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
956        .rows([ShowCreateObjectRow {
957            name,
958            create_sql: sql,
959        }])
960        .into())
961}
962
963async fn show_process_list_impl(
964    frontend_client_pool: FrontendClientPoolRef,
965    worker_node_manager: WorkerNodeManagerRef,
966) -> Vec<ShowProcessListRow> {
967    // Create a placeholder row for the worker in case of any errors while fetching its running SQLs.
968    fn on_error(worker_id: u32, err_msg: String) -> Vec<ShowProcessListRow> {
969        vec![ShowProcessListRow {
970            worker_id: format!("{}", worker_id),
971            id: "".to_owned(),
972            user: "".to_owned(),
973            host: "".to_owned(),
974            database: "".to_owned(),
975            time: None,
976            info: Some(format!(
977                "Failed to show process list from worker {worker_id} due to: {err_msg}"
978            )),
979        }]
980    }
981    let futures = worker_node_manager
982        .list_frontend_nodes()
983        .into_iter()
984        .map(|worker| {
985            let frontend_client_pool_ = frontend_client_pool.clone();
986            async move {
987                let client = match frontend_client_pool_.get(&worker).await {
988                    Ok(client) => client,
989                    Err(e) => {
990                        return on_error(worker.id, format!("{}", e.as_report()));
991                    }
992                };
993                let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
994                    Ok(resp) => resp,
995                    Err(e) => {
996                        return on_error(worker.id, format!("{}", e.as_report()));
997                    }
998                };
999                resp.into_inner()
1000                    .running_sqls
1001                    .into_iter()
1002                    .map(|sql| ShowProcessListRow {
1003                        worker_id: format!("{}", worker.id),
1004                        id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
1005                        user: sql.user_name,
1006                        host: sql.peer_addr,
1007                        database: sql.database,
1008                        time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
1009                        info: sql
1010                            .sql
1011                            .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
1012                    })
1013                    .collect_vec()
1014            }
1015        })
1016        .collect_vec();
1017    join_all(futures).await.into_iter().flatten().collect()
1018}
1019
1020#[cfg(test)]
1021mod tests {
1022    use std::ops::Index;
1023
1024    use futures_async_stream::for_await;
1025
1026    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
1027
1028    #[tokio::test]
1029    async fn test_show_source() {
1030        let frontend = LocalFrontend::new(Default::default()).await;
1031
1032        let sql = r#"CREATE SOURCE t1 (column1 varchar)
1033        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
1034        FORMAT PLAIN ENCODE JSON"#;
1035        frontend.run_sql(sql).await.unwrap();
1036
1037        let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
1038        rows.sort();
1039        assert_eq!(rows, vec!["Row([Some(b\"public.t1\")])".to_owned(),]);
1040    }
1041
1042    #[tokio::test]
1043    async fn test_show_column() {
1044        let proto_file = create_proto_file(PROTO_FILE_DATA);
1045        let sql = format!(
1046            r#"CREATE SOURCE t
1047    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
1048    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
1049            proto_file.path().to_str().unwrap()
1050        );
1051        let frontend = LocalFrontend::new(Default::default()).await;
1052        frontend.run_sql(sql).await.unwrap();
1053
1054        let sql = "show columns from t";
1055        let mut pg_response = frontend.run_sql(sql).await.unwrap();
1056
1057        let mut columns = Vec::new();
1058        #[for_await]
1059        for row_set in pg_response.values_stream() {
1060            let row_set = row_set.unwrap();
1061            for row in row_set {
1062                columns.push((
1063                    std::str::from_utf8(row.index(0).as_ref().unwrap())
1064                        .unwrap()
1065                        .to_owned(),
1066                    std::str::from_utf8(row.index(1).as_ref().unwrap())
1067                        .unwrap()
1068                        .to_owned(),
1069                ));
1070            }
1071        }
1072
1073        expect_test::expect![[r#"
1074            [
1075                (
1076                    "id",
1077                    "integer",
1078                ),
1079                (
1080                    "country",
1081                    "struct",
1082                ),
1083                (
1084                    "country.address",
1085                    "character varying",
1086                ),
1087                (
1088                    "country.city",
1089                    "struct",
1090                ),
1091                (
1092                    "country.city.address",
1093                    "character varying",
1094                ),
1095                (
1096                    "country.city.zipcode",
1097                    "character varying",
1098                ),
1099                (
1100                    "country.zipcode",
1101                    "character varying",
1102                ),
1103                (
1104                    "zipcode",
1105                    "bigint",
1106                ),
1107                (
1108                    "rate",
1109                    "real",
1110                ),
1111                (
1112                    "_rw_kafka_timestamp",
1113                    "timestamp with time zone",
1114                ),
1115                (
1116                    "_rw_kafka_partition",
1117                    "character varying",
1118                ),
1119                (
1120                    "_rw_kafka_offset",
1121                    "character varying",
1122                ),
1123                (
1124                    "_row_id",
1125                    "serial",
1126                ),
1127            ]
1128        "#]]
1129        .assert_debug_eq(&columns);
1130    }
1131}