risingwave_frontend/handler/
show.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use pgwire::pg_server::Session;
23use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::GetRunningSqlsRequest;
33use risingwave_rpc_client::FrontendClientPoolRef;
34use risingwave_sqlparser::ast::{
35    Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
36};
37use thiserror_ext::AsReport;
38
39use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
40use crate::binder::{Binder, Relation};
41use crate::catalog::catalog_service::CatalogReadGuard;
42use crate::catalog::root_catalog::SchemaPath;
43use crate::catalog::schema_catalog::SchemaCatalog;
44use crate::catalog::{CatalogError, IndexCatalog};
45use crate::error::{Result, RwError};
46use crate::handler::HandlerArgs;
47use crate::handler::create_connection::print_connection_params;
48use crate::session::cursor_manager::SubscriptionCursor;
49use crate::session::{SessionImpl, WorkerProcessId};
50use crate::user::has_access_to_object;
51
52pub fn get_columns_from_table(
53    session: &SessionImpl,
54    table_name: ObjectName,
55) -> Result<Vec<ColumnCatalog>> {
56    let mut binder = Binder::new_for_system(session);
57    let relation = binder.bind_relation_by_name(table_name.clone(), None, None, false)?;
58    let column_catalogs = match relation {
59        Relation::Source(s) => s.catalog.columns,
60        Relation::BaseTable(t) => t.table_catalog.columns.clone(),
61        Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
62        _ => {
63            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
64        }
65    };
66
67    Ok(column_catalogs)
68}
69
70pub fn get_columns_from_sink(
71    session: &SessionImpl,
72    sink_name: ObjectName,
73) -> Result<Vec<ColumnCatalog>> {
74    let binder = Binder::new_for_system(session);
75    let sink = binder.bind_sink_by_name(sink_name.clone())?;
76    Ok(sink.sink_catalog.full_columns().to_vec())
77}
78
79pub fn get_columns_from_view(
80    session: &SessionImpl,
81    view_name: ObjectName,
82) -> Result<Vec<ColumnCatalog>> {
83    let binder = Binder::new_for_system(session);
84    let view = binder.bind_view_by_name(view_name.clone())?;
85
86    Ok(view
87        .view_catalog
88        .columns
89        .iter()
90        .enumerate()
91        .map(|(idx, field)| ColumnCatalog {
92            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
93            is_hidden: false,
94        })
95        .collect())
96}
97
98pub fn get_indexes_from_table(
99    session: &SessionImpl,
100    table_name: ObjectName,
101) -> Result<Vec<Arc<IndexCatalog>>> {
102    let mut binder = Binder::new_for_system(session);
103    let relation = binder.bind_relation_by_name(table_name.clone(), None, None, false)?;
104    let indexes = match relation {
105        Relation::BaseTable(t) => t.table_indexes,
106        _ => {
107            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
108        }
109    };
110
111    Ok(indexes)
112}
113
114fn schema_or_search_path(
115    session: &Arc<SessionImpl>,
116    schema: &Option<Ident>,
117    search_path: &SearchPath,
118) -> Vec<String> {
119    if let Some(s) = schema {
120        vec![s.real_value()]
121    } else {
122        search_path
123            .real_path()
124            .iter()
125            .map(|s| {
126                if s.eq(USER_NAME_WILD_CARD) {
127                    session.user_name()
128                } else {
129                    s.to_string()
130                }
131            })
132            .collect()
133    }
134}
135
136fn iter_schema_items<F, T>(
137    session: &Arc<SessionImpl>,
138    schema: &Option<Ident>,
139    reader: &CatalogReadGuard,
140    mut f: F,
141) -> Vec<T>
142where
143    F: FnMut(&SchemaCatalog) -> Vec<T>,
144{
145    let search_path = session.config().search_path();
146
147    schema_or_search_path(session, schema, &search_path)
148        .into_iter()
149        .filter_map(|schema| {
150            reader
151                .get_schema_by_name(&session.database(), schema.as_ref())
152                .ok()
153        })
154        .flat_map(|s| f(s).into_iter())
155        .collect()
156}
157
/// Single-column row carrying just an object name.
///
/// `handle_show_object` emits it for the `SHOW` variants that only list names, and
/// `infer_show_object` uses it as the fallback row shape.
158#[derive(Fields)]
159#[fields(style = "Title Case")]
160struct ShowObjectRow {
    // Name of the listed object.
161    name: String,
162}
163
/// One output row describing a column (or a nested field of a column), as built by
/// `ShowColumnRow::flatten` / `ShowColumnRow::from_catalog`.
164#[derive(Fields)]
165#[fields(style = "Title Case")]
166pub struct ShowColumnRow {
    // Possibly nested column name; rendered as text when converted to a datum.
167    pub name: ShowColumnName,
    // Textual data type; structs are shown as `struct` and lists of structs as `struct[]`.
168    pub r#type: String,
    // Hidden flag stored as the string form of a bool (`flatten` always fills it in).
169    pub is_hidden: Option<String>, // XXX: why not bool?
    // Column description from the catalog; `flatten` leaves it empty for nested rows.
170    pub description: Option<String>,
171}
172
/// One step in the path to a (possibly nested) column shown in column listings.
173#[derive(Clone, Debug)]
174enum ShowColumnNameSegment {
    // A named component: the top-level column name or a struct field name.
175    Field(Ident),
    // Descent into the element of a list; rendered as the literal `[1]`.
176    ListElement,
177}
178
179impl ShowColumnNameSegment {
180    pub fn field(name: &str) -> Self {
181        ShowColumnNameSegment::Field(Ident::from_real_value(name))
182    }
183}
184
185/// The name of a column in the output of `SHOW COLUMNS` or `DESCRIBE`.
///
/// Stored as a path of segments so nested struct fields and list elements can be
/// expressed; the segments are joined into one string when converted to a datum.
186#[derive(Clone, Debug)]
187pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
188
189impl ShowColumnName {
190    /// Create a special column name without quoting. Used only for extra information like `primary key`
191    /// in the output of `DESCRIBE`.
192    pub fn special(name: &str) -> Self {
193        ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
194            name,
195        ))])
196    }
197}
198
// A column name is exposed as a varchar value, matching the string produced by its
// `ToOwnedDatum` implementation.
199impl WithDataType for ShowColumnName {
200    fn default_data_type() -> DataType {
201        DataType::Varchar
202    }
203}
204
205impl ToOwnedDatum for ShowColumnName {
206    fn to_owned_datum(self) -> Datum {
207        use std::fmt::Write;
208
209        let mut s = String::new();
210        for segment in self.0 {
211            match segment {
212                ShowColumnNameSegment::Field(ident) => {
213                    if !s.is_empty() {
214                        // TODO: shall we add parentheses, so that it's valid field access SQL?
215                        s.push('.');
216                    }
217                    write!(s, "{ident}").unwrap();
218                }
219                ShowColumnNameSegment::ListElement => {
220                    s.push_str("[1]");
221                }
222            }
223        }
224        s.to_owned_datum()
225    }
226}
227
228impl ShowColumnRow {
229    /// Create a row with the given information. If the data type is a struct or list,
230    /// flatten the data type to also generate rows for its fields.
231    fn flatten(
232        name: ShowColumnName,
233        data_type: DataType,
234        is_hidden: bool,
235        description: Option<String>,
236    ) -> Vec<Self> {
237        // TODO(struct): use struct's type name once supported.
238        let r#type = match &data_type {
239            DataType::Struct(_) => "struct".to_owned(),
240            DataType::List(box DataType::Struct(_)) => "struct[]".to_owned(),
241            d => d.to_string(),
242        };
243
244        let mut rows = vec![ShowColumnRow {
245            name: name.clone(),
246            r#type,
247            is_hidden: Some(is_hidden.to_string()),
248            description,
249        }];
250
251        match data_type {
252            DataType::Struct(st) => {
253                rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
254                    let mut name = name.clone();
255                    name.0.push(ShowColumnNameSegment::field(field_name));
256                    Self::flatten(name, field_data_type.clone(), is_hidden, None)
257                }));
258            }
259
260            DataType::List(inner @ box DataType::Struct(_)) => {
261                let mut name = name.clone();
262                name.0.push(ShowColumnNameSegment::ListElement);
263                rows.extend(Self::flatten(name, *inner, is_hidden, None));
264            }
265
266            _ => {}
267        }
268
269        rows
270    }
271
272    pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
273        Self::flatten(
274            ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
275            col.column_desc.data_type,
276            col.is_hidden,
277            col.column_desc.description,
278        )
279    }
280}
281
/// Row shape for `ShowObject::Connection`.
282#[derive(Fields)]
283#[fields(style = "Title Case")]
284struct ShowConnectionRow {
    // Connection name.
285    name: String,
    // Connection kind: the private-link constant or the connection type's string name.
286    r#type: String,
    // Human-readable property dump; multi-line for private-link connections.
287    properties: String,
288}
289
/// Row shape for `ShowObject::Function`.
290#[derive(Fields)]
291#[fields(style = "Title Case")]
292struct ShowFunctionRow {
    // Function name.
293    name: String,
    // Argument types joined with ", ".
294    arguments: String,
    // Return type rendered as text.
295    return_type: String,
    // Copied from the function catalog's `language` field.
296    language: String,
    // Copied from the function catalog's optional `link` field.
297    link: Option<String>,
298}
299
/// Row shape for `ShowObject::Indexes`; built from an `IndexCatalog` via `From`.
300#[derive(Fields)]
301#[fields(style = "Title Case")]
302struct ShowIndexRow {
    // Index name.
303    name: String,
    // Name of the index's primary table.
304    on: String,
    // Comma-separated index columns with their ordering.
305    key: String,
    // Comma-separated include columns.
306    include: String,
    // Comma-separated distribution columns.
307    distributed_by: String,
308}
309
310impl From<Arc<IndexCatalog>> for ShowIndexRow {
311    fn from(index: Arc<IndexCatalog>) -> Self {
312        let index_display = index.display();
313        ShowIndexRow {
314            name: index.name.clone(),
315            on: index.primary_table.name.clone(),
316            key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
317            include: display_comma_separated(&index_display.include_columns).to_string(),
318            distributed_by: display_comma_separated(&index_display.distributed_by_columns)
319                .to_string(),
320        }
321    }
322}
323
/// Row shape for `ShowObject::Cluster`: one row per worker node returned by the meta client.
324#[derive(Fields)]
325#[fields(style = "Title Case")]
326struct ShowClusterRow {
    // Worker id (cast from the worker's id).
327    id: i32,
    // Worker host address rendered as text.
328    addr: String,
    // String name of the worker type.
329    r#type: String,
    // String name of the worker state.
330    state: String,
    // Worker parallelism, when the worker reports one.
331    parallelism: Option<i32>,
    // The three flags below come from the worker's property and are empty when it has none.
332    is_streaming: Option<bool>,
333    is_serving: Option<bool>,
334    is_unschedulable: Option<bool>,
    // Start time, built from the worker's `started_at` value interpreted as seconds.
335    started_at: Option<Timestamptz>,
336}
337
/// Row shape for `ShowObject::Jobs`: one row per entry of the meta client's DDL progress list.
338#[derive(Fields)]
339#[fields(style = "Title Case")]
340struct ShowJobRow {
    // Job id.
341    id: i64,
    // Statement text reported for the job.
342    statement: String,
    // Progress text reported for the job.
343    progress: String,
344}
345
/// Row shape for `ShowObject::ProcessList`.
///
/// NOTE(review): rows are produced by `show_process_list_impl`, so the exact meaning of
/// each field should be confirmed there.
346#[derive(Fields)]
347#[fields(style = "Title Case")]
348struct ShowProcessListRow {
349    worker_id: String,
350    id: String,
351    user: String,
352    host: String,
353    database: String,
354    time: Option<String>,
355    info: Option<String>,
356}
357
/// Row shape described by `infer_show_create_object` for `SHOW CREATE ...` results.
358#[derive(Fields)]
359#[fields(style = "Title Case")]
360struct ShowCreateObjectRow {
    // Object name.
361    name: String,
    // SQL text that creates the object.
362    create_sql: String,
363}
364
/// Row shape returned for `ShowObject::Subscription`.
365#[derive(Fields)]
366#[fields(style = "Title Case")]
367struct ShowSubscriptionRow {
    // Subscription name.
368    name: String,
    // The subscription's `retention_seconds`, widened to `i64`.
369    retention_seconds: i64,
370}
371
/// Row shape returned for `ShowObject::Cursor`: one row per query cursor of each session.
372#[derive(Fields)]
373#[fields(style = "Title Case")]
374struct ShowCursorRow {
    // Id of the session that owns the cursor, as text.
375    session_id: String,
    // User name of that session.
376    user: String,
    // Peer address of that session, as text.
377    host: String,
    // Database of that session.
378    database: String,
    // Cursor name.
379    cursor_name: String,
380}
381
/// Row shape returned for `ShowObject::SubscriptionCursor`: one row per subscription
/// cursor of each session.
382#[derive(Fields)]
383#[fields(style = "Title Case")]
384struct ShowSubscriptionCursorRow {
    // Id of the session that owns the cursor, as text.
385    session_id: String,
    // User name of that session.
386    user: String,
    // Peer address of that session, as text.
387    host: String,
    // Database of that session.
388    database: String,
    // Cursor name.
389    cursor_name: String,
    // Name of the subscription the cursor reads from.
390    subscription_name: String,
    // Cursor state as reported by `state_info_string()`.
391    state: String,
    // The cursor's idle duration in milliseconds.
392    idle_duration_ms: i64,
393}
394
395/// Infer the row description for different show objects.
396pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
397    fields_to_descriptors(match objects {
398        ShowObject::Columns { .. } => ShowColumnRow::fields(),
399        ShowObject::Connection { .. } => ShowConnectionRow::fields(),
400        ShowObject::Function { .. } => ShowFunctionRow::fields(),
401        ShowObject::Indexes { .. } => ShowIndexRow::fields(),
402        ShowObject::Cluster => ShowClusterRow::fields(),
403        ShowObject::Jobs => ShowJobRow::fields(),
404        ShowObject::ProcessList => ShowProcessListRow::fields(),
405        _ => ShowObjectRow::fields(),
406    })
407}
408
409pub async fn handle_show_object(
410    handler_args: HandlerArgs,
411    command: ShowObject,
412    filter: Option<ShowStatementFilter>,
413) -> Result<RwPgResponse> {
414    let session = handler_args.session;
415
416    if let Some(ShowStatementFilter::Where(..)) = filter {
417        bail_not_implemented!("WHERE clause in SHOW statement");
418    }
419
420    let catalog_reader = session.env().catalog_reader();
421    let user_reader = session.env().user_info_reader();
422
423    let names = match command {
424        ShowObject::Table { schema } => {
425            let reader = catalog_reader.read_guard();
426            let user_reader = user_reader.read_guard();
427            let current_user = user_reader
428                .get_user_by_name(&session.user_name())
429                .expect("user not found");
430            iter_schema_items(&session, &schema, &reader, |schema| {
431                schema
432                    .iter_user_table_with_acl(current_user)
433                    .map(|t| t.name.clone())
434                    .collect()
435            })
436        }
437        ShowObject::InternalTable { schema } => {
438            let reader = catalog_reader.read_guard();
439            let user_reader = user_reader.read_guard();
440            let current_user = user_reader
441                .get_user_by_name(&session.user_name())
442                .expect("user not found");
443            iter_schema_items(&session, &schema, &reader, |schema| {
444                schema
445                    .iter_internal_table_with_acl(current_user)
446                    .map(|t| t.name.clone())
447                    .collect()
448            })
449        }
450        ShowObject::Database => catalog_reader.read_guard().get_all_database_names(),
451        ShowObject::Schema => catalog_reader
452            .read_guard()
453            .get_all_schema_names(&session.database())?,
454        ShowObject::View { schema } => {
455            let reader = catalog_reader.read_guard();
456            let user_reader = user_reader.read_guard();
457            let current_user = user_reader
458                .get_user_by_name(&session.user_name())
459                .expect("user not found");
460            iter_schema_items(&session, &schema, &reader, |schema| {
461                schema
462                    .iter_view_with_acl(current_user)
463                    .map(|t| t.name.clone())
464                    .collect()
465            })
466        }
467        ShowObject::MaterializedView { schema } => {
468            let reader = catalog_reader.read_guard();
469            let user_reader = user_reader.read_guard();
470            let current_user = user_reader
471                .get_user_by_name(&session.user_name())
472                .expect("user not found");
473            iter_schema_items(&session, &schema, &reader, |schema| {
474                schema
475                    .iter_created_mvs_with_acl(current_user)
476                    .map(|t| t.name.clone())
477                    .collect()
478            })
479        }
480        ShowObject::Source { schema } => {
481            let reader = catalog_reader.read_guard();
482            let user_reader = user_reader.read_guard();
483            let current_user = user_reader
484                .get_user_by_name(&session.user_name())
485                .expect("user not found");
486            let mut sources = iter_schema_items(&session, &schema, &reader, |schema| {
487                schema
488                    .iter_source_with_acl(current_user)
489                    .map(|t| t.name.clone())
490                    .collect()
491            });
492            sources.extend(session.temporary_source_manager().keys());
493            sources
494        }
495        ShowObject::Sink { schema } => {
496            let reader = catalog_reader.read_guard();
497            let user_reader = user_reader.read_guard();
498            let current_user = user_reader
499                .get_user_by_name(&session.user_name())
500                .expect("user not found");
501            iter_schema_items(&session, &schema, &reader, |schema| {
502                schema
503                    .iter_sink_with_acl(current_user)
504                    .map(|t| t.name.clone())
505                    .collect()
506            })
507        }
508        ShowObject::Subscription { schema } => {
509            let reader = catalog_reader.read_guard();
510            let rows = iter_schema_items(&session, &schema, &reader, |schema| {
511                schema
512                    .iter_subscription()
513                    .map(|t| ShowSubscriptionRow {
514                        name: t.name.clone(),
515                        retention_seconds: t.retention_seconds as i64,
516                    })
517                    .collect()
518            });
519            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
520                .rows(rows)
521                .into());
522        }
523        ShowObject::Secret { schema } => {
524            let reader = catalog_reader.read_guard();
525            iter_schema_items(&session, &schema, &reader, |schema| {
526                schema.iter_secret().map(|t| t.name.clone()).collect()
527            })
528        }
529        ShowObject::Columns { table } => {
530            let Ok(columns) = get_columns_from_table(&session, table.clone())
531                .or(get_columns_from_sink(&session, table.clone()))
532                .or(get_columns_from_view(&session, table.clone()))
533            else {
534                return Err(CatalogError::NotFound(
535                    "table, source, sink or view",
536                    table.to_string(),
537                )
538                .into());
539            };
540
541            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
542                .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
543                .into());
544        }
545        ShowObject::Indexes { table } => {
546            let indexes = get_indexes_from_table(&session, table)?;
547
548            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
549                .rows(indexes.into_iter().map(ShowIndexRow::from))
550                .into());
551        }
552        ShowObject::Connection { schema } => {
553            let reader = catalog_reader.read_guard();
554            let rows = iter_schema_items(&session, &schema, &reader, |schema| {
555                schema.iter_connections()
556                .map(|c| {
557                    let name = c.name.clone();
558                    let r#type = match &c.info {
559                        connection::Info::PrivateLinkService(_) => {
560                            PRIVATELINK_CONNECTION.to_owned()
561                        },
562                        connection::Info::ConnectionParams(params) => {
563                            params.get_connection_type().unwrap().as_str_name().to_owned()
564                        }
565                    };
566                    let source_names = schema
567                        .get_source_ids_by_connection(c.id)
568                        .unwrap_or_default()
569                        .into_iter()
570                        .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
571                        .collect_vec();
572                    let sink_names = schema
573                        .get_sink_ids_by_connection(c.id)
574                        .unwrap_or_default()
575                        .into_iter()
576                        .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
577                        .collect_vec();
578                    let properties = match &c.info {
579                        connection::Info::PrivateLinkService(i) => {
580                            format!(
581                                "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
582                                i.get_provider().unwrap().as_str_name(),
583                                i.service_name,
584                                i.endpoint_id,
585                                serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
586                                serde_json::to_string(&source_names).unwrap(),
587                                serde_json::to_string(&sink_names).unwrap(),
588                            )
589                        }
590                        connection::Info::ConnectionParams(params) => {
591                            // todo: show dep relations
592                            print_connection_params(params, schema)
593                        }
594                    };
595                    ShowConnectionRow {
596                        name,
597                        r#type,
598                        properties,
599                    }
600                }).collect_vec()
601            });
602            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
603                .rows(rows)
604                .into());
605        }
606        ShowObject::Function { schema } => {
607            let reader = catalog_reader.read_guard();
608            let rows = iter_schema_items(&session, &schema, &reader, |schema| {
609                schema
610                    .iter_function()
611                    .map(|t| ShowFunctionRow {
612                        name: t.name.clone(),
613                        arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
614                        return_type: t.return_type.to_string(),
615                        language: t.language.clone(),
616                        link: t.link.clone(),
617                    })
618                    .collect()
619            });
620            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
621                .rows(rows)
622                .into());
623        }
624        ShowObject::Cluster => {
625            let workers = session.env().meta_client().list_all_nodes().await?;
626            let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
627                let addr: HostAddr = worker.host.as_ref().unwrap().into();
628                let property = worker.property.as_ref();
629                ShowClusterRow {
630                    id: worker.id as _,
631                    addr: addr.to_string(),
632                    r#type: worker.get_type().unwrap().as_str_name().into(),
633                    state: worker.get_state().unwrap().as_str_name().to_owned(),
634                    parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
635                    is_streaming: property.map(|p| p.is_streaming),
636                    is_serving: property.map(|p| p.is_serving),
637                    is_unschedulable: property.map(|p| p.is_unschedulable),
638                    started_at: worker
639                        .started_at
640                        .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
641                }
642            });
643            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
644                .rows(rows)
645                .into());
646        }
647        ShowObject::Jobs => {
648            let resp = session.env().meta_client().get_ddl_progress().await?;
649            let rows = resp.into_iter().map(|job| ShowJobRow {
650                id: job.id as i64,
651                statement: job.statement,
652                progress: job.progress,
653            });
654            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
655                .rows(rows)
656                .into());
657        }
658        ShowObject::ProcessList => {
659            let rows = show_process_list_impl(
660                session.env().frontend_client_pool(),
661                session.env().worker_node_manager_ref(),
662            )
663            .await;
664            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
665                .rows(rows)
666                .into());
667        }
668        ShowObject::Cursor => {
669            let sessions = session
670                .env()
671                .sessions_map()
672                .read()
673                .values()
674                .cloned()
675                .collect_vec();
676            let mut rows = vec![];
677            for s in sessions {
678                let session_id = format!("{}", s.id().0);
679                let user = s.user_name();
680                let host = format!("{}", s.peer_addr());
681                let database = s.database();
682
683                s.get_cursor_manager()
684                    .iter_query_cursors(|cursor_name: &String, _| {
685                        rows.push(ShowCursorRow {
686                            session_id: session_id.clone(),
687                            user: user.clone(),
688                            host: host.clone(),
689                            database: database.clone(),
690                            cursor_name: cursor_name.to_owned(),
691                        });
692                    })
693                    .await;
694            }
695            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
696                .rows(rows)
697                .into());
698        }
699        ShowObject::SubscriptionCursor => {
700            let sessions = session
701                .env()
702                .sessions_map()
703                .read()
704                .values()
705                .cloned()
706                .collect_vec();
707            let mut rows = vec![];
708            for s in sessions {
709                let session_id = format!("{}", s.id().0);
710                let user = s.user_name();
711                let host = format!("{}", s.peer_addr());
712                let database = s.database().to_owned();
713
714                s.get_cursor_manager()
715                    .iter_subscription_cursors(
716                        |cursor_name: &String, cursor: &SubscriptionCursor| {
717                            rows.push(ShowSubscriptionCursorRow {
718                                session_id: session_id.clone(),
719                                user: user.clone(),
720                                host: host.clone(),
721                                database: database.clone(),
722                                cursor_name: cursor_name.to_owned(),
723                                subscription_name: cursor.subscription_name().to_owned(),
724                                state: cursor.state_info_string(),
725                                idle_duration_ms: cursor.idle_duration().as_millis() as i64,
726                            });
727                        },
728                    )
729                    .await;
730            }
731
732            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
733                .rows(rows)
734                .into());
735        }
736    };
737
738    let rows = names
739        .into_iter()
740        .filter(|arg| match &filter {
741            Some(ShowStatementFilter::Like(pattern)) => like_default(arg, pattern),
742            Some(ShowStatementFilter::ILike(pattern)) => i_like_default(arg, pattern),
743            Some(ShowStatementFilter::Where(..)) => unreachable!(),
744            None => true,
745        })
746        .map(|name| ShowObjectRow { name });
747
748    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
749        .rows(rows)
750        .into())
751}
752
753pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
754    fields_to_descriptors(ShowCreateObjectRow::fields())
755}
756
757pub fn handle_show_create_object(
758    handle_args: HandlerArgs,
759    show_create_type: ShowCreateType,
760    name: ObjectName,
761) -> Result<RwPgResponse> {
762    let session = handle_args.session;
763    let catalog_reader = session.env().catalog_reader().read_guard();
764    let database = session.database();
765    let (schema_name, object_name) =
766        Binder::resolve_schema_qualified_name(&database, name.clone())?;
767    let search_path = session.config().search_path();
768    let user_name = &session.user_name();
769    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
770    let user_reader = session.env().user_info_reader().read_guard();
771    let current_user = user_reader
772        .get_user_by_name(user_name)
773        .expect("user not found");
774
775    let (sql, schema_name) = match show_create_type {
776        ShowCreateType::MaterializedView => {
777            let (mv, schema) = schema_path
778                .try_find(|schema_name| {
779                    Ok::<_, RwError>(
780                        catalog_reader
781                            .get_schema_by_name(&database, schema_name)?
782                            .get_created_table_by_name(&object_name)
783                            .filter(|t| {
784                                t.is_mview()
785                                    && has_access_to_object(
786                                        current_user,
787                                        schema_name,
788                                        t.id.table_id,
789                                        t.owner,
790                                    )
791                            }),
792                    )
793                })?
794                .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
795            (mv.create_sql(), schema)
796        }
797        ShowCreateType::View => {
798            let (view, schema) =
799                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
800            if !view.is_system_view()
801                && !has_access_to_object(current_user, schema, view.id, view.owner)
802            {
803                return Err(CatalogError::NotFound("view", name.to_string()).into());
804            }
805            (view.create_sql(schema.to_owned()), schema)
806        }
807        ShowCreateType::Table => {
808            let (table, schema) = schema_path
809                .try_find(|schema_name| {
810                    Ok::<_, RwError>(
811                        catalog_reader
812                            .get_schema_by_name(&database, schema_name)?
813                            .get_created_table_by_name(&object_name)
814                            .filter(|t| {
815                                t.is_user_table()
816                                    && has_access_to_object(
817                                        current_user,
818                                        schema_name,
819                                        t.id.table_id,
820                                        t.owner,
821                                    )
822                            }),
823                    )
824                })?
825                .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
826
827            (table.create_sql_purified(), schema)
828        }
829        ShowCreateType::Sink => {
830            let (sink, schema) =
831                catalog_reader.get_sink_by_name(&database, schema_path, &object_name)?;
832            if !has_access_to_object(current_user, schema, sink.id.sink_id, sink.owner.user_id) {
833                return Err(CatalogError::NotFound("sink", name.to_string()).into());
834            }
835            (sink.create_sql(), schema)
836        }
837        ShowCreateType::Source => {
838            let (source, schema) = schema_path
839                .try_find(|schema_name| {
840                    Ok::<_, RwError>(
841                        catalog_reader
842                            .get_schema_by_name(&database, schema_name)?
843                            .get_source_by_name(&object_name)
844                            .filter(|s| {
845                                s.associated_table_id.is_none()
846                                    && has_access_to_object(
847                                        current_user,
848                                        schema_name,
849                                        s.id,
850                                        s.owner,
851                                    )
852                            }),
853                    )
854                })?
855                .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
856            (source.create_sql_purified(), schema)
857        }
858        ShowCreateType::Index => {
859            let (index, schema) = schema_path
860                .try_find(|schema_name| {
861                    Ok::<_, RwError>(
862                        catalog_reader
863                            .get_schema_by_name(&database, schema_name)?
864                            .get_created_table_by_name(&object_name)
865                            .filter(|t| {
866                                t.is_index()
867                                    && has_access_to_object(
868                                        current_user,
869                                        schema_name,
870                                        t.id.table_id,
871                                        t.owner,
872                                    )
873                            }),
874                    )
875                })?
876                .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
877            (index.create_sql(), schema)
878        }
879        ShowCreateType::Function => {
880            bail_not_implemented!("show create on: {}", show_create_type);
881        }
882        ShowCreateType::Subscription => {
883            let (subscription, schema) =
884                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
885            if !has_access_to_object(
886                current_user,
887                schema,
888                subscription.id.subscription_id,
889                subscription.owner.user_id,
890            ) {
891                return Err(CatalogError::NotFound("subscription", name.to_string()).into());
892            }
893            (subscription.create_sql(), schema)
894        }
895    };
896    let name = format!("{}.{}", schema_name, object_name);
897
898    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
899        .rows([ShowCreateObjectRow {
900            name,
901            create_sql: sql,
902        }])
903        .into())
904}
905
906async fn show_process_list_impl(
907    frontend_client_pool: FrontendClientPoolRef,
908    worker_node_manager: WorkerNodeManagerRef,
909) -> Vec<ShowProcessListRow> {
910    // Create a placeholder row for the worker in case of any errors while fetching its running SQLs.
911    fn on_error(worker_id: u32, err_msg: String) -> Vec<ShowProcessListRow> {
912        vec![ShowProcessListRow {
913            worker_id: format!("{}", worker_id),
914            id: "".to_owned(),
915            user: "".to_owned(),
916            host: "".to_owned(),
917            database: "".to_owned(),
918            time: None,
919            info: Some(format!(
920                "Failed to show process list from worker {worker_id} due to: {err_msg}"
921            )),
922        }]
923    }
924    let futures = worker_node_manager
925        .list_frontend_nodes()
926        .into_iter()
927        .map(|worker| {
928            let frontend_client_pool_ = frontend_client_pool.clone();
929            async move {
930                let client = match frontend_client_pool_.get(&worker).await {
931                    Ok(client) => client,
932                    Err(e) => {
933                        return on_error(worker.id, format!("{}", e.as_report()));
934                    }
935                };
936                let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
937                    Ok(resp) => resp,
938                    Err(e) => {
939                        return on_error(worker.id, format!("{}", e.as_report()));
940                    }
941                };
942                resp.into_inner()
943                    .running_sqls
944                    .into_iter()
945                    .map(|sql| ShowProcessListRow {
946                        worker_id: format!("{}", worker.id),
947                        id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
948                        user: sql.user_name,
949                        host: sql.peer_addr,
950                        database: sql.database,
951                        time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
952                        info: sql
953                            .sql
954                            .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
955                    })
956                    .collect_vec()
957            }
958        })
959        .collect_vec();
960    join_all(futures).await.into_iter().flatten().collect()
961}
962
#[cfg(test)]
mod tests {
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// After creating a single Kafka source, `SHOW SOURCES` must list exactly that source.
    #[tokio::test]
    async fn test_show_source() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let create_source = r#"CREATE SOURCE t1 (column1 varchar)
        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
        FORMAT PLAIN ENCODE JSON"#;
        frontend.run_sql(create_source).await.unwrap();

