risingwave_frontend/handler/
show.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use pgwire::pg_server::Session;
23use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::GetRunningSqlsRequest;
33use risingwave_rpc_client::FrontendClientPoolRef;
34use risingwave_sqlparser::ast::{
35    Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
36};
37use thiserror_ext::AsReport;
38
39use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
40use crate::binder::{Binder, Relation};
41use crate::catalog::catalog_service::CatalogReadGuard;
42use crate::catalog::root_catalog::SchemaPath;
43use crate::catalog::schema_catalog::SchemaCatalog;
44use crate::catalog::{CatalogError, IndexCatalog};
45use crate::error::{Result, RwError};
46use crate::handler::HandlerArgs;
47use crate::handler::create_connection::print_connection_params;
48use crate::session::cursor_manager::SubscriptionCursor;
49use crate::session::{SessionImpl, WorkerProcessId};
50use crate::user::has_access_to_object;
51
52pub fn get_columns_from_table(
53    session: &SessionImpl,
54    table_name: ObjectName,
55) -> Result<Vec<ColumnCatalog>> {
56    let mut binder = Binder::new_for_system(session);
57    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
58    let column_catalogs = match relation {
59        Relation::Source(s) => s.catalog.columns,
60        Relation::BaseTable(t) => t.table_catalog.columns.clone(),
61        Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
62        _ => {
63            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
64        }
65    };
66
67    Ok(column_catalogs)
68}
69
70pub fn get_columns_from_sink(
71    session: &SessionImpl,
72    sink_name: ObjectName,
73) -> Result<Vec<ColumnCatalog>> {
74    let binder = Binder::new_for_system(session);
75    let sink = binder.bind_sink_by_name(sink_name.clone())?;
76    Ok(sink.sink_catalog.full_columns().to_vec())
77}
78
79pub fn get_columns_from_view(
80    session: &SessionImpl,
81    view_name: ObjectName,
82) -> Result<Vec<ColumnCatalog>> {
83    let binder = Binder::new_for_system(session);
84    let view = binder.bind_view_by_name(view_name.clone())?;
85
86    Ok(view
87        .view_catalog
88        .columns
89        .iter()
90        .enumerate()
91        .map(|(idx, field)| ColumnCatalog {
92            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
93            is_hidden: false,
94        })
95        .collect())
96}
97
98pub fn get_indexes_from_table(
99    session: &SessionImpl,
100    table_name: ObjectName,
101) -> Result<Vec<Arc<IndexCatalog>>> {
102    let mut binder = Binder::new_for_system(session);
103    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
104    let indexes = match relation {
105        Relation::BaseTable(t) => t.table_indexes,
106        _ => {
107            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
108        }
109    };
110
111    Ok(indexes)
112}
113
114fn schema_or_search_path(
115    session: &Arc<SessionImpl>,
116    schema: &Option<Ident>,
117    search_path: &SearchPath,
118) -> Vec<String> {
119    if let Some(s) = schema {
120        vec![s.real_value()]
121    } else {
122        search_path
123            .real_path()
124            .iter()
125            .map(|s| {
126                if s.eq(USER_NAME_WILD_CARD) {
127                    session.user_name()
128                } else {
129                    s.clone()
130                }
131            })
132            .collect()
133    }
134}
135
136fn iter_schema_items<F, T>(
137    session: &Arc<SessionImpl>,
138    schema: &Option<Ident>,
139    reader: &CatalogReadGuard,
140    mut f: F,
141) -> Vec<T>
142where
143    F: FnMut(&SchemaCatalog) -> Vec<T>,
144{
145    let search_path = session.config().search_path();
146
147    schema_or_search_path(session, schema, &search_path)
148        .into_iter()
149        .filter_map(|schema| {
150            reader
151                .get_schema_by_name(&session.database(), schema.as_ref())
152                .ok()
153        })
154        .flat_map(|s| f(s).into_iter())
155        .collect()
156}
157
/// Single-column result row carrying only an object's name.
///
/// This is the row shape used by `SHOW` variants that merely list names.
// NOTE(review): `style = "Title Case"` presumably controls how the column header is
// rendered — confirm against the `Fields` derive.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowObjectRow {
    name: String,
}
163
/// One result row describing a column (or a nested field of a column).
///
/// Rows are produced by [`ShowColumnRow::from_catalog`], which may yield several rows
/// per column when the column's type is a struct or a list of structs.
#[derive(Fields)]
#[fields(style = "Title Case")]
pub struct ShowColumnRow {
    /// Possibly nested column name, rendered as a varchar.
    pub name: ShowColumnName,
    /// Display name of the column's data type.
    pub r#type: String,
    /// Textual `true`/`false` flag when populated by `from_catalog`.
    pub is_hidden: Option<String>, // XXX: why not bool?
    /// Optional column description taken from the column descriptor.
    pub description: Option<String>,
}
172
173#[derive(Clone, Debug)]
174enum ShowColumnNameSegment {
175    Field(Ident),
176    ListElement,
177}
178
179impl ShowColumnNameSegment {
180    pub fn field(name: &str) -> Self {
181        ShowColumnNameSegment::Field(Ident::from_real_value(name))
182    }
183}
184
185/// The name of a column in the output of `SHOW COLUMNS` or `DESCRIBE`.
186#[derive(Clone, Debug)]
187pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
188
189impl ShowColumnName {
190    /// Create a special column name without quoting. Used only for extra information like `primary key`
191    /// in the output of `DESCRIBE`.
192    pub fn special(name: &str) -> Self {
193        ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
194            name,
195        ))])
196    }
197}
198
/// A [`ShowColumnName`] is exposed as a varchar column in result rows.
impl WithDataType for ShowColumnName {
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
204
205impl ToOwnedDatum for ShowColumnName {
206    fn to_owned_datum(self) -> Datum {
207        use std::fmt::Write;
208
209        let mut s = String::new();
210        for segment in self.0 {
211            match segment {
212                ShowColumnNameSegment::Field(ident) => {
213                    if !s.is_empty() {
214                        // TODO: shall we add parentheses, so that it's valid field access SQL?
215                        s.push('.');
216                    }
217                    write!(s, "{ident}").unwrap();
218                }
219                ShowColumnNameSegment::ListElement => {
220                    s.push_str("[1]");
221                }
222            }
223        }
224        s.to_owned_datum()
225    }
226}
227
228impl ShowColumnRow {
229    /// Create a row with the given information. If the data type is a struct or list,
230    /// flatten the data type to also generate rows for its fields.
231    fn flatten(
232        name: ShowColumnName,
233        data_type: DataType,
234        is_hidden: bool,
235        description: Option<String>,
236    ) -> Vec<Self> {
237        // TODO(struct): use struct's type name once supported.
238        let r#type = match &data_type {
239            DataType::Struct(_) => "struct".to_owned(),
240            DataType::List(box DataType::Struct(_)) => "struct[]".to_owned(),
241            d => d.to_string(),
242        };
243
244        let mut rows = vec![ShowColumnRow {
245            name: name.clone(),
246            r#type,
247            is_hidden: Some(is_hidden.to_string()),
248            description,
249        }];
250
251        match data_type {
252            DataType::Struct(st) => {
253                rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
254                    let mut name = name.clone();
255                    name.0.push(ShowColumnNameSegment::field(field_name));
256                    Self::flatten(name, field_data_type.clone(), is_hidden, None)
257                }));
258            }
259
260            DataType::List(inner @ box DataType::Struct(_)) => {
261                let mut name = name.clone();
262                name.0.push(ShowColumnNameSegment::ListElement);
263                rows.extend(Self::flatten(name, *inner, is_hidden, None));
264            }
265
266            _ => {}
267        }
268
269        rows
270    }
271
272    pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
273        Self::flatten(
274            ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
275            col.column_desc.data_type,
276            col.is_hidden,
277            col.column_desc.description,
278        )
279    }
280}
281
/// Result row for listing connections: the connection's name, its type, and a
/// human-readable rendering of its properties.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowConnectionRow {
    name: String,
    r#type: String,
    properties: String,
}
289
/// Result row for listing functions.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowFunctionRow {
    name: String,
    /// Argument types joined with `", "`.
    arguments: String,
    return_type: String,
    language: String,
    link: Option<String>,
}
299
300#[derive(Fields)]
301#[fields(style = "Title Case")]
302struct ShowIndexRow {
303    name: String,
304    on: String,
305    key: String,
306    include: String,
307    distributed_by: String,
308}
309
310impl From<Arc<IndexCatalog>> for ShowIndexRow {
311    fn from(index: Arc<IndexCatalog>) -> Self {
312        let index_display = index.display();
313        ShowIndexRow {
314            name: index.name.clone(),
315            on: index.primary_table.name.clone(),
316            key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
317            include: display_comma_separated(&index_display.include_columns).to_string(),
318            distributed_by: display_comma_separated(&index_display.distributed_by_columns)
319                .to_string(),
320        }
321    }
322}
323
/// Result row describing one worker node of the cluster.
///
/// The optional fields are `None` when the corresponding information is absent on
/// the worker node reported by the meta service.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowClusterRow {
    id: i32,
    addr: String,
    r#type: String,
    state: String,
    parallelism: Option<i32>,
    is_streaming: Option<bool>,
    is_serving: Option<bool>,
    is_unschedulable: Option<bool>,
    started_at: Option<Timestamptz>,
}
337
/// Result row for one in-progress DDL job, as reported by the meta service's DDL
/// progress call.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowJobRow {
    id: i64,
    statement: String,
    create_type: String,
    progress: String,
}
346
/// Result row for the process list (`ShowObject::ProcessList`).
// NOTE(review): rows are built by `show_process_list_impl`, which is outside this
// view — the exact meaning of `time` and `info` should be confirmed there.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowProcessListRow {
    worker_id: String,
    id: String,
    user: String,
    host: String,
    database: String,
    time: Option<String>,
    info: Option<String>,
}
358
/// Result row for `SHOW CREATE ...`: the object's name and its creation SQL.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCreateObjectRow {
    name: String,
    create_sql: String,
}
365
/// Result row for listing subscriptions: name plus retention, in seconds.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionRow {
    name: String,
    retention_seconds: i64,
}
372
/// Result row for one query cursor, together with the session that owns it.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCursorRow {
    session_id: String,
    user: String,
    host: String,
    database: String,
    cursor_name: String,
}
382
/// Result row for one subscription cursor, together with the session that owns it.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionCursorRow {
    session_id: String,
    user: String,
    host: String,
    database: String,
    cursor_name: String,
    subscription_name: String,
    /// State string as reported by the cursor itself.
    state: String,
    /// How long the cursor has been idle, in milliseconds.
    idle_duration_ms: i64,
}
395
396/// Infer the row description for different show objects.
397pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
398    fields_to_descriptors(match objects {
399        ShowObject::Columns { .. } => ShowColumnRow::fields(),
400        ShowObject::Connection { .. } => ShowConnectionRow::fields(),
401        ShowObject::Function { .. } => ShowFunctionRow::fields(),
402        ShowObject::Indexes { .. } => ShowIndexRow::fields(),
403        ShowObject::Cluster => ShowClusterRow::fields(),
404        ShowObject::Jobs => ShowJobRow::fields(),
405        ShowObject::ProcessList => ShowProcessListRow::fields(),
406        _ => ShowObjectRow::fields(),
407    })
408}
409
410pub async fn handle_show_object(
411    handler_args: HandlerArgs,
412    command: ShowObject,
413    filter: Option<ShowStatementFilter>,
414) -> Result<RwPgResponse> {
415    let session = handler_args.session;
416
417    if let Some(ShowStatementFilter::Where(..)) = filter {
418        bail_not_implemented!("WHERE clause in SHOW statement");
419    }
420
421    let catalog_reader = session.env().catalog_reader();
422    let user_reader = session.env().user_info_reader();
423
424    let names = match command {
425        ShowObject::Table { schema } => {
426            let reader = catalog_reader.read_guard();
427            let user_reader = user_reader.read_guard();
428            let current_user = user_reader
429                .get_user_by_name(&session.user_name())
430                .expect("user not found");
431            iter_schema_items(&session, &schema, &reader, |schema| {
432                schema
433                    .iter_user_table_with_acl(current_user)
434                    .map(|t| t.name.clone())
435                    .collect()
436            })
437        }
438        ShowObject::InternalTable { schema } => {
439            let reader = catalog_reader.read_guard();
440            let user_reader = user_reader.read_guard();
441            let current_user = user_reader
442                .get_user_by_name(&session.user_name())
443                .expect("user not found");
444            iter_schema_items(&session, &schema, &reader, |schema| {
445                schema
446                    .iter_internal_table_with_acl(current_user)
447                    .map(|t| t.name.clone())
448                    .collect()
449            })
450        }
451        ShowObject::Database => catalog_reader.read_guard().get_all_database_names(),
452        ShowObject::Schema => catalog_reader
453            .read_guard()
454            .get_all_schema_names(&session.database())?,
455        ShowObject::View { schema } => {
456            let reader = catalog_reader.read_guard();
457            let user_reader = user_reader.read_guard();
458            let current_user = user_reader
459                .get_user_by_name(&session.user_name())
460                .expect("user not found");
461            iter_schema_items(&session, &schema, &reader, |schema| {
462                schema
463                    .iter_view_with_acl(current_user)
464                    .map(|t| t.name.clone())
465                    .collect()
466            })
467        }
468        ShowObject::MaterializedView { schema } => {
469            let reader = catalog_reader.read_guard();
470            let user_reader = user_reader.read_guard();
471            let current_user = user_reader
472                .get_user_by_name(&session.user_name())
473                .expect("user not found");
474            iter_schema_items(&session, &schema, &reader, |schema| {
475                schema
476                    .iter_created_mvs_with_acl(current_user)
477                    .map(|t| t.name.clone())
478                    .collect()
479            })
480        }
481        ShowObject::Source { schema } => {
482            let reader = catalog_reader.read_guard();
483            let user_reader = user_reader.read_guard();
484            let current_user = user_reader
485                .get_user_by_name(&session.user_name())
486                .expect("user not found");
487            let mut sources = iter_schema_items(&session, &schema, &reader, |schema| {
488                schema
489                    .iter_source_with_acl(current_user)
490                    .map(|t| t.name.clone())
491                    .collect()
492            });
493            sources.extend(session.temporary_source_manager().keys());
494            sources
495        }
496        ShowObject::Sink { schema } => {
497            let reader = catalog_reader.read_guard();
498            let user_reader = user_reader.read_guard();
499            let current_user = user_reader
500                .get_user_by_name(&session.user_name())
501                .expect("user not found");
502            iter_schema_items(&session, &schema, &reader, |schema| {
503                schema
504                    .iter_sink_with_acl(current_user)
505                    .map(|t| t.name.clone())
506                    .collect()
507            })
508        }
509        ShowObject::Subscription { schema } => {
510            let reader = catalog_reader.read_guard();
511            let rows = iter_schema_items(&session, &schema, &reader, |schema| {
512                schema
513                    .iter_subscription()
514                    .map(|t| ShowSubscriptionRow {
515                        name: t.name.clone(),
516                        retention_seconds: t.retention_seconds as i64,
517                    })
518                    .collect()
519            });
520            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
521                .rows(rows)
522                .into());
523        }
524        ShowObject::Secret { schema } => {
525            let reader = catalog_reader.read_guard();
526            iter_schema_items(&session, &schema, &reader, |schema| {
527                schema.iter_secret().map(|t| t.name.clone()).collect()
528            })
529        }
530        ShowObject::Columns { table } => {
531            let Ok(columns) = get_columns_from_table(&session, table.clone())
532                .or(get_columns_from_sink(&session, table.clone()))
533                .or(get_columns_from_view(&session, table.clone()))
534            else {
535                return Err(CatalogError::NotFound(
536                    "table, source, sink or view",
537                    table.to_string(),
538                )
539                .into());
540            };
541
542            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
543                .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
544                .into());
545        }
546        ShowObject::Indexes { table } => {
547            let indexes = get_indexes_from_table(&session, table)?;
548
549            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
550                .rows(indexes.into_iter().map(ShowIndexRow::from))
551                .into());
552        }
553        ShowObject::Connection { schema } => {
554            let reader = catalog_reader.read_guard();
555            let rows = iter_schema_items(&session, &schema, &reader, |schema| {
556                schema.iter_connections()
557                .map(|c| {
558                    let name = c.name.clone();
559                    let r#type = match &c.info {
560                        connection::Info::PrivateLinkService(_) => {
561                            PRIVATELINK_CONNECTION.to_owned()
562                        },
563                        connection::Info::ConnectionParams(params) => {
564                            params.get_connection_type().unwrap().as_str_name().to_owned()
565                        }
566                    };
567                    let source_names = schema
568                        .get_source_ids_by_connection(c.id)
569                        .unwrap_or_default()
570                        .into_iter()
571                        .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
572                        .collect_vec();
573                    let sink_names = schema
574                        .get_sink_ids_by_connection(c.id)
575                        .unwrap_or_default()
576                        .into_iter()
577                        .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
578                        .collect_vec();
579                    let properties = match &c.info {
580                        connection::Info::PrivateLinkService(i) => {
581                            format!(
582                                "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
583                                i.get_provider().unwrap().as_str_name(),
584                                i.service_name,
585                                i.endpoint_id,
586                                serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
587                                serde_json::to_string(&source_names).unwrap(),
588                                serde_json::to_string(&sink_names).unwrap(),
589                            )
590                        }
591                        connection::Info::ConnectionParams(params) => {
592                            // todo: show dep relations
593                            print_connection_params(&session.database(), params, &reader)
594                        }
595                    };
596                    ShowConnectionRow {
597                        name,
598                        r#type,
599                        properties,
600                    }
601                }).collect_vec()
602            });
603            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
604                .rows(rows)
605                .into());
606        }
607        ShowObject::Function { schema } => {
608            let reader = catalog_reader.read_guard();
609            let rows = iter_schema_items(&session, &schema, &reader, |schema| {
610                schema
611                    .iter_function()
612                    .map(|t| ShowFunctionRow {
613                        name: t.name.clone(),
614                        arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
615                        return_type: t.return_type.to_string(),
616                        language: t.language.clone(),
617                        link: t.link.clone(),
618                    })
619                    .collect()
620            });
621            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
622                .rows(rows)
623                .into());
624        }
625        ShowObject::Cluster => {
626            let workers = session.env().meta_client().list_all_nodes().await?;
627            let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
628                let addr: HostAddr = worker.host.as_ref().unwrap().into();
629                let property = worker.property.as_ref();
630                ShowClusterRow {
631                    id: worker.id as _,
632                    addr: addr.to_string(),
633                    r#type: worker.get_type().unwrap().as_str_name().into(),
634                    state: worker.get_state().unwrap().as_str_name().to_owned(),
635                    parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
636                    is_streaming: property.map(|p| p.is_streaming),
637                    is_serving: property.map(|p| p.is_serving),
638                    is_unschedulable: property.map(|p| p.is_unschedulable),
639                    started_at: worker
640                        .started_at
641                        .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
642                }
643            });
644            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
645                .rows(rows)
646                .into());
647        }
648        ShowObject::Jobs => {
649            let resp = session.env().meta_client().get_ddl_progress().await?;
650            let rows = resp.into_iter().map(|job| ShowJobRow {
651                id: job.id as i64,
652                statement: job.statement,
653                create_type: job.create_type,
654                progress: job.progress,
655            });
656            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
657                .rows(rows)
658                .into());
659        }
660        ShowObject::ProcessList => {
661            let rows = show_process_list_impl(
662                session.env().frontend_client_pool(),
663                session.env().worker_node_manager_ref(),
664            )
665            .await;
666            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
667                .rows(rows)
668                .into());
669        }
670        ShowObject::Cursor => {
671            let sessions = session
672                .env()
673                .sessions_map()
674                .read()
675                .values()
676                .cloned()
677                .collect_vec();
678            let mut rows = vec![];
679            for s in sessions {
680                let session_id = format!("{}", s.id().0);
681                let user = s.user_name();
682                let host = format!("{}", s.peer_addr());
683                let database = s.database();
684
685                s.get_cursor_manager()
686                    .iter_query_cursors(|cursor_name: &String, _| {
687                        rows.push(ShowCursorRow {
688                            session_id: session_id.clone(),
689                            user: user.clone(),
690                            host: host.clone(),
691                            database: database.clone(),
692                            cursor_name: cursor_name.to_owned(),
693                        });
694                    })
695                    .await;
696            }
697            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
698                .rows(rows)
699                .into());
700        }
701        ShowObject::SubscriptionCursor => {
702            let sessions = session
703                .env()
704                .sessions_map()
705                .read()
706                .values()
707                .cloned()
708                .collect_vec();
709            let mut rows = vec![];
710            for s in sessions {
711                let session_id = format!("{}", s.id().0);
712                let user = s.user_name();
713                let host = format!("{}", s.peer_addr());
714                let database = s.database().to_owned();
715
716                s.get_cursor_manager()
717                    .iter_subscription_cursors(
718                        |cursor_name: &String, cursor: &SubscriptionCursor| {
719                            rows.push(ShowSubscriptionCursorRow {
720                                session_id: session_id.clone(),
721                                user: user.clone(),
722                                host: host.clone(),
723                                database: database.clone(),
724                                cursor_name: cursor_name.to_owned(),
725                                subscription_name: cursor.subscription_name().to_owned(),
726                                state: cursor.state_info_string(),
727                                idle_duration_ms: cursor.idle_duration().as_millis() as i64,
728                            });
729                        },
730                    )
731                    .await;
732            }
733
734            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
735                .rows(rows)
736                .into());
737        }
738    };
739
740    let rows = names
741        .into_iter()
742        .filter(|arg| match &filter {
743            Some(ShowStatementFilter::Like(pattern)) => like_default(arg, pattern),
744            Some(ShowStatementFilter::ILike(pattern)) => i_like_default(arg, pattern),
745            Some(ShowStatementFilter::Where(..)) => unreachable!(),
746            None => true,
747        })
748        .map(|name| ShowObjectRow { name });
749
750    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
751        .rows(rows)
752        .into())
753}
754
755pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
756    fields_to_descriptors(ShowCreateObjectRow::fields())
757}
758
759pub fn handle_show_create_object(
760    handle_args: HandlerArgs,
761    show_create_type: ShowCreateType,
762    name: ObjectName,
763) -> Result<RwPgResponse> {
764    let session = handle_args.session;
765    let catalog_reader = session.env().catalog_reader().read_guard();
766    let database = session.database();
767    let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
768    let search_path = session.config().search_path();
769    let user_name = &session.user_name();
770    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
771    let user_reader = session.env().user_info_reader().read_guard();
772    let current_user = user_reader
773        .get_user_by_name(user_name)
774        .expect("user not found");
775
776    let (sql, schema_name) = match show_create_type {
777        ShowCreateType::MaterializedView => {
778            let (mv, schema) = schema_path
779                .try_find(|schema_name| {
780                    Ok::<_, RwError>(
781                        catalog_reader
782                            .get_schema_by_name(&database, schema_name)?
783                            .get_created_table_by_name(&object_name)
784                            .filter(|t| {
785                                t.is_mview()
786                                    && has_access_to_object(
787                                        current_user,
788                                        schema_name,
789                                        t.id.table_id,
790                                        t.owner,
791                                    )
792                            }),
793                    )
794                })?
795                .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
796            (mv.create_sql(), schema)
797        }
798        ShowCreateType::View => {
799            let (view, schema) =
800                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
801            if !view.is_system_view()
802                && !has_access_to_object(current_user, schema, view.id, view.owner)
803            {
804                return Err(CatalogError::NotFound("view", name.to_string()).into());
805            }
806            (view.create_sql(schema.to_owned()), schema)
807        }
808        ShowCreateType::Table => {
809            let (table, schema) = schema_path
810                .try_find(|schema_name| {
811                    Ok::<_, RwError>(
812                        catalog_reader
813                            .get_schema_by_name(&database, schema_name)?
814                            .get_created_table_by_name(&object_name)
815                            .filter(|t| {
816                                t.is_user_table()
817                                    && has_access_to_object(
818                                        current_user,
819                                        schema_name,
820                                        t.id.table_id,
821                                        t.owner,
822                                    )
823                            }),
824                    )
825                })?
826                .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
827
828            (table.create_sql_purified(), schema)
829        }
830        ShowCreateType::Sink => {
831            let (sink, schema) =
832                catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
833            if !has_access_to_object(current_user, schema, sink.id.sink_id, sink.owner.user_id) {
834                return Err(CatalogError::NotFound("sink", name.to_string()).into());
835            }
836            (sink.create_sql(), schema)
837        }
838        ShowCreateType::Source => {
839            let (source, schema) = schema_path
840                .try_find(|schema_name| {
841                    Ok::<_, RwError>(
842                        catalog_reader
843                            .get_schema_by_name(&database, schema_name)?
844                            .get_source_by_name(&object_name)
845                            .filter(|s| {
846                                s.associated_table_id.is_none()
847                                    && has_access_to_object(
848                                        current_user,
849                                        schema_name,
850                                        s.id,
851                                        s.owner,
852                                    )
853                            }),
854                    )
855                })?
856                .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
857            (source.create_sql_purified(), schema)
858        }
859        ShowCreateType::Index => {
860            let (index, schema) = schema_path
861                .try_find(|schema_name| {
862                    Ok::<_, RwError>(
863                        catalog_reader
864                            .get_schema_by_name(&database, schema_name)?
865                            .get_created_table_by_name(&object_name)
866                            .filter(|t| {
867                                t.is_index()
868                                    && has_access_to_object(
869                                        current_user,
870                                        schema_name,
871                                        t.id.table_id,
872                                        t.owner,
873                                    )
874                            }),
875                    )
876                })?
877                .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
878            (index.create_sql(), schema)
879        }
880        ShowCreateType::Function => {
881            bail_not_implemented!("show create on: {}", show_create_type);
882        }
883        ShowCreateType::Subscription => {
884            let (subscription, schema) =
885                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
886            if !has_access_to_object(
887                current_user,
888                schema,
889                subscription.id.subscription_id,
890                subscription.owner.user_id,
891            ) {
892                return Err(CatalogError::NotFound("subscription", name.to_string()).into());
893            }
894            (subscription.create_sql(), schema)
895        }
896    };
897    let name = format!("{}.{}", schema_name, object_name);
898
899    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
900        .rows([ShowCreateObjectRow {
901            name,
902            create_sql: sql,
903        }])
904        .into())
905}
906
907async fn show_process_list_impl(
908    frontend_client_pool: FrontendClientPoolRef,
909    worker_node_manager: WorkerNodeManagerRef,
910) -> Vec<ShowProcessListRow> {
911    // Create a placeholder row for the worker in case of any errors while fetching its running SQLs.
912    fn on_error(worker_id: u32, err_msg: String) -> Vec<ShowProcessListRow> {
913        vec![ShowProcessListRow {
914            worker_id: format!("{}", worker_id),
915            id: "".to_owned(),
916            user: "".to_owned(),
917            host: "".to_owned(),
918            database: "".to_owned(),
919            time: None,
920            info: Some(format!(
921                "Failed to show process list from worker {worker_id} due to: {err_msg}"
922            )),
923        }]
924    }
925    let futures = worker_node_manager
926        .list_frontend_nodes()
927        .into_iter()
928        .map(|worker| {
929            let frontend_client_pool_ = frontend_client_pool.clone();
930            async move {
931                let client = match frontend_client_pool_.get(&worker).await {
932                    Ok(client) => client,
933                    Err(e) => {
934                        return on_error(worker.id, format!("{}", e.as_report()));
935                    }
936                };
937                let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
938                    Ok(resp) => resp,
939                    Err(e) => {
940                        return on_error(worker.id, format!("{}", e.as_report()));
941                    }
942                };
943                resp.into_inner()
944                    .running_sqls
945                    .into_iter()
946                    .map(|sql| ShowProcessListRow {
947                        worker_id: format!("{}", worker.id),
948                        id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
949                        user: sql.user_name,
950                        host: sql.peer_addr,
951                        database: sql.database,
952                        time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
953                        info: sql
954                            .sql
955                            .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
956                    })
957                    .collect_vec()
958            }
959        })
960        .collect_vec();
961    join_all(futures).await.into_iter().flatten().collect()
962}
963
#[cfg(test)]
mod tests {
    use futures_async_stream::for_await;

    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// A freshly created source must be the only entry listed by `SHOW SOURCES`.
    #[tokio::test]
    async fn test_show_source() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let create_source = r#"CREATE SOURCE t1 (column1 varchar)
        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
        FORMAT PLAIN ENCODE JSON"#;
        frontend.run_sql(create_source).await.unwrap();

