risingwave_frontend/handler/
show.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
23use risingwave_common::bail_not_implemented;
24use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
25use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
26use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
27use risingwave_common::util::addr::HostAddr;
28use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
29use risingwave_expr::scalar::like::{i_like_default, like_default};
30use risingwave_pb::catalog::connection;
31use risingwave_pb::frontend_service::{
32    GetAllCursorsRequest, GetAllSubCursorsRequest, GetRunningSqlsRequest,
33};
34use risingwave_pb::id::WorkerId;
35use risingwave_rpc_client::FrontendClientPoolRef;
36use risingwave_sqlparser::ast::{
37    Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
38};
39use thiserror_ext::AsReport;
40
41use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
42use crate::binder::{Binder, Relation};
43use crate::catalog::catalog_service::CatalogReadGuard;
44use crate::catalog::root_catalog::SchemaPath;
45use crate::catalog::schema_catalog::SchemaCatalog;
46use crate::catalog::{CatalogError, IndexCatalog};
47use crate::error::{Result, RwError};
48use crate::handler::HandlerArgs;
49use crate::handler::create_connection::print_connection_params;
50use crate::session::{SessionImpl, WorkerProcessId};
51use crate::user::user_catalog::UserCatalog;
52use crate::user::{has_access_to_object, has_schema_usage_privilege};
53
54pub fn get_columns_from_table(
55    session: &SessionImpl,
56    table_name: ObjectName,
57) -> Result<Vec<ColumnCatalog>> {
58    let mut binder = Binder::new_for_system(session);
59    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
60    let column_catalogs = match relation {
61        Relation::Source(s) => s.catalog.columns,
62        Relation::BaseTable(t) => t.table_catalog.columns.clone(),
63        Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
64        _ => {
65            return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
66        }
67    };
68
69    Ok(column_catalogs)
70}
71
72pub fn get_columns_from_sink(
73    session: &SessionImpl,
74    sink_name: ObjectName,
75) -> Result<Vec<ColumnCatalog>> {
76    let binder = Binder::new_for_system(session);
77    let sink = binder.bind_sink_by_name(sink_name)?;
78    Ok(sink.sink_catalog.full_columns().to_vec())
79}
80
81pub fn get_columns_from_view(
82    session: &SessionImpl,
83    view_name: ObjectName,
84) -> Result<Vec<ColumnCatalog>> {
85    let binder = Binder::new_for_system(session);
86    let view = binder.bind_view_by_name(view_name)?;
87
88    Ok(view
89        .view_catalog
90        .columns
91        .iter()
92        .enumerate()
93        .map(|(idx, field)| ColumnCatalog {
94            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
95            is_hidden: false,
96        })
97        .collect())
98}
99
100pub fn get_indexes_from_table(
101    session: &SessionImpl,
102    table_name: ObjectName,
103) -> Result<Vec<Arc<IndexCatalog>>> {
104    let mut binder = Binder::new_for_system(session);
105    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
106    let indexes = match relation {
107        Relation::BaseTable(t) => t.table_indexes,
108        _ => {
109            return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
110        }
111    };
112
113    Ok(indexes)
114}
115
116fn schema_or_search_path(
117    session: &Arc<SessionImpl>,
118    schema: &Option<Ident>,
119    search_path: &SearchPath,
120) -> Vec<String> {
121    if let Some(s) = schema {
122        vec![s.real_value()]
123    } else {
124        search_path
125            .real_path()
126            .iter()
127            .map(|s| {
128                if s.eq(USER_NAME_WILD_CARD) {
129                    session.user_name()
130                } else {
131                    s.clone()
132                }
133            })
134            .collect()
135    }
136}
137
138fn iter_schema_items<F, T>(
139    session: &Arc<SessionImpl>,
140    schema: &Option<Ident>,
141    reader: &CatalogReadGuard,
142    current_user: &UserCatalog,
143    mut f: F,
144) -> Vec<T>
145where
146    F: FnMut(&SchemaCatalog) -> Vec<T>,
147{
148    let search_path = session.config().search_path();
149
150    schema_or_search_path(session, schema, &search_path)
151        .into_iter()
152        .filter_map(|schema| {
153            if let Ok(schema_catalog) =
154                reader.get_schema_by_name(&session.database(), schema.as_ref())
155                && (has_schema_usage_privilege(
156                    current_user,
157                    schema_catalog.id(),
158                    schema_catalog.owner,
159                ))
160            {
161                Some(schema_catalog)
162            } else {
163                None
164            }
165        })
166        .flat_map(|s| f(s).into_iter())
167        .collect()
168}
169
170/// Wrapper for `ObjectName` to be used as a field in a row.
171// TODO: replace remaining `name: String` with `name: ObjectNameField`
172struct ObjectNameField(ObjectName);
173
174fn with_schema_name(schema_name: &str, name: &str) -> ObjectNameField {
175    if schema_name.is_empty() {
176        ObjectNameField(ObjectName(vec![name.into()]))
177    } else {
178        ObjectNameField(ObjectName(vec![schema_name.into(), name.into()]))
179    }
180}
181
182impl WithDataType for ObjectNameField {
183    fn default_data_type() -> DataType {
184        DataType::Varchar
185    }
186}
187
188impl ToOwnedDatum for ObjectNameField {
189    fn to_owned_datum(self) -> Datum {
190        Some(self.0.to_string().into())
191    }
192}
193
194#[derive(Fields)]
195#[fields(style = "Title Case")]
196struct ShowObjectRow {
197    name: ObjectNameField,
198}
199
200impl ShowObjectRow {
201    fn base_name(&self) -> String {
202        self.name.0.base_name()
203    }
204}
205
206#[derive(Fields)]
207#[fields(style = "Title Case")]
208struct ShowDatabaseRow {
209    name: String,
210}
211
212#[derive(Fields)]
213#[fields(style = "Title Case")]
214struct ShowSchemaRow {
215    name: String,
216}
217
218#[derive(Fields)]
219#[fields(style = "Title Case")]
220pub struct ShowColumnRow {
221    pub name: ShowColumnName,
222    pub r#type: String,
223    pub is_hidden: Option<String>, // XXX: why not bool?
224    pub description: Option<String>,
225}
226
227#[derive(Clone, Debug)]
228enum ShowColumnNameSegment {
229    Field(Ident),
230    ListElement,
231}
232
233impl ShowColumnNameSegment {
234    pub fn field(name: &str) -> Self {
235        ShowColumnNameSegment::Field(Ident::from_real_value(name))
236    }
237}
238
239/// The name of a column in the output of `SHOW COLUMNS` or `DESCRIBE`.
240#[derive(Clone, Debug)]
241pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
242
243impl ShowColumnName {
244    /// Create a special column name without quoting. Used only for extra information like `primary key`
245    /// in the output of `DESCRIBE`.
246    pub fn special(name: &str) -> Self {
247        ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
248            name,
249        ))])
250    }
251}
252
253impl WithDataType for ShowColumnName {
254    fn default_data_type() -> DataType {
255        DataType::Varchar
256    }
257}
258
259impl ToOwnedDatum for ShowColumnName {
260    fn to_owned_datum(self) -> Datum {
261        use std::fmt::Write;
262
263        let mut s = String::new();
264        for segment in self.0 {
265            match segment {
266                ShowColumnNameSegment::Field(ident) => {
267                    if !s.is_empty() {
268                        // TODO: shall we add parentheses, so that it's valid field access SQL?
269                        s.push('.');
270                    }
271                    write!(s, "{ident}").unwrap();
272                }
273                ShowColumnNameSegment::ListElement => {
274                    s.push_str("[1]");
275                }
276            }
277        }
278        s.to_owned_datum()
279    }
280}
281
282impl ShowColumnRow {
283    /// Create a row with the given information. If the data type is a struct or list,
284    /// flatten the data type to also generate rows for its fields.
285    fn flatten(
286        name: ShowColumnName,
287        data_type: DataType,
288        is_hidden: bool,
289        description: Option<String>,
290    ) -> Vec<Self> {
291        // TODO(struct): use struct's type name once supported.
292        let r#type = match &data_type {
293            DataType::Struct(_) => "struct".to_owned(),
294            DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
295            d => d.to_string(),
296        };
297
298        let mut rows = vec![ShowColumnRow {
299            name: name.clone(),
300            r#type,
301            is_hidden: Some(is_hidden.to_string()),
302            description,
303        }];
304
305        match data_type {
306            DataType::Struct(st) => {
307                rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
308                    let mut name = name.clone();
309                    name.0.push(ShowColumnNameSegment::field(field_name));
310                    Self::flatten(name, field_data_type.clone(), is_hidden, None)
311                }));
312            }
313
314            DataType::List(list) if let DataType::Struct(_) = list.elem() => {
315                let mut name = name.clone();
316                name.0.push(ShowColumnNameSegment::ListElement);
317                rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
318            }
319
320            _ => {}
321        }
322
323        rows
324    }
325
326    pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
327        Self::flatten(
328            ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
329            col.column_desc.data_type,
330            col.is_hidden,
331            col.column_desc.description,
332        )
333    }
334}
335
336#[derive(Fields)]
337#[fields(style = "Title Case")]
338struct ShowConnectionRow {
339    name: ObjectNameField,
340    r#type: String,
341    properties: String,
342}
343
344#[derive(Fields)]
345#[fields(style = "Title Case")]
346struct ShowFunctionRow {
347    name: ObjectNameField,
348    arguments: String,
349    return_type: String,
350    language: String,
351    link: Option<String>,
352}
353
354#[derive(Fields)]
355#[fields(style = "Title Case")]
356struct ShowIndexRow {
357    name: String,
358    on: String,
359    key: String,
360    include: String,
361    distributed_by: String,
362}
363
364impl From<Arc<IndexCatalog>> for ShowIndexRow {
365    fn from(index: Arc<IndexCatalog>) -> Self {
366        let index_display = index.display();
367        ShowIndexRow {
368            name: index.name.clone(),
369            on: index.primary_table.name.clone(),
370            key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
371            include: display_comma_separated(&index_display.include_columns).to_string(),
372            distributed_by: display_comma_separated(&index_display.distributed_by_columns)
373                .to_string(),
374        }
375    }
376}
377
378#[derive(Fields)]
379#[fields(style = "Title Case")]
380struct ShowClusterRow {
381    id: WorkerId,
382    addr: String,
383    r#type: String,
384    state: String,
385    parallelism: Option<i32>,
386    is_streaming: Option<bool>,
387    is_serving: Option<bool>,
388    started_at: Option<Timestamptz>,
389}
390
391#[derive(Fields)]
392#[fields(style = "Title Case")]
393struct ShowJobRow {
394    id: i64,
395    statement: String,
396    create_type: String,
397    progress: String,
398}
399
400#[derive(Fields)]
401#[fields(style = "Title Case")]
402struct ShowProcessListRow {
403    worker_id: String,
404    id: String,
405    user: String,
406    host: String,
407    database: String,
408    time: Option<String>,
409    info: Option<String>,
410}
411
412#[derive(Fields)]
413#[fields(style = "Title Case")]
414struct ShowCreateObjectRow {
415    name: String,
416    create_sql: String,
417}
418
419#[derive(Fields)]
420#[fields(style = "Title Case")]
421struct ShowSubscriptionRow {
422    name: ObjectNameField,
423    retention_seconds: i64,
424}
425
426#[derive(Fields)]
427#[fields(style = "Title Case")]
428struct ShowCursorRow {
429    worker_id: String,
430    session_id: String,
431    user: String,
432    host: String,
433    database: String,
434    cursor_name: String,
435    info: Option<String>,
436}
437
438#[derive(Fields)]
439#[fields(style = "Title Case")]
440struct ShowSubscriptionCursorRow {
441    worker_id: String,
442    session_id: String,
443    user: String,
444    host: String,
445    database: String,
446    cursor_name: String,
447    subscription_name: String,
448    state: String,
449    idle_duration_ms: i64,
450    info: Option<String>,
451}
452
453/// Infer the row description for different show objects.
454pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
455    fields_to_descriptors(match objects {
456        ShowObject::Database => ShowDatabaseRow::fields(),
457        ShowObject::Schema => ShowSchemaRow::fields(),
458        ShowObject::Columns { .. } => ShowColumnRow::fields(),
459        ShowObject::Connection { .. } => ShowConnectionRow::fields(),
460        ShowObject::Function { .. } => ShowFunctionRow::fields(),
461        ShowObject::Indexes { .. } => ShowIndexRow::fields(),
462        ShowObject::Cluster => ShowClusterRow::fields(),
463        ShowObject::Jobs => ShowJobRow::fields(),
464        ShowObject::ProcessList => ShowProcessListRow::fields(),
465        ShowObject::Cursor => ShowCursorRow::fields(),
466        ShowObject::SubscriptionCursor => ShowSubscriptionCursorRow::fields(),
467        ShowObject::Subscription { .. } => ShowSubscriptionRow::fields(),
468
469        ShowObject::Table { .. }
470        | ShowObject::InternalTable { .. }
471        | ShowObject::View { .. }
472        | ShowObject::MaterializedView { .. }
473        | ShowObject::Source { .. }
474        | ShowObject::Sink { .. }
475        | ShowObject::Secret { .. } => ShowObjectRow::fields(),
476    })
477}
478
479pub async fn handle_show_object(
480    handler_args: HandlerArgs,
481    command: ShowObject,
482    filter: Option<ShowStatementFilter>,
483) -> Result<RwPgResponse> {
484    let session = handler_args.session;
485
486    if let Some(ShowStatementFilter::Where(..)) = filter {
487        bail_not_implemented!("WHERE clause in SHOW statement");
488    }
489
490    let catalog_reader = session.env().catalog_reader();
491    let user_reader = session.env().user_info_reader();
492    let get_catalog_reader = || {
493        let reader = catalog_reader.read_guard();
494        let user_reader = user_reader.read_guard();
495        let current_user = user_reader
496            .get_user_by_name(&session.user_name())
497            .expect("user not found")
498            .clone();
499        (reader, current_user)
500    };
501
502    let rows: Vec<ShowObjectRow> = match command {
503        ShowObject::Table { schema } => {
504            let (reader, current_user) = get_catalog_reader();
505            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
506                schema
507                    .iter_user_table()
508                    .map(|t| with_schema_name(&schema.name, &t.name))
509                    .map(|name| ShowObjectRow { name })
510                    .collect()
511            })
512        }
513        ShowObject::InternalTable { schema } => {
514            let (reader, current_user) = get_catalog_reader();
515            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
516                schema
517                    .iter_internal_table()
518                    .map(|t| with_schema_name(&schema.name, &t.name))
519                    .map(|name| ShowObjectRow { name })
520                    .collect()
521            })
522        }
523        ShowObject::Database => {
524            let reader = catalog_reader.read_guard();
525            let rows = reader
526                .get_all_database_names()
527                .into_iter()
528                .map(|name| ShowDatabaseRow { name });
529            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
530                .rows(rows)
531                .into());
532        }
533        ShowObject::Schema => {
534            let reader = catalog_reader.read_guard();
535            let rows = reader
536                .get_all_schema_names(&session.database())?
537                .into_iter()
538                .map(|name| ShowSchemaRow { name });
539            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
540                .rows(rows)
541                .into());
542        }
543        ShowObject::View { schema } => {
544            let (reader, current_user) = get_catalog_reader();
545            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
546                schema
547                    .iter_view()
548                    .map(|t| with_schema_name(&schema.name, &t.name))
549                    .map(|name| ShowObjectRow { name })
550                    .collect()
551            })
552        }
553        ShowObject::MaterializedView { schema } => {
554            let (reader, current_user) = get_catalog_reader();
555            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
556                schema
557                    .iter_created_mvs()
558                    .map(|t| with_schema_name(&schema.name, &t.name))
559                    .map(|name| ShowObjectRow { name })
560                    .collect()
561            })
562        }
563        ShowObject::Source { schema } => {
564            let (reader, current_user) = get_catalog_reader();
565            let mut sources =
566                iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
567                    schema
568                        .iter_source()
569                        .map(|t| with_schema_name(&schema.name, &t.name))
570                        .map(|name| ShowObjectRow { name })
571                        .collect()
572                });
573            sources.extend(
574                session
575                    .temporary_source_manager()
576                    .keys()
577                    .into_iter()
578                    .map(|t| with_schema_name("", &t))
579                    .map(|name| ShowObjectRow { name }),
580            );
581            sources
582        }
583        ShowObject::Sink { schema } => {
584            let (reader, current_user) = get_catalog_reader();
585            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
586                schema
587                    .iter_sink()
588                    .map(|t| with_schema_name(&schema.name, &t.name))
589                    .map(|name| ShowObjectRow { name })
590                    .collect()
591            })
592        }
593        ShowObject::Subscription { schema } => {
594            let (reader, current_user) = get_catalog_reader();
595            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
596                schema
597                    .iter_subscription()
598                    .map(|t| ShowSubscriptionRow {
599                        name: with_schema_name(&schema.name, &t.name),
600                        retention_seconds: t.retention_seconds as i64,
601                    })
602                    .collect()
603            });
604            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
605                .rows(rows)
606                .into());
607        }
608        ShowObject::Secret { schema } => {
609            let (reader, current_user) = get_catalog_reader();
610            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
611                schema
612                    .iter_secret()
613                    .map(|t| with_schema_name(&schema.name, &t.name))
614                    .map(|name| ShowObjectRow { name })
615                    .collect()
616            })
617        }
618        ShowObject::Columns { table } => {
619            let Ok(columns) = get_columns_from_table(&session, table.clone())
620                .or(get_columns_from_sink(&session, table.clone()))
621                .or(get_columns_from_view(&session, table.clone()))
622            else {
623                return Err(CatalogError::not_found(
624                    "table, source, sink or view",
625                    table.to_string(),
626                )
627                .into());
628            };
629
630            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
631                .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
632                .into());
633        }
634        ShowObject::Indexes { table } => {
635            let indexes = get_indexes_from_table(&session, table)?;
636
637            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
638                .rows(indexes.into_iter().map(ShowIndexRow::from))
639                .into());
640        }
641        ShowObject::Connection { schema } => {
642            let (reader, current_user) = get_catalog_reader();
643            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
644                schema.iter_connections()
645                .map(|c| {
646                    let name = c.name.clone();
647                    let r#type = match &c.info {
648                        #[expect(deprecated)]
649                        connection::Info::PrivateLinkService(_) => {
650                            PRIVATELINK_CONNECTION.to_owned()
651                        },
652                        connection::Info::ConnectionParams(params) => {
653                            params.get_connection_type().unwrap().as_str_name().to_owned()
654                        }
655                    };
656                    let source_names = schema
657                        .get_source_ids_by_connection(c.id)
658                        .unwrap_or_default()
659                        .into_iter()
660                        .filter_map(|sid| schema.get_source_by_id(sid).map(|catalog| catalog.name.as_str()))
661                        .collect_vec();
662                    let sink_names = schema
663                        .get_sink_ids_by_connection(c.id)
664                        .unwrap_or_default()
665                        .into_iter()
666                        .filter_map(|sid| schema.get_sink_by_id(sid).map(|catalog| catalog.name.as_str()))
667                        .collect_vec();
668                    let properties = match &c.info {
669                        #[expect(deprecated)]
670                        connection::Info::PrivateLinkService(i) => {
671                            format!(
672                                "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
673                                i.get_provider().unwrap().as_str_name(),
674                                i.service_name,
675                                i.endpoint_id,
676                                serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
677                                serde_json::to_string(&source_names).unwrap(),
678                                serde_json::to_string(&sink_names).unwrap(),
679                            )
680                        }
681                        connection::Info::ConnectionParams(params) => {
682                            // todo: show dep relations
683                            print_connection_params(&session.database(), params, &reader)
684                        }
685                    };
686                    ShowConnectionRow {
687                        name: with_schema_name(&schema.name, &name),
688                        r#type,
689                        properties,
690                    }
691                }).collect_vec()
692            });
693            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
694                .rows(rows)
695                .into());
696        }
697        ShowObject::Function { schema } => {
698            let (reader, current_user) = get_catalog_reader();
699            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
700                schema
701                    .iter_function()
702                    .map(|t| ShowFunctionRow {
703                        name: with_schema_name(&schema.name, &t.name),
704                        arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
705                        return_type: t.return_type.to_string(),
706                        language: t.language.clone(),
707                        link: t.link.clone(),
708                    })
709                    .collect()
710            });
711            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
712                .rows(rows)
713                .into());
714        }
715        ShowObject::Cluster => {
716            let workers = session.env().meta_client().list_all_nodes().await?;
717            let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
718                let addr: HostAddr = worker.host.as_ref().unwrap().into();
719                let property = worker.property.as_ref();
720                ShowClusterRow {
721                    id: worker.id,
722                    addr: addr.to_string(),
723                    r#type: worker.get_type().unwrap().as_str_name().into(),
724                    state: worker.get_state().unwrap().as_str_name().to_owned(),
725                    parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
726                    is_streaming: property.map(|p| p.is_streaming),
727                    is_serving: property.map(|p| p.is_serving),
728                    started_at: worker
729                        .started_at
730                        .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
731                }
732            });
733            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
734                .rows(rows)
735                .into());
736        }
737        ShowObject::Jobs => {
738            let resp = session.env().meta_client().get_ddl_progress().await?;
739            let rows = resp.into_iter().map(|job| ShowJobRow {
740                id: job.id as i64,
741                statement: job.statement,
742                create_type: job.create_type,
743                progress: job.progress,
744            });
745            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
746                .rows(rows)
747                .into());
748        }
749        ShowObject::ProcessList => {
750            let rows = show_process_list_impl(
751                session.env().frontend_client_pool(),
752                session.env().worker_node_manager_ref(),
753            )
754            .await;
755            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
756                .rows(rows)
757                .into());
758        }
759        ShowObject::Cursor => {
760            let rows = show_all_cursors_impl(
761                session.env().frontend_client_pool(),
762                session.env().worker_node_manager_ref(),
763            )
764            .await;
765            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
766                .rows(rows)
767                .into());
768        }
769        ShowObject::SubscriptionCursor => {
770            let rows = show_all_sub_cursors_impl(
771                session.env().frontend_client_pool(),
772                session.env().worker_node_manager_ref(),
773            )
774            .await;
775
776            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
777                .rows(rows)
778                .into());
779        }
780    };
781
782    // Apply filters.
783    let rows = rows.into_iter().filter(|row| match &filter {
784        Some(ShowStatementFilter::Like(pattern)) => like_default(&row.base_name(), pattern),
785        Some(ShowStatementFilter::ILike(pattern)) => i_like_default(&row.base_name(), pattern),
786        Some(ShowStatementFilter::Where(..)) => unreachable!(),
787        None => true,
788    });
789
790    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
791        .rows(rows)
792        .into())
793}
794
795pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
796    fields_to_descriptors(ShowCreateObjectRow::fields())
797}
798
799pub fn handle_show_create_object(
800    handle_args: HandlerArgs,
801    show_create_type: ShowCreateType,
802    name: ObjectName,
803) -> Result<RwPgResponse> {
804    let session = handle_args.session;
805    let catalog_reader = session.env().catalog_reader().read_guard();
806    let database = session.database();
807    let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
808    let search_path = session.config().search_path();
809    let user_name = &session.user_name();
810    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
811    let user_reader = session.env().user_info_reader().read_guard();
812    let current_user = user_reader
813        .get_user_by_name(user_name)
814        .expect("user not found");
815
816    let (sql, schema_name) = match show_create_type {
817        ShowCreateType::MaterializedView => {
818            let (mv, schema) = schema_path
819                .try_find(|schema_name| {
820                    Ok::<_, RwError>(
821                        catalog_reader
822                            .get_schema_by_name(&database, schema_name)?
823                            .get_created_table_by_name(&object_name)
824                            .filter(|t| {
825                                t.is_mview() && has_access_to_object(current_user, t.id, t.owner)
826                            }),
827                    )
828                })?
829                .ok_or_else(|| CatalogError::not_found("materialized view", name.to_string()))?;
830            (mv.create_sql(), schema)
831        }
832        ShowCreateType::View => {
833            let (view, schema) =
834                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
835            if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
836                return Err(CatalogError::not_found("view", name.to_string()).into());
837            }
838            (view.create_sql(schema.to_owned()), schema)
839        }
840        ShowCreateType::Table => {
841            let (table, schema) = schema_path
842                .try_find(|schema_name| {
843                    Ok::<_, RwError>(
844                        catalog_reader
845                            .get_schema_by_name(&database, schema_name)?
846                            .get_created_table_by_name(&object_name)
847                            .filter(|t| {
848                                t.is_user_table()
849                                    && has_access_to_object(current_user, t.id, t.owner)
850                            }),
851                    )
852                })?
853                .ok_or_else(|| CatalogError::not_found("table", name.to_string()))?;
854
855            (table.create_sql_purified(), schema)
856        }
857        ShowCreateType::Sink => {
858            let (sink, schema) =
859                catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
860            if !has_access_to_object(current_user, sink.id, sink.owner) {
861                return Err(CatalogError::not_found("sink", name.to_string()).into());
862            }
863            (sink.create_sql(), schema)
864        }
865        ShowCreateType::Source => {
866            let (source, schema) = schema_path
867                .try_find(|schema_name| {
868                    Ok::<_, RwError>(
869                        catalog_reader
870                            .get_schema_by_name(&database, schema_name)?
871                            .get_source_by_name(&object_name)
872                            .filter(|s| {
873                                s.associated_table_id.is_none()
874                                    && has_access_to_object(current_user, s.id, s.owner)
875                            }),
876                    )
877                })?
878                .ok_or_else(|| CatalogError::not_found("source", name.to_string()))?;
879            (source.create_sql_purified(), schema)
880        }
881        ShowCreateType::Index => {
882            let (index, schema) = schema_path
883                .try_find(|schema_name| {
884                    Ok::<_, RwError>(
885                        catalog_reader
886                            .get_schema_by_name(&database, schema_name)?
887                            .get_created_table_by_name(&object_name)
888                            .filter(|t| {
889                                t.is_index() && has_access_to_object(current_user, t.id, t.owner)
890                            }),
891                    )
892                })?
893                .ok_or_else(|| CatalogError::not_found("index", name.to_string()))?;
894            (index.create_sql(), schema)
895        }
896        ShowCreateType::Function => {
897            bail_not_implemented!("show create on: {}", show_create_type);
898        }
899        ShowCreateType::Subscription => {
900            let (subscription, schema) =
901                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
902            if !has_access_to_object(current_user, subscription.id, subscription.owner) {
903                return Err(CatalogError::not_found("subscription", name.to_string()).into());
904            }
905            (subscription.create_sql(), schema)
906        }
907    };
908    let name = format!("{}.{}", schema_name, object_name);
909
910    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
911        .rows([ShowCreateObjectRow {
912            name,
913            create_sql: sql,
914        }])
915        .into())
916}
917
918async fn show_all_sub_cursors_impl(
919    frontend_client_pool: FrontendClientPoolRef,
920    worker_node_manager: WorkerNodeManagerRef,
921) -> Vec<ShowSubscriptionCursorRow> {
922    fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowSubscriptionCursorRow> {
923        vec![ShowSubscriptionCursorRow {
924            worker_id: format!("{}", worker_id),
925            session_id: "".to_owned(),
926            user: "".to_owned(),
927            host: "".to_owned(),
928            database: "".to_owned(),
929            cursor_name: "".to_owned(),
930            subscription_name: "".to_owned(),
931            state: "".to_owned(),
932            idle_duration_ms: 0,
933            info: Some(format!(
934                "Failed to show subscription cursor from worker {worker_id} due to: {err_msg}"
935            )),
936        }]
937    }
938
939    let futures = worker_node_manager
940        .list_frontend_nodes()
941        .into_iter()
942        .map(|worker| {
943            let frontend_client_pool_ = frontend_client_pool.clone();
944            async move {
945                let client = match frontend_client_pool_.get(&worker).await {
946                    Ok(client) => client,
947                    Err(e) => {
948                        return on_error(worker.id, format!("{}", e.as_report()));
949                    }
950                };
951
952                let resp = match client.get_all_sub_cursors(GetAllSubCursorsRequest {}).await {
953                    Ok(resp) => resp,
954                    Err(e) => {
955                        return on_error(worker.id, format!("{}", e.as_report()));
956                    }
957                };
958
959                resp.into_inner()
960                    .subscription_cursors
961                    .into_iter()
962                    .flat_map(|sub_cursor| {
963                        let worker_id = worker.id.to_string();
964                        let session_id = sub_cursor.session_id.to_string();
965
966                        let user = sub_cursor.user_name;
967                        let host = sub_cursor.peer_addr;
968                        let database = sub_cursor.database;
969
970                        let size = sub_cursor.states.len();
971                        let mut sub_cursors = vec![];
972                        for index in 0..size {
973                            sub_cursors.push(ShowSubscriptionCursorRow {
974                                worker_id: worker_id.clone(),
975                                session_id: session_id.clone(),
976                                user: user.clone(),
977                                host: host.clone(),
978                                database: database.clone(),
979                                cursor_name: sub_cursor.cursor_names[index].clone(),
980                                subscription_name: sub_cursor.subscription_names[index].clone(),
981                                state: sub_cursor.states[index].clone(),
982                                idle_duration_ms: sub_cursor.idle_durations[index] as i64,
983                                info: None,
984                            })
985                        }
986                        sub_cursors
987                    })
988                    .collect_vec()
989            }
990        })
991        .collect_vec();
992    join_all(futures).await.into_iter().flatten().collect()
993}
994
995async fn show_all_cursors_impl(
996    frontend_client_pool: FrontendClientPoolRef,
997    worker_node_manager: WorkerNodeManagerRef,
998) -> Vec<ShowCursorRow> {
999    fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowCursorRow> {
1000        vec![ShowCursorRow {
1001            worker_id: format!("{}", worker_id),
1002            session_id: "".to_owned(),
1003            user: "".to_owned(),
1004            host: "".to_owned(),
1005            database: "".to_owned(),
1006            cursor_name: "".to_owned(),
1007            info: Some(format!(
1008                "Failed to show cursor from worker {worker_id} due to: {err_msg}"
1009            )),
1010        }]
1011    }
1012    let futures = worker_node_manager
1013        .list_frontend_nodes()
1014        .into_iter()
1015        .map(|worker| {
1016            let frontend_client_pool_ = frontend_client_pool.clone();
1017            async move {
1018                let client = match frontend_client_pool_.get(&worker).await {
1019                    Ok(client) => client,
1020                    Err(e) => {
1021                        return on_error(worker.id, format!("{}", e.as_report()));
1022                    }
1023                };
1024
1025                let resp = match client.get_all_cursors(GetAllCursorsRequest {}).await {
1026                    Ok(resp) => resp,
1027                    Err(e) => {
1028                        return on_error(worker.id, format!("{}", e.as_report()));
1029                    }
1030                };
1031
1032                resp.into_inner()
1033                    .all_cursors
1034                    .into_iter()
1035                    .flat_map(|cursors| {
1036                        let worker_id = worker.id.to_string();
1037                        let session_id = cursors.session_id.to_string();
1038
1039                        let user = cursors.user_name;
1040                        let host = cursors.peer_addr;
1041                        let database = cursors.database;
1042
1043                        cursors
1044                            .cursor_names
1045                            .into_iter()
1046                            .map(|cursor_name| ShowCursorRow {
1047                                worker_id: worker_id.clone(),
1048                                session_id: session_id.clone(),
1049                                user: user.clone(),
1050                                host: host.clone(),
1051                                database: database.clone(),
1052                                cursor_name,
1053                                info: None,
1054                            })
1055                            .collect_vec()
1056                    })
1057                    .collect_vec()
1058            }
1059        })
1060        .collect_vec();
1061    join_all(futures).await.into_iter().flatten().collect()
1062}
1063
1064async fn show_process_list_impl(
1065    frontend_client_pool: FrontendClientPoolRef,
1066    worker_node_manager: WorkerNodeManagerRef,
1067) -> Vec<ShowProcessListRow> {
1068    // Create a placeholder row for the worker in case of any errors while fetching its running SQLs.
1069    fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowProcessListRow> {
1070        vec![ShowProcessListRow {
1071            worker_id: format!("{}", worker_id),
1072            id: "".to_owned(),
1073            user: "".to_owned(),
1074            host: "".to_owned(),
1075            database: "".to_owned(),
1076            time: None,
1077            info: Some(format!(
1078                "Failed to show process list from worker {worker_id} due to: {err_msg}"
1079            )),
1080        }]
1081    }
1082    let futures = worker_node_manager
1083        .list_frontend_nodes()
1084        .into_iter()
1085        .map(|worker| {
1086            let frontend_client_pool_ = frontend_client_pool.clone();
1087            async move {
1088                let client = match frontend_client_pool_.get(&worker).await {
1089                    Ok(client) => client,
1090                    Err(e) => {
1091                        return on_error(worker.id, format!("{}", e.as_report()));
1092                    }
1093                };
1094                let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
1095                    Ok(resp) => resp,
1096                    Err(e) => {
1097                        return on_error(worker.id, format!("{}", e.as_report()));
1098                    }
1099                };
1100                resp.into_inner()
1101                    .running_sqls
1102                    .into_iter()
1103                    .map(|sql| ShowProcessListRow {
1104                        worker_id: format!("{}", worker.id),
1105                        id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
1106                        user: sql.user_name,
1107                        host: sql.peer_addr,
1108                        database: sql.database,
1109                        time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
1110                        info: sql
1111                            .sql
1112                            .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
1113                    })
1114                    .collect_vec()
1115            }
1116        })
1117        .collect_vec();
1118    join_all(futures).await.into_iter().flatten().collect()
1119}
1120
1121#[cfg(test)]
1122mod tests {
1123    use std::ops::Index;
1124
1125    use futures_async_stream::for_await;
1126
1127    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
1128
1129    #[tokio::test]
1130    async fn test_show_source() {
1131        let frontend = LocalFrontend::new(Default::default()).await;
1132
1133        let sql = r#"CREATE SOURCE t1 (column1 varchar)
1134        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
1135        FORMAT PLAIN ENCODE JSON"#;
1136        frontend.run_sql(sql).await.unwrap();
1137
1138        let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
1139        rows.sort();
1140        assert_eq!(rows, vec!["Row([Some(b\"public.t1\")])".to_owned(),]);
1141    }
1142
1143    #[tokio::test]
1144    async fn test_show_column() {
1145        let proto_file = create_proto_file(PROTO_FILE_DATA);
1146        let sql = format!(
1147            r#"CREATE SOURCE t
1148    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
1149    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
1150            proto_file.path().to_str().unwrap()
1151        );
1152        let frontend = LocalFrontend::new(Default::default()).await;
1153        frontend.run_sql(sql).await.unwrap();
1154
1155        let sql = "show columns from t";
1156        let mut pg_response = frontend.run_sql(sql).await.unwrap();
1157
1158        let mut columns = Vec::new();
1159        #[for_await]
1160        for row_set in pg_response.values_stream() {
1161            let row_set = row_set.unwrap();
1162            for row in row_set {
1163                columns.push((
1164                    std::str::from_utf8(row.index(0).as_ref().unwrap())
1165                        .unwrap()
1166                        .to_owned(),
1167                    std::str::from_utf8(row.index(1).as_ref().unwrap())
1168                        .unwrap()
1169                        .to_owned(),
1170                ));
1171            }
1172        }
1173
1174        expect_test::expect![[r#"
1175            [
1176                (
1177                    "id",
1178                    "integer",
1179                ),
1180                (
1181                    "country",
1182                    "struct",
1183                ),
1184                (
1185                    "country.address",
1186                    "character varying",
1187                ),
1188                (
1189                    "country.city",
1190                    "struct",
1191                ),
1192                (
1193                    "country.city.address",
1194                    "character varying",
1195                ),
1196                (
1197                    "country.city.zipcode",
1198                    "character varying",
1199                ),
1200                (
1201                    "country.zipcode",
1202                    "character varying",
1203                ),
1204                (
1205                    "zipcode",
1206                    "bigint",
1207                ),
1208                (
1209                    "rate",
1210                    "real",
1211                ),
1212                (
1213                    "_rw_kafka_timestamp",
1214                    "timestamp with time zone",
1215                ),
1216                (
1217                    "_rw_kafka_partition",
1218                    "character varying",
1219                ),
1220                (
1221                    "_rw_kafka_offset",
1222                    "character varying",
1223                ),
1224                (
1225                    "_row_id",
1226                    "serial",
1227                ),
1228            ]
1229        "#]]
1230        .assert_debug_eq(&columns);
1231    }
1232}