        let mut listed = frontend.query_formatted_result("SHOW SOURCES").await;
        listed.sort();

        let expected = vec!["Row([Some(b\"t1\")])".to_owned()];
        assert_eq!(listed, expected);
    }

    /// `SHOW COLUMNS` on a protobuf-encoded source must report every column (first two
    /// result cells of each row) as captured in the snapshot below.
    #[tokio::test]
    async fn test_show_column() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();

        let mut pg_response = frontend.run_sql("show columns from t").await.unwrap();

        // Collect the first two cells of every returned row as UTF-8 strings.
        let mut columns = Vec::new();
        #[for_await]
        for batch in pg_response.values_stream() {
            let batch = batch.unwrap();
            for row in batch {
                let cell_text = |idx: usize| {
                    std::str::from_utf8(row.index(idx).as_ref().unwrap())
                        .unwrap()
                        .to_owned()
                };
                let first = cell_text(0);
                let second = cell_text(1);
                columns.push((first, second));
            }
        }

        expect_test::expect![[r#"
            [
                (
                    "id",
                    "integer",
                ),
                (
                    "country",
                    "struct",
                ),
                (
                    "country.address",
                    "character varying",
                ),
                (
                    "country.city",
                    "struct",
                ),
                (
                    "country.city.address",
                    "character varying",
                ),
                (
                    "country.city.zipcode",
                    "character varying",
                ),
                (
                    "country.zipcode",
                    "character varying",
                ),
                (
                    "zipcode",
                    "bigint",
                ),
                (
                    "rate",
                    "real",
                ),
                (
                    "_rw_kafka_timestamp",
                    "timestamp with time zone",
                ),
                (
                    "_rw_kafka_partition",
                    "character varying",
                ),
                (
                    "_rw_kafka_offset",
                    "character varying",
                ),
                (
                    "_row_id",
                    "serial",
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }
}