        let mut listed = frontend.query_formatted_result("SHOW SOURCES").await;
        listed.sort();

        let expected = vec!["Row([Some(b\"t1\")])".to_owned()];
        assert_eq!(listed, expected);
    }

    /// `show columns` on a protobuf-encoded source reports the `(name, type)` pairs pinned
    /// in the snapshot below.
    #[tokio::test]
    async fn test_show_column() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();

        let mut pg_response = frontend.run_sql("show columns from t").await.unwrap();

        // Gather the first two cells of every returned row as UTF-8 text.
        let mut columns = Vec::new();
        #[for_await]
        for batch in pg_response.values_stream() {
            for row in batch.unwrap() {
                let [name, data_type] = [0, 1].map(|col| {
                    std::str::from_utf8(row[col].as_ref().unwrap())
                        .unwrap()
                        .to_owned()
                });
                columns.push((name, data_type));
            }
        }

        expect_test::expect![[r#"
            [
                (
                    "id",
                    "integer",
                ),
                (
                    "country",
                    "struct",
                ),
                (
                    "country.address",
                    "character varying",
                ),
                (
                    "country.city",
                    "struct",
                ),
                (
                    "country.city.address",
                    "character varying",
                ),
                (
                    "country.city.zipcode",
                    "character varying",
                ),
                (
                    "country.zipcode",
                    "character varying",
                ),
                (
                    "zipcode",
                    "bigint",
                ),
                (
                    "rate",
                    "real",
                ),
                (
                    "_rw_kafka_timestamp",
                    "timestamp with time zone",
                ),
                (
                    "_rw_kafka_partition",
                    "character varying",
                ),
                (
                    "_rw_kafka_offset",
                    "character varying",
                ),
                (
                    "_row_id",
                    "serial",
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }
}