risingwave_frontend/handler/
show.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use pgwire::pg_server::Session;
23use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::GetRunningSqlsRequest;
33use risingwave_pb::id::WorkerId;
34use risingwave_rpc_client::FrontendClientPoolRef;
35use risingwave_sqlparser::ast::{
36    Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
37};
38use thiserror_ext::AsReport;
39
40use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
41use crate::binder::{Binder, Relation};
42use crate::catalog::catalog_service::CatalogReadGuard;
43use crate::catalog::root_catalog::SchemaPath;
44use crate::catalog::schema_catalog::SchemaCatalog;
45use crate::catalog::{CatalogError, IndexCatalog};
46use crate::error::{Result, RwError};
47use crate::handler::HandlerArgs;
48use crate::handler::create_connection::print_connection_params;
49use crate::session::cursor_manager::SubscriptionCursor;
50use crate::session::{SessionImpl, WorkerProcessId};
51use crate::user::has_access_to_object;
52use crate::user::user_catalog::UserCatalog;
53
54pub fn get_columns_from_table(
55    session: &SessionImpl,
56    table_name: ObjectName,
57) -> Result<Vec<ColumnCatalog>> {
58    let mut binder = Binder::new_for_system(session);
59    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
60    let column_catalogs = match relation {
61        Relation::Source(s) => s.catalog.columns,
62        Relation::BaseTable(t) => t.table_catalog.columns.clone(),
63        Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
64        _ => {
65            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
66        }
67    };
68
69    Ok(column_catalogs)
70}
71
72pub fn get_columns_from_sink(
73    session: &SessionImpl,
74    sink_name: ObjectName,
75) -> Result<Vec<ColumnCatalog>> {
76    let binder = Binder::new_for_system(session);
77    let sink = binder.bind_sink_by_name(sink_name)?;
78    Ok(sink.sink_catalog.full_columns().to_vec())
79}
80
81pub fn get_columns_from_view(
82    session: &SessionImpl,
83    view_name: ObjectName,
84) -> Result<Vec<ColumnCatalog>> {
85    let binder = Binder::new_for_system(session);
86    let view = binder.bind_view_by_name(view_name)?;
87
88    Ok(view
89        .view_catalog
90        .columns
91        .iter()
92        .enumerate()
93        .map(|(idx, field)| ColumnCatalog {
94            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
95            is_hidden: false,
96        })
97        .collect())
98}
99
100pub fn get_indexes_from_table(
101    session: &SessionImpl,
102    table_name: ObjectName,
103) -> Result<Vec<Arc<IndexCatalog>>> {
104    let mut binder = Binder::new_for_system(session);
105    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
106    let indexes = match relation {
107        Relation::BaseTable(t) => t.table_indexes,
108        _ => {
109            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
110        }
111    };
112
113    Ok(indexes)
114}
115
116fn schema_or_search_path(
117    session: &Arc<SessionImpl>,
118    schema: &Option<Ident>,
119    search_path: &SearchPath,
120) -> Vec<String> {
121    if let Some(s) = schema {
122        vec![s.real_value()]
123    } else {
124        search_path
125            .real_path()
126            .iter()
127            .map(|s| {
128                if s.eq(USER_NAME_WILD_CARD) {
129                    session.user_name()
130                } else {
131                    s.clone()
132                }
133            })
134            .collect()
135    }
136}
137
138fn iter_schema_items<F, T>(
139    session: &Arc<SessionImpl>,
140    schema: &Option<Ident>,
141    reader: &CatalogReadGuard,
142    current_user: &UserCatalog,
143    mut f: F,
144) -> Vec<T>
145where
146    F: FnMut(&SchemaCatalog) -> Vec<T>,
147{
148    let search_path = session.config().search_path();
149
150    schema_or_search_path(session, schema, &search_path)
151        .into_iter()
152        .filter_map(|schema| {
153            if let Ok(schema_catalog) =
154                reader.get_schema_by_name(&session.database(), schema.as_ref())
155                && (current_user.is_super
156                    || current_user.has_schema_usage_privilege(schema_catalog.id()))
157            {
158                Some(schema_catalog)
159            } else {
160                None
161            }
162        })
163        .flat_map(|s| f(s).into_iter())
164        .collect()
165}
166
/// Wrapper for `ObjectName` to be used as a field in a row.
///
/// The trait impls in this file report it as a `Varchar` column and turn it into a datum via
/// the wrapped name's `to_string()`.
// TODO: replace remaining `name: String` with `name: ObjectNameField`
struct ObjectNameField(ObjectName);
170
171fn with_schema_name(schema_name: &str, name: &str) -> ObjectNameField {
172    if schema_name.is_empty() {
173        ObjectNameField(ObjectName(vec![name.into()]))
174    } else {
175        ObjectNameField(ObjectName(vec![schema_name.into(), name.into()]))
176    }
177}
178
impl WithDataType for ObjectNameField {
    /// Object names are exposed as `Varchar` columns.
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
184
185impl ToOwnedDatum for ObjectNameField {
186    fn to_owned_datum(self) -> Datum {
187        Some(self.0.to_string().into())
188    }
189}
190
/// Single-column row used for the name-only listings: tables, internal tables, views,
/// materialized views, sources, sinks and secrets.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowObjectRow {
    // Object name, schema-qualified when built via `with_schema_name` with a non-empty schema.
    name: ObjectNameField,
}
196
197impl ShowObjectRow {
198    fn base_name(&self) -> String {
199        self.name.0.base_name()
200    }
201}
202
/// Row produced for `ShowObject::Database`: one database name per row.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowDatabaseRow {
    name: String,
}
208
/// Row produced for `ShowObject::Schema`: one schema name per row.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSchemaRow {
    name: String,
}
214
/// Row produced for `ShowObject::Columns`, describing one column or one nested field of a
/// struct / list-of-struct column (see `ShowColumnRow::flatten`).
#[derive(Fields)]
#[fields(style = "Title Case")]
pub struct ShowColumnRow {
    // Possibly nested column name, e.g. a field path inside a struct column.
    pub name: ShowColumnName,
    // Display form of the data type; struct types are shown as `struct` / `struct[]`.
    pub r#type: String,
    // Stringified boolean ("true" / "false") when built by `flatten`.
    pub is_hidden: Option<String>, // XXX: why not bool?
    // Column description taken from the catalog; `None` for flattened nested fields.
    pub description: Option<String>,
}
223
/// One segment of a (possibly nested) column name shown by `SHOW COLUMNS` / `DESCRIBE`.
#[derive(Clone, Debug)]
enum ShowColumnNameSegment {
    /// A named column or struct field; segments are joined with `.` when rendered.
    Field(Ident),
    /// The element of a list; rendered as the literal `[1]`.
    ListElement,
}
229
230impl ShowColumnNameSegment {
231    pub fn field(name: &str) -> Self {
232        ShowColumnNameSegment::Field(Ident::from_real_value(name))
233    }
234}
235
/// The name of a column in the output of `SHOW COLUMNS` or `DESCRIBE`.
///
/// It is a path of segments: nested struct fields and list elements append further segments
/// to the top-level column's name.
#[derive(Clone, Debug)]
pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
239
240impl ShowColumnName {
241    /// Create a special column name without quoting. Used only for extra information like `primary key`
242    /// in the output of `DESCRIBE`.
243    pub fn special(name: &str) -> Self {
244        ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
245            name,
246        ))])
247    }
248}
249
impl WithDataType for ShowColumnName {
    /// Column names are exposed as `Varchar` columns.
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
255
256impl ToOwnedDatum for ShowColumnName {
257    fn to_owned_datum(self) -> Datum {
258        use std::fmt::Write;
259
260        let mut s = String::new();
261        for segment in self.0 {
262            match segment {
263                ShowColumnNameSegment::Field(ident) => {
264                    if !s.is_empty() {
265                        // TODO: shall we add parentheses, so that it's valid field access SQL?
266                        s.push('.');
267                    }
268                    write!(s, "{ident}").unwrap();
269                }
270                ShowColumnNameSegment::ListElement => {
271                    s.push_str("[1]");
272                }
273            }
274        }
275        s.to_owned_datum()
276    }
277}
278
279impl ShowColumnRow {
280    /// Create a row with the given information. If the data type is a struct or list,
281    /// flatten the data type to also generate rows for its fields.
282    fn flatten(
283        name: ShowColumnName,
284        data_type: DataType,
285        is_hidden: bool,
286        description: Option<String>,
287    ) -> Vec<Self> {
288        // TODO(struct): use struct's type name once supported.
289        let r#type = match &data_type {
290            DataType::Struct(_) => "struct".to_owned(),
291            DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
292            d => d.to_string(),
293        };
294
295        let mut rows = vec![ShowColumnRow {
296            name: name.clone(),
297            r#type,
298            is_hidden: Some(is_hidden.to_string()),
299            description,
300        }];
301
302        match data_type {
303            DataType::Struct(st) => {
304                rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
305                    let mut name = name.clone();
306                    name.0.push(ShowColumnNameSegment::field(field_name));
307                    Self::flatten(name, field_data_type.clone(), is_hidden, None)
308                }));
309            }
310
311            DataType::List(list) if let DataType::Struct(_) = list.elem() => {
312                let mut name = name.clone();
313                name.0.push(ShowColumnNameSegment::ListElement);
314                rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
315            }
316
317            _ => {}
318        }
319
320        rows
321    }
322
323    pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
324        Self::flatten(
325            ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
326            col.column_desc.data_type,
327            col.is_hidden,
328            col.column_desc.description,
329        )
330    }
331}
332
/// Row produced for `ShowObject::Connection`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowConnectionRow {
    // Schema-qualified connection name.
    name: ObjectNameField,
    // Connection kind: the private link constant or the connection params' type name.
    r#type: String,
    // Human-readable dump of the connection's properties.
    properties: String,
}
340
/// Row produced for `ShowObject::Function`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowFunctionRow {
    // Schema-qualified function name.
    name: ObjectNameField,
    // Argument types joined with ", ".
    arguments: String,
    return_type: String,
    language: String,
    link: Option<String>,
}
350
/// Row produced for `ShowObject::Indexes`, built from an `IndexCatalog`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowIndexRow {
    // Index name.
    name: String,
    // Name of the index's primary table.
    on: String,
    // Comma-separated index columns with their ordering.
    key: String,
    // Comma-separated include columns.
    include: String,
    // Comma-separated distribution columns.
    distributed_by: String,
}
360
361impl From<Arc<IndexCatalog>> for ShowIndexRow {
362    fn from(index: Arc<IndexCatalog>) -> Self {
363        let index_display = index.display();
364        ShowIndexRow {
365            name: index.name.clone(),
366            on: index.primary_table.name.clone(),
367            key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
368            include: display_comma_separated(&index_display.include_columns).to_string(),
369            distributed_by: display_comma_separated(&index_display.distributed_by_columns)
370                .to_string(),
371        }
372    }
373}
374
/// Row produced for `ShowObject::Cluster`: one row per worker node reported by the meta client.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowClusterRow {
    // Worker id.
    id: i32,
    // Worker host address in display form.
    addr: String,
    // Worker type name.
    r#type: String,
    // Worker state name.
    state: String,
    parallelism: Option<i32>,
    // The three flags below are `None` when the worker reports no property.
    is_streaming: Option<bool>,
    is_serving: Option<bool>,
    is_unschedulable: Option<bool>,
    // Start time, converted from the worker's `started_at` seconds value.
    started_at: Option<Timestamptz>,
}
388
/// Row produced for `ShowObject::Jobs`: one row per entry of the meta client's DDL progress.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowJobRow {
    id: i64,
    statement: String,
    create_type: String,
    progress: String,
}
397
/// Row produced for `ShowObject::ProcessList`; the rows are built by `show_process_list_impl`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowProcessListRow {
    worker_id: String,
    id: String,
    user: String,
    host: String,
    database: String,
    time: Option<String>,
    info: Option<String>,
}
409
/// Row type whose fields describe the result of `SHOW CREATE ...`
/// (see `infer_show_create_object`).
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCreateObjectRow {
    name: String,
    create_sql: String,
}
416
/// Row produced for `ShowObject::Subscription`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionRow {
    // Schema-qualified subscription name.
    name: ObjectNameField,
    // The subscription catalog's `retention_seconds`, widened to `i64`.
    retention_seconds: i64,
}
423
/// Row produced for `ShowObject::Cursor`: one row per query cursor of every session in the
/// sessions map.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCursorRow {
    session_id: String,
    user: String,
    // Peer address of the owning session.
    host: String,
    database: String,
    cursor_name: String,
}
433
/// Row produced for `ShowObject::SubscriptionCursor`: one row per subscription cursor of every
/// session in the sessions map.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionCursorRow {
    session_id: String,
    user: String,
    // Peer address of the owning session.
    host: String,
    database: String,
    cursor_name: String,
    subscription_name: String,
    // The cursor's `state_info_string()`.
    state: String,
    // The cursor's idle duration in milliseconds.
    idle_duration_ms: i64,
}
446
447/// Infer the row description for different show objects.
448pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
449    fields_to_descriptors(match objects {
450        ShowObject::Database => ShowDatabaseRow::fields(),
451        ShowObject::Schema => ShowSchemaRow::fields(),
452        ShowObject::Columns { .. } => ShowColumnRow::fields(),
453        ShowObject::Connection { .. } => ShowConnectionRow::fields(),
454        ShowObject::Function { .. } => ShowFunctionRow::fields(),
455        ShowObject::Indexes { .. } => ShowIndexRow::fields(),
456        ShowObject::Cluster => ShowClusterRow::fields(),
457        ShowObject::Jobs => ShowJobRow::fields(),
458        ShowObject::ProcessList => ShowProcessListRow::fields(),
459        ShowObject::Cursor => ShowCursorRow::fields(),
460        ShowObject::SubscriptionCursor => ShowSubscriptionCursorRow::fields(),
461        ShowObject::Subscription { .. } => ShowSubscriptionRow::fields(),
462
463        ShowObject::Table { .. }
464        | ShowObject::InternalTable { .. }
465        | ShowObject::View { .. }
466        | ShowObject::MaterializedView { .. }
467        | ShowObject::Source { .. }
468        | ShowObject::Sink { .. }
469        | ShowObject::Secret { .. } => ShowObjectRow::fields(),
470    })
471}
472
473pub async fn handle_show_object(
474    handler_args: HandlerArgs,
475    command: ShowObject,
476    filter: Option<ShowStatementFilter>,
477) -> Result<RwPgResponse> {
478    let session = handler_args.session;
479
480    if let Some(ShowStatementFilter::Where(..)) = filter {
481        bail_not_implemented!("WHERE clause in SHOW statement");
482    }
483
484    let catalog_reader = session.env().catalog_reader();
485    let user_reader = session.env().user_info_reader();
486    let get_catalog_reader = || {
487        let reader = catalog_reader.read_guard();
488        let user_reader = user_reader.read_guard();
489        let current_user = user_reader
490            .get_user_by_name(&session.user_name())
491            .expect("user not found")
492            .clone();
493        (reader, current_user)
494    };
495
496    let rows: Vec<ShowObjectRow> = match command {
497        ShowObject::Table { schema } => {
498            let (reader, current_user) = get_catalog_reader();
499            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
500                schema
501                    .iter_user_table()
502                    .map(|t| with_schema_name(&schema.name, &t.name))
503                    .map(|name| ShowObjectRow { name })
504                    .collect()
505            })
506        }
507        ShowObject::InternalTable { schema } => {
508            let (reader, current_user) = get_catalog_reader();
509            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
510                schema
511                    .iter_internal_table()
512                    .map(|t| with_schema_name(&schema.name, &t.name))
513                    .map(|name| ShowObjectRow { name })
514                    .collect()
515            })
516        }
517        ShowObject::Database => {
518            let reader = catalog_reader.read_guard();
519            let rows = reader
520                .get_all_database_names()
521                .into_iter()
522                .map(|name| ShowDatabaseRow { name });
523            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
524                .rows(rows)
525                .into());
526        }
527        ShowObject::Schema => {
528            let reader = catalog_reader.read_guard();
529            let rows = reader
530                .get_all_schema_names(&session.database())?
531                .into_iter()
532                .map(|name| ShowSchemaRow { name });
533            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
534                .rows(rows)
535                .into());
536        }
537        ShowObject::View { schema } => {
538            let (reader, current_user) = get_catalog_reader();
539            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
540                schema
541                    .iter_view()
542                    .map(|t| with_schema_name(&schema.name, &t.name))
543                    .map(|name| ShowObjectRow { name })
544                    .collect()
545            })
546        }
547        ShowObject::MaterializedView { schema } => {
548            let (reader, current_user) = get_catalog_reader();
549            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
550                schema
551                    .iter_created_mvs()
552                    .map(|t| with_schema_name(&schema.name, &t.name))
553                    .map(|name| ShowObjectRow { name })
554                    .collect()
555            })
556        }
557        ShowObject::Source { schema } => {
558            let (reader, current_user) = get_catalog_reader();
559            let mut sources =
560                iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
561                    schema
562                        .iter_source()
563                        .map(|t| with_schema_name(&schema.name, &t.name))
564                        .map(|name| ShowObjectRow { name })
565                        .collect()
566                });
567            sources.extend(
568                session
569                    .temporary_source_manager()
570                    .keys()
571                    .into_iter()
572                    .map(|t| with_schema_name("", &t))
573                    .map(|name| ShowObjectRow { name }),
574            );
575            sources
576        }
577        ShowObject::Sink { schema } => {
578            let (reader, current_user) = get_catalog_reader();
579            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
580                schema
581                    .iter_sink()
582                    .map(|t| with_schema_name(&schema.name, &t.name))
583                    .map(|name| ShowObjectRow { name })
584                    .collect()
585            })
586        }
587        ShowObject::Subscription { schema } => {
588            let (reader, current_user) = get_catalog_reader();
589            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
590                schema
591                    .iter_subscription()
592                    .map(|t| ShowSubscriptionRow {
593                        name: with_schema_name(&schema.name, &t.name),
594                        retention_seconds: t.retention_seconds as i64,
595                    })
596                    .collect()
597            });
598            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
599                .rows(rows)
600                .into());
601        }
602        ShowObject::Secret { schema } => {
603            let (reader, current_user) = get_catalog_reader();
604            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
605                schema
606                    .iter_secret()
607                    .map(|t| with_schema_name(&schema.name, &t.name))
608                    .map(|name| ShowObjectRow { name })
609                    .collect()
610            })
611        }
612        ShowObject::Columns { table } => {
613            let Ok(columns) = get_columns_from_table(&session, table.clone())
614                .or(get_columns_from_sink(&session, table.clone()))
615                .or(get_columns_from_view(&session, table.clone()))
616            else {
617                return Err(CatalogError::NotFound(
618                    "table, source, sink or view",
619                    table.to_string(),
620                )
621                .into());
622            };
623
624            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
625                .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
626                .into());
627        }
628        ShowObject::Indexes { table } => {
629            let indexes = get_indexes_from_table(&session, table)?;
630
631            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
632                .rows(indexes.into_iter().map(ShowIndexRow::from))
633                .into());
634        }
635        ShowObject::Connection { schema } => {
636            let (reader, current_user) = get_catalog_reader();
637            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
638                schema.iter_connections()
639                .map(|c| {
640                    let name = c.name.clone();
641                    let r#type = match &c.info {
642                        connection::Info::PrivateLinkService(_) => {
643                            PRIVATELINK_CONNECTION.to_owned()
644                        },
645                        connection::Info::ConnectionParams(params) => {
646                            params.get_connection_type().unwrap().as_str_name().to_owned()
647                        }
648                    };
649                    let source_names = schema
650                        .get_source_ids_by_connection(c.id)
651                        .unwrap_or_default()
652                        .into_iter()
653                        .filter_map(|sid| schema.get_source_by_id(sid).map(|catalog| catalog.name.as_str()))
654                        .collect_vec();
655                    let sink_names = schema
656                        .get_sink_ids_by_connection(c.id)
657                        .unwrap_or_default()
658                        .into_iter()
659                        .filter_map(|sid| schema.get_sink_by_id(sid).map(|catalog| catalog.name.as_str()))
660                        .collect_vec();
661                    let properties = match &c.info {
662                        connection::Info::PrivateLinkService(i) => {
663                            format!(
664                                "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
665                                i.get_provider().unwrap().as_str_name(),
666                                i.service_name,
667                                i.endpoint_id,
668                                serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
669                                serde_json::to_string(&source_names).unwrap(),
670                                serde_json::to_string(&sink_names).unwrap(),
671                            )
672                        }
673                        connection::Info::ConnectionParams(params) => {
674                            // todo: show dep relations
675                            print_connection_params(&session.database(), params, &reader)
676                        }
677                    };
678                    ShowConnectionRow {
679                        name: with_schema_name(&schema.name, &name),
680                        r#type,
681                        properties,
682                    }
683                }).collect_vec()
684            });
685            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
686                .rows(rows)
687                .into());
688        }
689        ShowObject::Function { schema } => {
690            let (reader, current_user) = get_catalog_reader();
691            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
692                schema
693                    .iter_function()
694                    .map(|t| ShowFunctionRow {
695                        name: with_schema_name(&schema.name, &t.name),
696                        arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
697                        return_type: t.return_type.to_string(),
698                        language: t.language.clone(),
699                        link: t.link.clone(),
700                    })
701                    .collect()
702            });
703            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
704                .rows(rows)
705                .into());
706        }
707        ShowObject::Cluster => {
708            let workers = session.env().meta_client().list_all_nodes().await?;
709            let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
710                let addr: HostAddr = worker.host.as_ref().unwrap().into();
711                let property = worker.property.as_ref();
712                ShowClusterRow {
713                    id: worker.id.as_i32_id(),
714                    addr: addr.to_string(),
715                    r#type: worker.get_type().unwrap().as_str_name().into(),
716                    state: worker.get_state().unwrap().as_str_name().to_owned(),
717                    parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
718                    is_streaming: property.map(|p| p.is_streaming),
719                    is_serving: property.map(|p| p.is_serving),
720                    is_unschedulable: property.map(|p| p.is_unschedulable),
721                    started_at: worker
722                        .started_at
723                        .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
724                }
725            });
726            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
727                .rows(rows)
728                .into());
729        }
730        ShowObject::Jobs => {
731            let resp = session.env().meta_client().get_ddl_progress().await?;
732            let rows = resp.into_iter().map(|job| ShowJobRow {
733                id: job.id as i64,
734                statement: job.statement,
735                create_type: job.create_type,
736                progress: job.progress,
737            });
738            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
739                .rows(rows)
740                .into());
741        }
742        ShowObject::ProcessList => {
743            let rows = show_process_list_impl(
744                session.env().frontend_client_pool(),
745                session.env().worker_node_manager_ref(),
746            )
747            .await;
748            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
749                .rows(rows)
750                .into());
751        }
752        ShowObject::Cursor => {
753            let sessions = session
754                .env()
755                .sessions_map()
756                .read()
757                .values()
758                .cloned()
759                .collect_vec();
760            let mut rows = vec![];
761            for s in sessions {
762                let session_id = format!("{}", s.id().0);
763                let user = s.user_name();
764                let host = format!("{}", s.peer_addr());
765                let database = s.database();
766
767                s.get_cursor_manager()
768                    .iter_query_cursors(|cursor_name: &String, _| {
769                        rows.push(ShowCursorRow {
770                            session_id: session_id.clone(),
771                            user: user.clone(),
772                            host: host.clone(),
773                            database: database.clone(),
774                            cursor_name: cursor_name.to_owned(),
775                        });
776                    })
777                    .await;
778            }
779            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
780                .rows(rows)
781                .into());
782        }
783        ShowObject::SubscriptionCursor => {
784            let sessions = session
785                .env()
786                .sessions_map()
787                .read()
788                .values()
789                .cloned()
790                .collect_vec();
791            let mut rows = vec![];
792            for s in sessions {
793                let session_id = format!("{}", s.id().0);
794                let user = s.user_name();
795                let host = format!("{}", s.peer_addr());
796                let database = s.database().clone();
797
798                s.get_cursor_manager()
799                    .iter_subscription_cursors(
800                        |cursor_name: &String, cursor: &SubscriptionCursor| {
801                            rows.push(ShowSubscriptionCursorRow {
802                                session_id: session_id.clone(),
803                                user: user.clone(),
804                                host: host.clone(),
805                                database: database.clone(),
806                                cursor_name: cursor_name.to_owned(),
807                                subscription_name: cursor.subscription_name().to_owned(),
808                                state: cursor.state_info_string(),
809                                idle_duration_ms: cursor.idle_duration().as_millis() as i64,
810                            });
811                        },
812                    )
813                    .await;
814            }
815
816            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
817                .rows(rows)
818                .into());
819        }
820    };
821
822    // Apply filters.
823    let rows = rows.into_iter().filter(|row| match &filter {
824        Some(ShowStatementFilter::Like(pattern)) => like_default(&row.base_name(), pattern),
825        Some(ShowStatementFilter::ILike(pattern)) => i_like_default(&row.base_name(), pattern),
826        Some(ShowStatementFilter::Where(..)) => unreachable!(),
827        None => true,
828    });
829
830    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
831        .rows(rows)
832        .into())
833}
834
835pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
836    fields_to_descriptors(ShowCreateObjectRow::fields())
837}
838
839pub fn handle_show_create_object(
840    handle_args: HandlerArgs,
841    show_create_type: ShowCreateType,
842    name: ObjectName,
843) -> Result<RwPgResponse> {
844    let session = handle_args.session;
845    let catalog_reader = session.env().catalog_reader().read_guard();
846    let database = session.database();
847    let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
848    let search_path = session.config().search_path();
849    let user_name = &session.user_name();
850    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
851    let user_reader = session.env().user_info_reader().read_guard();
852    let current_user = user_reader
853        .get_user_by_name(user_name)
854        .expect("user not found");
855
856    let (sql, schema_name) = match show_create_type {
857        ShowCreateType::MaterializedView => {
858            let (mv, schema) = schema_path
859                .try_find(|schema_name| {
860                    Ok::<_, RwError>(
861                        catalog_reader
862                            .get_schema_by_name(&database, schema_name)?
863                            .get_created_table_by_name(&object_name)
864                            .filter(|t| {
865                                t.is_mview() && has_access_to_object(current_user, t.id, t.owner)
866                            }),
867                    )
868                })?
869                .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
870            (mv.create_sql(), schema)
871        }
872        ShowCreateType::View => {
873            let (view, schema) =
874                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
875            if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
876                return Err(CatalogError::NotFound("view", name.to_string()).into());
877            }
878            (view.create_sql(schema.to_owned()), schema)
879        }
880        ShowCreateType::Table => {
881            let (table, schema) = schema_path
882                .try_find(|schema_name| {
883                    Ok::<_, RwError>(
884                        catalog_reader
885                            .get_schema_by_name(&database, schema_name)?
886                            .get_created_table_by_name(&object_name)
887                            .filter(|t| {
888                                t.is_user_table()
889                                    && has_access_to_object(current_user, t.id, t.owner)
890                            }),
891                    )
892                })?
893                .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
894
895            (table.create_sql_purified(), schema)
896        }
897        ShowCreateType::Sink => {
898            let (sink, schema) =
899                catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
900            if !has_access_to_object(current_user, sink.id, sink.owner.user_id) {
901                return Err(CatalogError::NotFound("sink", name.to_string()).into());
902            }
903            (sink.create_sql(), schema)
904        }
905        ShowCreateType::Source => {
906            let (source, schema) = schema_path
907                .try_find(|schema_name| {
908                    Ok::<_, RwError>(
909                        catalog_reader
910                            .get_schema_by_name(&database, schema_name)?
911                            .get_source_by_name(&object_name)
912                            .filter(|s| {
913                                s.associated_table_id.is_none()
914                                    && has_access_to_object(current_user, s.id, s.owner)
915                            }),
916                    )
917                })?
918                .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
919            (source.create_sql_purified(), schema)
920        }
921        ShowCreateType::Index => {
922            let (index, schema) = schema_path
923                .try_find(|schema_name| {
924                    Ok::<_, RwError>(
925                        catalog_reader
926                            .get_schema_by_name(&database, schema_name)?
927                            .get_created_table_by_name(&object_name)
928                            .filter(|t| {
929                                t.is_index() && has_access_to_object(current_user, t.id, t.owner)
930                            }),
931                    )
932                })?
933                .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
934            (index.create_sql(), schema)
935        }
936        ShowCreateType::Function => {
937            bail_not_implemented!("show create on: {}", show_create_type);
938        }
939        ShowCreateType::Subscription => {
940            let (subscription, schema) =
941                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
942            if !has_access_to_object(current_user, subscription.id, subscription.owner.user_id) {
943                return Err(CatalogError::NotFound("subscription", name.to_string()).into());
944            }
945            (subscription.create_sql(), schema)
946        }
947    };
948    let name = format!("{}.{}", schema_name, object_name);
949
950    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
951        .rows([ShowCreateObjectRow {
952            name,
953            create_sql: sql,
954        }])
955        .into())
956}
957
958async fn show_process_list_impl(
959    frontend_client_pool: FrontendClientPoolRef,
960    worker_node_manager: WorkerNodeManagerRef,
961) -> Vec<ShowProcessListRow> {
962    // Create a placeholder row for the worker in case of any errors while fetching its running SQLs.
963    fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowProcessListRow> {
964        vec![ShowProcessListRow {
965            worker_id: format!("{}", worker_id),
966            id: "".to_owned(),
967            user: "".to_owned(),
968            host: "".to_owned(),
969            database: "".to_owned(),
970            time: None,
971            info: Some(format!(
972                "Failed to show process list from worker {worker_id} due to: {err_msg}"
973            )),
974        }]
975    }
976    let futures = worker_node_manager
977        .list_frontend_nodes()
978        .into_iter()
979        .map(|worker| {
980            let frontend_client_pool_ = frontend_client_pool.clone();
981            async move {
982                let client = match frontend_client_pool_.get(&worker).await {
983                    Ok(client) => client,
984                    Err(e) => {
985                        return on_error(worker.id, format!("{}", e.as_report()));
986                    }
987                };
988                let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
989                    Ok(resp) => resp,
990                    Err(e) => {
991                        return on_error(worker.id, format!("{}", e.as_report()));
992                    }
993                };
994                resp.into_inner()
995                    .running_sqls
996                    .into_iter()
997                    .map(|sql| ShowProcessListRow {
998                        worker_id: format!("{}", worker.id),
999                        id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
1000                        user: sql.user_name,
1001                        host: sql.peer_addr,
1002                        database: sql.database,
1003                        time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
1004                        info: sql
1005                            .sql
1006                            .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
1007                    })
1008                    .collect_vec()
1009            }
1010        })
1011        .collect_vec();
1012    join_all(futures).await.into_iter().flatten().collect()
1013}
1014
#[cfg(test)]
mod tests {
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creating a Kafka source makes it show up in `SHOW SOURCES` as `public.t1`.
    #[tokio::test]
    async fn test_show_source() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let create_source = r#"CREATE SOURCE t1 (column1 varchar)
        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
        FORMAT PLAIN ENCODE JSON"#;
        frontend.run_sql(create_source).await.unwrap();

