risingwave_frontend/handler/
show.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use pgwire::pg_server::Session;
23use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::GetRunningSqlsRequest;
33use risingwave_rpc_client::FrontendClientPoolRef;
34use risingwave_sqlparser::ast::{
35    Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
36};
37use thiserror_ext::AsReport;
38
39use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
40use crate::binder::{Binder, Relation};
41use crate::catalog::catalog_service::CatalogReadGuard;
42use crate::catalog::root_catalog::SchemaPath;
43use crate::catalog::schema_catalog::SchemaCatalog;
44use crate::catalog::{CatalogError, IndexCatalog};
45use crate::error::{Result, RwError};
46use crate::handler::HandlerArgs;
47use crate::handler::create_connection::print_connection_params;
48use crate::session::cursor_manager::SubscriptionCursor;
49use crate::session::{SessionImpl, WorkerProcessId};
50use crate::user::has_access_to_object;
51use crate::user::user_catalog::UserCatalog;
52
53pub fn get_columns_from_table(
54    session: &SessionImpl,
55    table_name: ObjectName,
56) -> Result<Vec<ColumnCatalog>> {
57    let mut binder = Binder::new_for_system(session);
58    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
59    let column_catalogs = match relation {
60        Relation::Source(s) => s.catalog.columns,
61        Relation::BaseTable(t) => t.table_catalog.columns.clone(),
62        Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
63        _ => {
64            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
65        }
66    };
67
68    Ok(column_catalogs)
69}
70
71pub fn get_columns_from_sink(
72    session: &SessionImpl,
73    sink_name: ObjectName,
74) -> Result<Vec<ColumnCatalog>> {
75    let binder = Binder::new_for_system(session);
76    let sink = binder.bind_sink_by_name(sink_name.clone())?;
77    Ok(sink.sink_catalog.full_columns().to_vec())
78}
79
80pub fn get_columns_from_view(
81    session: &SessionImpl,
82    view_name: ObjectName,
83) -> Result<Vec<ColumnCatalog>> {
84    let binder = Binder::new_for_system(session);
85    let view = binder.bind_view_by_name(view_name.clone())?;
86
87    Ok(view
88        .view_catalog
89        .columns
90        .iter()
91        .enumerate()
92        .map(|(idx, field)| ColumnCatalog {
93            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
94            is_hidden: false,
95        })
96        .collect())
97}
98
99pub fn get_indexes_from_table(
100    session: &SessionImpl,
101    table_name: ObjectName,
102) -> Result<Vec<Arc<IndexCatalog>>> {
103    let mut binder = Binder::new_for_system(session);
104    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
105    let indexes = match relation {
106        Relation::BaseTable(t) => t.table_indexes,
107        _ => {
108            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
109        }
110    };
111
112    Ok(indexes)
113}
114
115fn schema_or_search_path(
116    session: &Arc<SessionImpl>,
117    schema: &Option<Ident>,
118    search_path: &SearchPath,
119) -> Vec<String> {
120    if let Some(s) = schema {
121        vec![s.real_value()]
122    } else {
123        search_path
124            .real_path()
125            .iter()
126            .map(|s| {
127                if s.eq(USER_NAME_WILD_CARD) {
128                    session.user_name()
129                } else {
130                    s.clone()
131                }
132            })
133            .collect()
134    }
135}
136
137fn iter_schema_items<F, T>(
138    session: &Arc<SessionImpl>,
139    schema: &Option<Ident>,
140    reader: &CatalogReadGuard,
141    current_user: &UserCatalog,
142    mut f: F,
143) -> Vec<T>
144where
145    F: FnMut(&SchemaCatalog) -> Vec<T>,
146{
147    let search_path = session.config().search_path();
148
149    schema_or_search_path(session, schema, &search_path)
150        .into_iter()
151        .filter_map(|schema| {
152            if let Ok(schema_catalog) =
153                reader.get_schema_by_name(&session.database(), schema.as_ref())
154                && (current_user.is_super
155                    || current_user.has_schema_usage_privilege(schema_catalog.id()))
156            {
157                Some(schema_catalog)
158            } else {
159                None
160            }
161        })
162        .flat_map(|s| f(s).into_iter())
163        .collect()
164}
165
166#[derive(Fields)]
167#[fields(style = "Title Case")]
168struct ShowObjectRow {
169    name: String,
170}
171
172#[derive(Fields)]
173#[fields(style = "Title Case")]
174pub struct ShowColumnRow {
175    pub name: ShowColumnName,
176    pub r#type: String,
177    pub is_hidden: Option<String>, // XXX: why not bool?
178    pub description: Option<String>,
179}
180
181#[derive(Clone, Debug)]
182enum ShowColumnNameSegment {
183    Field(Ident),
184    ListElement,
185}
186
187impl ShowColumnNameSegment {
188    pub fn field(name: &str) -> Self {
189        ShowColumnNameSegment::Field(Ident::from_real_value(name))
190    }
191}
192
193/// The name of a column in the output of `SHOW COLUMNS` or `DESCRIBE`.
194#[derive(Clone, Debug)]
195pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
196
197impl ShowColumnName {
198    /// Create a special column name without quoting. Used only for extra information like `primary key`
199    /// in the output of `DESCRIBE`.
200    pub fn special(name: &str) -> Self {
201        ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
202            name,
203        ))])
204    }
205}
206
207impl WithDataType for ShowColumnName {
208    fn default_data_type() -> DataType {
209        DataType::Varchar
210    }
211}
212
213impl ToOwnedDatum for ShowColumnName {
214    fn to_owned_datum(self) -> Datum {
215        use std::fmt::Write;
216
217        let mut s = String::new();
218        for segment in self.0 {
219            match segment {
220                ShowColumnNameSegment::Field(ident) => {
221                    if !s.is_empty() {
222                        // TODO: shall we add parentheses, so that it's valid field access SQL?
223                        s.push('.');
224                    }
225                    write!(s, "{ident}").unwrap();
226                }
227                ShowColumnNameSegment::ListElement => {
228                    s.push_str("[1]");
229                }
230            }
231        }
232        s.to_owned_datum()
233    }
234}
235
236impl ShowColumnRow {
237    /// Create a row with the given information. If the data type is a struct or list,
238    /// flatten the data type to also generate rows for its fields.
239    fn flatten(
240        name: ShowColumnName,
241        data_type: DataType,
242        is_hidden: bool,
243        description: Option<String>,
244    ) -> Vec<Self> {
245        // TODO(struct): use struct's type name once supported.
246        let r#type = match &data_type {
247            DataType::Struct(_) => "struct".to_owned(),
248            DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
249            d => d.to_string(),
250        };
251
252        let mut rows = vec![ShowColumnRow {
253            name: name.clone(),
254            r#type,
255            is_hidden: Some(is_hidden.to_string()),
256            description,
257        }];
258
259        match data_type {
260            DataType::Struct(st) => {
261                rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
262                    let mut name = name.clone();
263                    name.0.push(ShowColumnNameSegment::field(field_name));
264                    Self::flatten(name, field_data_type.clone(), is_hidden, None)
265                }));
266            }
267
268            DataType::List(list) if let DataType::Struct(_) = list.elem() => {
269                let mut name = name.clone();
270                name.0.push(ShowColumnNameSegment::ListElement);
271                rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
272            }
273
274            _ => {}
275        }
276
277        rows
278    }
279
280    pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
281        Self::flatten(
282            ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
283            col.column_desc.data_type,
284            col.is_hidden,
285            col.column_desc.description,
286        )
287    }
288}
289
290#[derive(Fields)]
291#[fields(style = "Title Case")]
292struct ShowConnectionRow {
293    name: String,
294    r#type: String,
295    properties: String,
296}
297
298#[derive(Fields)]
299#[fields(style = "Title Case")]
300struct ShowFunctionRow {
301    name: String,
302    arguments: String,
303    return_type: String,
304    language: String,
305    link: Option<String>,
306}
307
308#[derive(Fields)]
309#[fields(style = "Title Case")]
310struct ShowIndexRow {
311    name: String,
312    on: String,
313    key: String,
314    include: String,
315    distributed_by: String,
316}
317
318impl From<Arc<IndexCatalog>> for ShowIndexRow {
319    fn from(index: Arc<IndexCatalog>) -> Self {
320        let index_display = index.display();
321        ShowIndexRow {
322            name: index.name.clone(),
323            on: index.primary_table.name.clone(),
324            key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
325            include: display_comma_separated(&index_display.include_columns).to_string(),
326            distributed_by: display_comma_separated(&index_display.distributed_by_columns)
327                .to_string(),
328        }
329    }
330}
331
332#[derive(Fields)]
333#[fields(style = "Title Case")]
334struct ShowClusterRow {
335    id: i32,
336    addr: String,
337    r#type: String,
338    state: String,
339    parallelism: Option<i32>,
340    is_streaming: Option<bool>,
341    is_serving: Option<bool>,
342    is_unschedulable: Option<bool>,
343    started_at: Option<Timestamptz>,
344}
345
346#[derive(Fields)]
347#[fields(style = "Title Case")]
348struct ShowJobRow {
349    id: i64,
350    statement: String,
351    create_type: String,
352    progress: String,
353}
354
355#[derive(Fields)]
356#[fields(style = "Title Case")]
357struct ShowProcessListRow {
358    worker_id: String,
359    id: String,
360    user: String,
361    host: String,
362    database: String,
363    time: Option<String>,
364    info: Option<String>,
365}
366
367#[derive(Fields)]
368#[fields(style = "Title Case")]
369struct ShowCreateObjectRow {
370    name: String,
371    create_sql: String,
372}
373
374#[derive(Fields)]
375#[fields(style = "Title Case")]
376struct ShowSubscriptionRow {
377    name: String,
378    retention_seconds: i64,
379}
380
381#[derive(Fields)]
382#[fields(style = "Title Case")]
383struct ShowCursorRow {
384    session_id: String,
385    user: String,
386    host: String,
387    database: String,
388    cursor_name: String,
389}
390
391#[derive(Fields)]
392#[fields(style = "Title Case")]
393struct ShowSubscriptionCursorRow {
394    session_id: String,
395    user: String,
396    host: String,
397    database: String,
398    cursor_name: String,
399    subscription_name: String,
400    state: String,
401    idle_duration_ms: i64,
402}
403
404/// Infer the row description for different show objects.
405pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
406    fields_to_descriptors(match objects {
407        ShowObject::Columns { .. } => ShowColumnRow::fields(),
408        ShowObject::Connection { .. } => ShowConnectionRow::fields(),
409        ShowObject::Function { .. } => ShowFunctionRow::fields(),
410        ShowObject::Indexes { .. } => ShowIndexRow::fields(),
411        ShowObject::Cluster => ShowClusterRow::fields(),
412        ShowObject::Jobs => ShowJobRow::fields(),
413        ShowObject::ProcessList => ShowProcessListRow::fields(),
414        _ => ShowObjectRow::fields(),
415    })
416}
417
418pub async fn handle_show_object(
419    handler_args: HandlerArgs,
420    command: ShowObject,
421    filter: Option<ShowStatementFilter>,
422) -> Result<RwPgResponse> {
423    let session = handler_args.session;
424
425    if let Some(ShowStatementFilter::Where(..)) = filter {
426        bail_not_implemented!("WHERE clause in SHOW statement");
427    }
428
429    let catalog_reader = session.env().catalog_reader();
430    let user_reader = session.env().user_info_reader();
431    let get_catalog_reader = || {
432        let reader = catalog_reader.read_guard();
433        let user_reader = user_reader.read_guard();
434        let current_user = user_reader
435            .get_user_by_name(&session.user_name())
436            .expect("user not found")
437            .clone();
438        (reader, current_user)
439    };
440
441    let names = match command {
442        ShowObject::Table { schema } => {
443            let (reader, current_user) = get_catalog_reader();
444            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
445                schema.iter_user_table().map(|t| t.name.clone()).collect()
446            })
447        }
448        ShowObject::InternalTable { schema } => {
449            let (reader, current_user) = get_catalog_reader();
450            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
451                schema
452                    .iter_internal_table()
453                    .map(|t| t.name.clone())
454                    .collect()
455            })
456        }
457        ShowObject::Database => {
458            let reader = catalog_reader.read_guard();
459            reader.get_all_database_names()
460        }
461        ShowObject::Schema => {
462            let reader = catalog_reader.read_guard();
463            reader.get_all_schema_names(&session.database())?
464        }
465        ShowObject::View { schema } => {
466            let (reader, current_user) = get_catalog_reader();
467            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
468                schema.iter_view().map(|t| t.name.clone()).collect()
469            })
470        }
471        ShowObject::MaterializedView { schema } => {
472            let (reader, current_user) = get_catalog_reader();
473            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
474                schema.iter_created_mvs().map(|t| t.name.clone()).collect()
475            })
476        }
477        ShowObject::Source { schema } => {
478            let (reader, current_user) = get_catalog_reader();
479            let mut sources =
480                iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
481                    schema.iter_source().map(|t| t.name.clone()).collect()
482                });
483            sources.extend(session.temporary_source_manager().keys());
484            sources
485        }
486        ShowObject::Sink { schema } => {
487            let (reader, current_user) = get_catalog_reader();
488            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
489                schema.iter_sink().map(|t| t.name.clone()).collect()
490            })
491        }
492        ShowObject::Subscription { schema } => {
493            let (reader, current_user) = get_catalog_reader();
494            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
495                schema
496                    .iter_subscription()
497                    .map(|t| ShowSubscriptionRow {
498                        name: t.name.clone(),
499                        retention_seconds: t.retention_seconds as i64,
500                    })
501                    .collect()
502            });
503            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
504                .rows(rows)
505                .into());
506        }
507        ShowObject::Secret { schema } => {
508            let (reader, current_user) = get_catalog_reader();
509            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
510                schema.iter_secret().map(|t| t.name.clone()).collect()
511            })
512        }
513        ShowObject::Columns { table } => {
514            let Ok(columns) = get_columns_from_table(&session, table.clone())
515                .or(get_columns_from_sink(&session, table.clone()))
516                .or(get_columns_from_view(&session, table.clone()))
517            else {
518                return Err(CatalogError::NotFound(
519                    "table, source, sink or view",
520                    table.to_string(),
521                )
522                .into());
523            };
524
525            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
526                .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
527                .into());
528        }
529        ShowObject::Indexes { table } => {
530            let indexes = get_indexes_from_table(&session, table)?;
531
532            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
533                .rows(indexes.into_iter().map(ShowIndexRow::from))
534                .into());
535        }
536        ShowObject::Connection { schema } => {
537            let (reader, current_user) = get_catalog_reader();
538            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
539                schema.iter_connections()
540                .map(|c| {
541                    let name = c.name.clone();
542                    let r#type = match &c.info {
543                        connection::Info::PrivateLinkService(_) => {
544                            PRIVATELINK_CONNECTION.to_owned()
545                        },
546                        connection::Info::ConnectionParams(params) => {
547                            params.get_connection_type().unwrap().as_str_name().to_owned()
548                        }
549                    };
550                    let source_names = schema
551                        .get_source_ids_by_connection(c.id)
552                        .unwrap_or_default()
553                        .into_iter()
554                        .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
555                        .collect_vec();
556                    let sink_names = schema
557                        .get_sink_ids_by_connection(c.id)
558                        .unwrap_or_default()
559                        .into_iter()
560                        .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
561                        .collect_vec();
562                    let properties = match &c.info {
563                        connection::Info::PrivateLinkService(i) => {
564                            format!(
565                                "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
566                                i.get_provider().unwrap().as_str_name(),
567                                i.service_name,
568                                i.endpoint_id,
569                                serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
570                                serde_json::to_string(&source_names).unwrap(),
571                                serde_json::to_string(&sink_names).unwrap(),
572                            )
573                        }
574                        connection::Info::ConnectionParams(params) => {
575                            // todo: show dep relations
576                            print_connection_params(&session.database(), params, &reader)
577                        }
578                    };
579                    ShowConnectionRow {
580                        name,
581                        r#type,
582                        properties,
583                    }
584                }).collect_vec()
585            });
586            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
587                .rows(rows)
588                .into());
589        }
590        ShowObject::Function { schema } => {
591            let (reader, current_user) = get_catalog_reader();
592            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
593                schema
594                    .iter_function()
595                    .map(|t| ShowFunctionRow {
596                        name: t.name.clone(),
597                        arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
598                        return_type: t.return_type.to_string(),
599                        language: t.language.clone(),
600                        link: t.link.clone(),
601                    })
602                    .collect()
603            });
604            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
605                .rows(rows)
606                .into());
607        }
608        ShowObject::Cluster => {
609            let workers = session.env().meta_client().list_all_nodes().await?;
610            let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
611                let addr: HostAddr = worker.host.as_ref().unwrap().into();
612                let property = worker.property.as_ref();
613                ShowClusterRow {
614                    id: worker.id as _,
615                    addr: addr.to_string(),
616                    r#type: worker.get_type().unwrap().as_str_name().into(),
617                    state: worker.get_state().unwrap().as_str_name().to_owned(),
618                    parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
619                    is_streaming: property.map(|p| p.is_streaming),
620                    is_serving: property.map(|p| p.is_serving),
621                    is_unschedulable: property.map(|p| p.is_unschedulable),
622                    started_at: worker
623                        .started_at
624                        .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
625                }
626            });
627            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
628                .rows(rows)
629                .into());
630        }
631        ShowObject::Jobs => {
632            let resp = session.env().meta_client().get_ddl_progress().await?;
633            let rows = resp.into_iter().map(|job| ShowJobRow {
634                id: job.id as i64,
635                statement: job.statement,
636                create_type: job.create_type,
637                progress: job.progress,
638            });
639            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
640                .rows(rows)
641                .into());
642        }
643        ShowObject::ProcessList => {
644            let rows = show_process_list_impl(
645                session.env().frontend_client_pool(),
646                session.env().worker_node_manager_ref(),
647            )
648            .await;
649            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
650                .rows(rows)
651                .into());
652        }
653        ShowObject::Cursor => {
654            let sessions = session
655                .env()
656                .sessions_map()
657                .read()
658                .values()
659                .cloned()
660                .collect_vec();
661            let mut rows = vec![];
662            for s in sessions {
663                let session_id = format!("{}", s.id().0);
664                let user = s.user_name();
665                let host = format!("{}", s.peer_addr());
666                let database = s.database();
667
668                s.get_cursor_manager()
669                    .iter_query_cursors(|cursor_name: &String, _| {
670                        rows.push(ShowCursorRow {
671                            session_id: session_id.clone(),
672                            user: user.clone(),
673                            host: host.clone(),
674                            database: database.clone(),
675                            cursor_name: cursor_name.to_owned(),
676                        });
677                    })
678                    .await;
679            }
680            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
681                .rows(rows)
682                .into());
683        }
684        ShowObject::SubscriptionCursor => {
685            let sessions = session
686                .env()
687                .sessions_map()
688                .read()
689                .values()
690                .cloned()
691                .collect_vec();
692            let mut rows = vec![];
693            for s in sessions {
694                let session_id = format!("{}", s.id().0);
695                let user = s.user_name();
696                let host = format!("{}", s.peer_addr());
697                let database = s.database().to_owned();
698
699                s.get_cursor_manager()
700                    .iter_subscription_cursors(
701                        |cursor_name: &String, cursor: &SubscriptionCursor| {
702                            rows.push(ShowSubscriptionCursorRow {
703                                session_id: session_id.clone(),
704                                user: user.clone(),
705                                host: host.clone(),
706                                database: database.clone(),
707                                cursor_name: cursor_name.to_owned(),
708                                subscription_name: cursor.subscription_name().to_owned(),
709                                state: cursor.state_info_string(),
710                                idle_duration_ms: cursor.idle_duration().as_millis() as i64,
711                            });
712                        },
713                    )
714                    .await;
715            }
716
717            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
718                .rows(rows)
719                .into());
720        }
721    };
722
723    let rows = names
724        .into_iter()
725        .filter(|arg| match &filter {
726            Some(ShowStatementFilter::Like(pattern)) => like_default(arg, pattern),
727            Some(ShowStatementFilter::ILike(pattern)) => i_like_default(arg, pattern),
728            Some(ShowStatementFilter::Where(..)) => unreachable!(),
729            None => true,
730        })
731        .map(|name| ShowObjectRow { name });
732
733    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
734        .rows(rows)
735        .into())
736}
737
738pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
739    fields_to_descriptors(ShowCreateObjectRow::fields())
740}
741
742pub fn handle_show_create_object(
743    handle_args: HandlerArgs,
744    show_create_type: ShowCreateType,
745    name: ObjectName,
746) -> Result<RwPgResponse> {
747    let session = handle_args.session;
748    let catalog_reader = session.env().catalog_reader().read_guard();
749    let database = session.database();
750    let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
751    let search_path = session.config().search_path();
752    let user_name = &session.user_name();
753    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
754    let user_reader = session.env().user_info_reader().read_guard();
755    let current_user = user_reader
756        .get_user_by_name(user_name)
757        .expect("user not found");
758
759    let (sql, schema_name) = match show_create_type {
760        ShowCreateType::MaterializedView => {
761            let (mv, schema) = schema_path
762                .try_find(|schema_name| {
763                    Ok::<_, RwError>(
764                        catalog_reader
765                            .get_schema_by_name(&database, schema_name)?
766                            .get_created_table_by_name(&object_name)
767                            .filter(|t| {
768                                t.is_mview()
769                                    && has_access_to_object(current_user, t.id.table_id, t.owner)
770                            }),
771                    )
772                })?
773                .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
774            (mv.create_sql(), schema)
775        }
776        ShowCreateType::View => {
777            let (view, schema) =
778                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
779            if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
780                return Err(CatalogError::NotFound("view", name.to_string()).into());
781            }
782            (view.create_sql(schema.to_owned()), schema)
783        }
784        ShowCreateType::Table => {
785            let (table, schema) = schema_path
786                .try_find(|schema_name| {
787                    Ok::<_, RwError>(
788                        catalog_reader
789                            .get_schema_by_name(&database, schema_name)?
790                            .get_created_table_by_name(&object_name)
791                            .filter(|t| {
792                                t.is_user_table()
793                                    && has_access_to_object(current_user, t.id.table_id, t.owner)
794                            }),
795                    )
796                })?
797                .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
798
799            (table.create_sql_purified(), schema)
800        }
801        ShowCreateType::Sink => {
802            let (sink, schema) =
803                catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
804            if !has_access_to_object(current_user, sink.id.sink_id, sink.owner.user_id) {
805                return Err(CatalogError::NotFound("sink", name.to_string()).into());
806            }
807            (sink.create_sql(), schema)
808        }
809        ShowCreateType::Source => {
810            let (source, schema) = schema_path
811                .try_find(|schema_name| {
812                    Ok::<_, RwError>(
813                        catalog_reader
814                            .get_schema_by_name(&database, schema_name)?
815                            .get_source_by_name(&object_name)
816                            .filter(|s| {
817                                s.associated_table_id.is_none()
818                                    && has_access_to_object(current_user, s.id, s.owner)
819                            }),
820                    )
821                })?
822                .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
823            (source.create_sql_purified(), schema)
824        }
825        ShowCreateType::Index => {
826            let (index, schema) = schema_path
827                .try_find(|schema_name| {
828                    Ok::<_, RwError>(
829                        catalog_reader
830                            .get_schema_by_name(&database, schema_name)?
831                            .get_created_table_by_name(&object_name)
832                            .filter(|t| {
833                                t.is_index()
834                                    && has_access_to_object(current_user, t.id.table_id, t.owner)
835                            }),
836                    )
837                })?
838                .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
839            (index.create_sql(), schema)
840        }
841        ShowCreateType::Function => {
842            bail_not_implemented!("show create on: {}", show_create_type);
843        }
844        ShowCreateType::Subscription => {
845            let (subscription, schema) =
846                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
847            if !has_access_to_object(
848                current_user,
849                subscription.id.subscription_id,
850                subscription.owner.user_id,
851            ) {
852                return Err(CatalogError::NotFound("subscription", name.to_string()).into());
853            }
854            (subscription.create_sql(), schema)
855        }
856    };
857    let name = format!("{}.{}", schema_name, object_name);
858
859    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
860        .rows([ShowCreateObjectRow {
861            name,
862            create_sql: sql,
863        }])
864        .into())
865}
866
867async fn show_process_list_impl(
868    frontend_client_pool: FrontendClientPoolRef,
869    worker_node_manager: WorkerNodeManagerRef,
870) -> Vec<ShowProcessListRow> {
871    // Create a placeholder row for the worker in case of any errors while fetching its running SQLs.
872    fn on_error(worker_id: u32, err_msg: String) -> Vec<ShowProcessListRow> {
873        vec![ShowProcessListRow {
874            worker_id: format!("{}", worker_id),
875            id: "".to_owned(),
876            user: "".to_owned(),
877            host: "".to_owned(),
878            database: "".to_owned(),
879            time: None,
880            info: Some(format!(
881                "Failed to show process list from worker {worker_id} due to: {err_msg}"
882            )),
883        }]
884    }
885    let futures = worker_node_manager
886        .list_frontend_nodes()
887        .into_iter()
888        .map(|worker| {
889            let frontend_client_pool_ = frontend_client_pool.clone();
890            async move {
891                let client = match frontend_client_pool_.get(&worker).await {
892                    Ok(client) => client,
893                    Err(e) => {
894                        return on_error(worker.id, format!("{}", e.as_report()));
895                    }
896                };
897                let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
898                    Ok(resp) => resp,
899                    Err(e) => {
900                        return on_error(worker.id, format!("{}", e.as_report()));
901                    }
902                };
903                resp.into_inner()
904                    .running_sqls
905                    .into_iter()
906                    .map(|sql| ShowProcessListRow {
907                        worker_id: format!("{}", worker.id),
908                        id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
909                        user: sql.user_name,
910                        host: sql.peer_addr,
911                        database: sql.database,
912                        time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
913                        info: sql
914                            .sql
915                            .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
916                    })
917                    .collect_vec()
918            }
919        })
920        .collect_vec();
921    join_all(futures).await.into_iter().flatten().collect()
922}
923
924#[cfg(test)]
925mod tests {
926    use std::ops::Index;
927
928    use futures_async_stream::for_await;
929
930    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
931
932    #[tokio::test]
933    async fn test_show_source() {
934        let frontend = LocalFrontend::new(Default::default()).await;
935
936        let sql = r#"CREATE SOURCE t1 (column1 varchar)
937        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
938        FORMAT PLAIN ENCODE JSON"#;
939        frontend.run_sql(sql).await.unwrap();
940
941        let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
942        rows.sort();
943        assert_eq!(rows, vec!["Row([Some(b\"t1\")])".to_owned(),]);
944    }
945
946    #[tokio::test]
947    async fn test_show_column() {
948        let proto_file = create_proto_file(PROTO_FILE_DATA);
949        let sql = format!(
950            r#"CREATE SOURCE t
951    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
952    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
953            proto_file.path().to_str().unwrap()
954        );
955        let frontend = LocalFrontend::new(Default::default()).await;
956        frontend.run_sql(sql).await.unwrap();
957
958        let sql = "show columns from t";
959        let mut pg_response = frontend.run_sql(sql).await.unwrap();
960
961        let mut columns = Vec::new();
962        #[for_await]
963        for row_set in pg_response.values_stream() {
964            let row_set = row_set.unwrap();
965            for row in row_set {
966                columns.push((
967                    std::str::from_utf8(row.index(0).as_ref().unwrap())
968                        .unwrap()
969                        .to_owned(),
970                    std::str::from_utf8(row.index(1).as_ref().unwrap())
971                        .unwrap()
972                        .to_owned(),
973                ));
974            }
975        }
976
977        expect_test::expect![[r#"
978            [
979                (
980                    "id",
981                    "integer",
982                ),
983                (
984                    "country",
985                    "struct",
986                ),
987                (
988                    "country.address",
989                    "character varying",
990                ),
991                (
992                    "country.city",
993                    "struct",
994                ),
995                (
996                    "country.city.address",
997                    "character varying",
998                ),
999                (
1000                    "country.city.zipcode",
1001                    "character varying",
1002                ),
1003                (
1004                    "country.zipcode",
1005                    "character varying",
1006                ),
1007                (
1008                    "zipcode",
1009                    "bigint",
1010                ),
1011                (
1012                    "rate",
1013                    "real",
1014                ),
1015                (
1016                    "_rw_kafka_timestamp",
1017                    "timestamp with time zone",
1018                ),
1019                (
1020                    "_rw_kafka_partition",
1021                    "character varying",
1022                ),
1023                (
1024                    "_rw_kafka_offset",
1025                    "character varying",
1026                ),
1027                (
1028                    "_row_id",
1029                    "serial",
1030                ),
1031            ]
1032        "#]]
1033        .assert_debug_eq(&columns);
1034    }
1035}