        let mut listed = frontend.query_formatted_result("SHOW SOURCES").await;
        listed.sort();

        let expected = vec!["Row([Some(b\"public.t1\")])".to_owned()];
        assert_eq!(listed, expected);
    }

    /// `show columns` on a protobuf-encoded source lists its columns (nested struct fields
    /// included) and their type names, as pinned by the snapshot below.
    #[tokio::test]
    async fn test_show_column() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();

        let mut pg_response = frontend.run_sql("show columns from t").await.unwrap();

        // Collect the first two cells of every row as `(name, type)` string pairs.
        let mut columns = Vec::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            for row in row_set.unwrap() {
                let column_name = std::str::from_utf8(row.index(0).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                let type_name = std::str::from_utf8(row.index(1).as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                columns.push((column_name, type_name));
            }
        }

        expect_test::expect![[r#"
            [
                (
                    "id",
                    "integer",
                ),
                (
                    "country",
                    "struct",
                ),
                (
                    "country.address",
                    "character varying",
                ),
                (
                    "country.city",
                    "struct",
                ),
                (
                    "country.city.address",
                    "character varying",
                ),
                (
                    "country.city.zipcode",
                    "character varying",
                ),
                (
                    "country.zipcode",
                    "character varying",
                ),
                (
                    "zipcode",
                    "bigint",
                ),
                (
                    "rate",
                    "real",
                ),
                (
                    "_rw_kafka_timestamp",
                    "timestamp with time zone",
                ),
                (
                    "_rw_kafka_partition",
                    "character varying",
                ),
                (
                    "_rw_kafka_offset",
                    "character varying",
                ),
                (
                    "_row_id",
                    "serial",
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }
}