risingwave_frontend/handler/
show.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use itertools::Itertools;
18use pgwire::pg_field_descriptor::PgFieldDescriptor;
19use pgwire::pg_protocol::truncated_fmt;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pgwire::pg_server::Session;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
24use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
25use risingwave_common::types::{DataType, Fields, Timestamptz};
26use risingwave_common::util::addr::HostAddr;
27use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
28use risingwave_expr::scalar::like::{i_like_default, like_default};
29use risingwave_pb::catalog::connection;
30use risingwave_sqlparser::ast::{
31    Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
32};
33
34use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
35use crate::binder::{Binder, Relation};
36use crate::catalog::catalog_service::CatalogReadGuard;
37use crate::catalog::root_catalog::SchemaPath;
38use crate::catalog::schema_catalog::SchemaCatalog;
39use crate::catalog::{CatalogError, IndexCatalog};
40use crate::error::{Result, RwError};
41use crate::handler::HandlerArgs;
42use crate::handler::create_connection::print_connection_params;
43use crate::session::SessionImpl;
44use crate::session::cursor_manager::SubscriptionCursor;
45use crate::user::has_access_to_object;
46
47pub fn get_columns_from_table(
48    session: &SessionImpl,
49    table_name: ObjectName,
50) -> Result<Vec<ColumnCatalog>> {
51    let mut binder = Binder::new_for_system(session);
52    let relation = binder.bind_relation_by_name(table_name.clone(), None, None, false)?;
53    let column_catalogs = match relation {
54        Relation::Source(s) => s.catalog.columns,
55        Relation::BaseTable(t) => t.table_catalog.columns.clone(),
56        Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
57        _ => {
58            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
59        }
60    };
61
62    Ok(column_catalogs)
63}
64
65pub fn get_columns_from_sink(
66    session: &SessionImpl,
67    sink_name: ObjectName,
68) -> Result<Vec<ColumnCatalog>> {
69    let binder = Binder::new_for_system(session);
70    let sink = binder.bind_sink_by_name(sink_name.clone())?;
71    Ok(sink.sink_catalog.full_columns().to_vec())
72}
73
74pub fn get_columns_from_view(
75    session: &SessionImpl,
76    view_name: ObjectName,
77) -> Result<Vec<ColumnCatalog>> {
78    let binder = Binder::new_for_system(session);
79    let view = binder.bind_view_by_name(view_name.clone())?;
80
81    Ok(view
82        .view_catalog
83        .columns
84        .iter()
85        .enumerate()
86        .map(|(idx, field)| ColumnCatalog {
87            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
88            is_hidden: false,
89        })
90        .collect())
91}
92
93pub fn get_indexes_from_table(
94    session: &SessionImpl,
95    table_name: ObjectName,
96) -> Result<Vec<Arc<IndexCatalog>>> {
97    let mut binder = Binder::new_for_system(session);
98    let relation = binder.bind_relation_by_name(table_name.clone(), None, None, false)?;
99    let indexes = match relation {
100        Relation::BaseTable(t) => t.table_indexes,
101        _ => {
102            return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
103        }
104    };
105
106    Ok(indexes)
107}
108
109fn schema_or_search_path(
110    session: &Arc<SessionImpl>,
111    schema: &Option<Ident>,
112    search_path: &SearchPath,
113) -> Vec<String> {
114    if let Some(s) = schema {
115        vec![s.real_value()]
116    } else {
117        search_path
118            .real_path()
119            .iter()
120            .map(|s| {
121                if s.eq(USER_NAME_WILD_CARD) {
122                    session.user_name()
123                } else {
124                    s.to_string()
125                }
126            })
127            .collect()
128    }
129}
130
131fn iter_schema_items<F, T>(
132    session: &Arc<SessionImpl>,
133    schema: &Option<Ident>,
134    reader: &CatalogReadGuard,
135    mut f: F,
136) -> Vec<T>
137where
138    F: FnMut(&SchemaCatalog) -> Vec<T>,
139{
140    let search_path = session.config().search_path();
141
142    schema_or_search_path(session, schema, &search_path)
143        .into_iter()
144        .filter_map(|schema| {
145            reader
146                .get_schema_by_name(&session.database(), schema.as_ref())
147                .ok()
148        })
149        .flat_map(|s| f(s).into_iter())
150        .collect()
151}
152
/// Single-column result row used by `SHOW` variants that only list object names.
///
/// This is also the fallback row shape returned by `infer_show_object`.
153#[derive(Fields)]
154#[fields(style = "Title Case")]
155struct ShowObjectRow {
    // Name of the listed object.
156    name: String,
157}
158
/// One result row of `SHOW COLUMNS`, describing a column or a nested field of a column.
159#[derive(Fields)]
160#[fields(style = "Title Case")]
161pub struct ShowColumnRow {
    // Column name; nested fields use `parent.field` and list elements use `parent[1]`.
162    pub name: String,
    // Display name of the data type (`struct` / `struct[]` for struct-typed columns).
163    pub r#type: String,
    // Filled with the stringified boolean ("true"/"false") by `flatten`.
164    pub is_hidden: Option<String>, // XXX: why not bool?
    // Column description from the catalog; `None` for rows generated for nested fields.
165    pub description: Option<String>,
166}
167
168impl ShowColumnRow {
169    /// Create a row with the given information. If the data type is a struct or list,
170    /// flatten the data type to also generate rows for its fields.
171    fn flatten(
172        name: String,
173        data_type: DataType,
174        is_hidden: bool,
175        description: Option<String>,
176    ) -> Vec<Self> {
177        // TODO(struct): use struct's type name once supported.
178        let r#type = match &data_type {
179            DataType::Struct(_) => "struct".to_owned(),
180            DataType::List(box DataType::Struct(_)) => "struct[]".to_owned(),
181            d => d.to_string(),
182        };
183
184        let mut rows = vec![ShowColumnRow {
185            name: name.clone(),
186            r#type,
187            is_hidden: Some(is_hidden.to_string()),
188            description,
189        }];
190
191        match data_type {
192            DataType::Struct(st) => {
193                rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
194                    Self::flatten(
195                        format!("{}.{}", name, field_name),
196                        field_data_type.clone(),
197                        is_hidden,
198                        None,
199                    )
200                }));
201            }
202
203            DataType::List(inner @ box DataType::Struct(_)) => {
204                rows.extend(Self::flatten(
205                    format!("{}[1]", name),
206                    *inner,
207                    is_hidden,
208                    None,
209                ));
210            }
211
212            _ => {}
213        }
214
215        rows
216    }
217
218    pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
219        Self::flatten(
220            col.column_desc.name,
221            col.column_desc.data_type,
222            col.is_hidden,
223            col.column_desc.description,
224        )
225    }
226}
227
/// One result row of `SHOW CONNECTIONS`.
228#[derive(Fields)]
229#[fields(style = "Title Case")]
230struct ShowConnectionRow {
    // Connection name.
231    name: String,
    // Connection kind: the private-link constant or the connection-params type name.
232    r#type: String,
    // Human-readable property dump; multi-line text for private-link connections.
233    properties: String,
234}
235
/// One result row of `SHOW FUNCTIONS`.
236#[derive(Fields)]
237#[fields(style = "Title Case")]
238struct ShowFunctionRow {
    // Function name.
239    name: String,
    // Argument types joined with ", ".
240    arguments: String,
    // Display form of the return type.
241    return_type: String,
    // Implementation language as recorded in the function catalog.
242    language: String,
    // Link recorded in the function catalog, if any.
243    link: Option<String>,
244}
245
/// One result row of `SHOW INDEXES`.
246#[derive(Fields)]
247#[fields(style = "Title Case")]
248struct ShowIndexRow {
    // Index name.
249    name: String,
    // Name of the primary table the index is built on.
250    on: String,
    // Comma-separated index columns, including their ordering.
251    key: String,
    // Comma-separated included columns.
252    include: String,
    // Comma-separated distribution columns.
253    distributed_by: String,
254}
255
256impl From<Arc<IndexCatalog>> for ShowIndexRow {
257    fn from(index: Arc<IndexCatalog>) -> Self {
258        let index_display = index.display();
259        ShowIndexRow {
260            name: index.name.clone(),
261            on: index.primary_table.name.clone(),
262            key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
263            include: display_comma_separated(&index_display.include_columns).to_string(),
264            distributed_by: display_comma_separated(&index_display.distributed_by_columns)
265                .to_string(),
266        }
267    }
268}
269
/// One result row of `SHOW CLUSTER`, describing a single worker node.
///
/// The optional flags are `None` when the worker reports no property block.
270#[derive(Fields)]
271#[fields(style = "Title Case")]
272struct ShowClusterRow {
    // Worker id.
273    id: i32,
    // Host address of the worker, rendered as a string.
274    addr: String,
    // Worker type name.
275    r#type: String,
    // Worker state name.
276    state: String,
    // Parallelism reported by the worker, if any.
277    parallelism: Option<i32>,
278    is_streaming: Option<bool>,
279    is_serving: Option<bool>,
280    is_unschedulable: Option<bool>,
    // Start time converted from the worker's `started_at` seconds value.
281    started_at: Option<Timestamptz>,
282}
283
/// One result row of `SHOW JOBS`, built from the meta service's DDL progress report.
284#[derive(Fields)]
285#[fields(style = "Title Case")]
286struct ShowJobRow {
    // Job id.
287    id: i64,
    // Statement text reported for the job.
288    statement: String,
    // Progress string as reported by the meta service.
289    progress: String,
290}
291
/// One result row of `SHOW PROCESSLIST`, describing a single session.
292#[derive(Fields)]
293#[fields(style = "Title Case")]
294struct ShowProcessListRow {
    // Process id part of the session id.
295    id: String,
    // User name of the session.
296    user: String,
    // Peer address of the session.
297    host: String,
    // Database the session is connected to.
298    database: String,
    // Elapsed time of the running SQL formatted as "<n>ms"; `None` when nothing is reported.
299    time: Option<String>,
    // Running SQL text, truncated for display; `None` when nothing is reported.
300    info: Option<String>,
301}
302
/// Result row shape of `SHOW CREATE <object>`; see `infer_show_create_object`.
303#[derive(Fields)]
304#[fields(style = "Title Case")]
305struct ShowCreateObjectRow {
    // Name of the object.
306    name: String,
    // SQL text that creates the object.
307    create_sql: String,
308}
309
/// One result row of `SHOW SUBSCRIPTIONS`.
310#[derive(Fields)]
311#[fields(style = "Title Case")]
312struct ShowSubscriptionRow {
    // Subscription name.
313    name: String,
    // Retention of the subscription, in seconds (cast from the catalog value).
314    retention_seconds: i64,
315}
316
/// One result row of `SHOW CURSORS`: a query cursor together with its owning session.
317#[derive(Fields)]
318#[fields(style = "Title Case")]
319struct ShowCursorRow {
    // Process id part of the owning session's id.
320    session_id: String,
    // User name of the owning session.
321    user: String,
    // Peer address of the owning session.
322    host: String,
    // Database of the owning session.
323    database: String,
    // Name of the cursor.
324    cursor_name: String,
325}
326
/// One result row of `SHOW SUBSCRIPTION CURSORS`: a subscription cursor
/// together with its owning session.
327#[derive(Fields)]
328#[fields(style = "Title Case")]
329struct ShowSubscriptionCursorRow {
    // Process id part of the owning session's id.
330    session_id: String,
    // User name of the owning session.
331    user: String,
    // Peer address of the owning session.
332    host: String,
    // Database of the owning session.
333    database: String,
    // Name of the cursor.
334    cursor_name: String,
    // Name of the subscription the cursor reads from.
335    subscription_name: String,
    // Cursor state as rendered by `SubscriptionCursor::state_info_string`.
336    state: String,
    // Idle duration of the cursor in milliseconds.
337    idle_duration_ms: i64,
338}
339
340/// Infer the row description for different show objects.
341pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
342    fields_to_descriptors(match objects {
343        ShowObject::Columns { .. } => ShowColumnRow::fields(),
344        ShowObject::Connection { .. } => ShowConnectionRow::fields(),
345        ShowObject::Function { .. } => ShowFunctionRow::fields(),
346        ShowObject::Indexes { .. } => ShowIndexRow::fields(),
347        ShowObject::Cluster => ShowClusterRow::fields(),
348        ShowObject::Jobs => ShowJobRow::fields(),
349        ShowObject::ProcessList => ShowProcessListRow::fields(),
350        _ => ShowObjectRow::fields(),
351    })
352}
353
354pub async fn handle_show_object(
355    handler_args: HandlerArgs,
356    command: ShowObject,
357    filter: Option<ShowStatementFilter>,
358) -> Result<RwPgResponse> {
359    let session = handler_args.session;
360
361    if let Some(ShowStatementFilter::Where(..)) = filter {
362        bail_not_implemented!("WHERE clause in SHOW statement");
363    }
364
365    let catalog_reader = session.env().catalog_reader();
366    let user_reader = session.env().user_info_reader();
367
368    let names = match command {
369        ShowObject::Table { schema } => {
370            let reader = catalog_reader.read_guard();
371            let user_reader = user_reader.read_guard();
372            let current_user = user_reader
373                .get_user_by_name(&session.user_name())
374                .expect("user not found");
375            iter_schema_items(&session, &schema, &reader, |schema| {
376                schema
377                    .iter_user_table_with_acl(current_user)
378                    .map(|t| t.name.clone())
379                    .collect()
380            })
381        }
382        ShowObject::InternalTable { schema } => {
383            let reader = catalog_reader.read_guard();
384            let user_reader = user_reader.read_guard();
385            let current_user = user_reader
386                .get_user_by_name(&session.user_name())
387                .expect("user not found");
388            iter_schema_items(&session, &schema, &reader, |schema| {
389                schema
390                    .iter_internal_table_with_acl(current_user)
391                    .map(|t| t.name.clone())
392                    .collect()
393            })
394        }
395        ShowObject::Database => catalog_reader.read_guard().get_all_database_names(),
396        ShowObject::Schema => catalog_reader
397            .read_guard()
398            .get_all_schema_names(&session.database())?,
399        ShowObject::View { schema } => {
400            let reader = catalog_reader.read_guard();
401            let user_reader = user_reader.read_guard();
402            let current_user = user_reader
403                .get_user_by_name(&session.user_name())
404                .expect("user not found");
405            iter_schema_items(&session, &schema, &reader, |schema| {
406                schema
407                    .iter_view_with_acl(current_user)
408                    .map(|t| t.name.clone())
409                    .collect()
410            })
411        }
412        ShowObject::MaterializedView { schema } => {
413            let reader = catalog_reader.read_guard();
414            let user_reader = user_reader.read_guard();
415            let current_user = user_reader
416                .get_user_by_name(&session.user_name())
417                .expect("user not found");
418            iter_schema_items(&session, &schema, &reader, |schema| {
419                schema
420                    .iter_created_mvs_with_acl(current_user)
421                    .map(|t| t.name.clone())
422                    .collect()
423            })
424        }
425        ShowObject::Source { schema } => {
426            let reader = catalog_reader.read_guard();
427            let user_reader = user_reader.read_guard();
428            let current_user = user_reader
429                .get_user_by_name(&session.user_name())
430                .expect("user not found");
431            let mut sources = iter_schema_items(&session, &schema, &reader, |schema| {
432                schema
433                    .iter_source_with_acl(current_user)
434                    .map(|t| t.name.clone())
435                    .collect()
436            });
437            sources.extend(session.temporary_source_manager().keys());
438            sources
439        }
440        ShowObject::Sink { schema } => {
441            let reader = catalog_reader.read_guard();
442            let user_reader = user_reader.read_guard();
443            let current_user = user_reader
444                .get_user_by_name(&session.user_name())
445                .expect("user not found");
446            iter_schema_items(&session, &schema, &reader, |schema| {
447                schema
448                    .iter_sink_with_acl(current_user)
449                    .map(|t| t.name.clone())
450                    .collect()
451            })
452        }
453        ShowObject::Subscription { schema } => {
454            let reader = catalog_reader.read_guard();
455            let rows = iter_schema_items(&session, &schema, &reader, |schema| {
456                schema
457                    .iter_subscription()
458                    .map(|t| ShowSubscriptionRow {
459                        name: t.name.clone(),
460                        retention_seconds: t.retention_seconds as i64,
461                    })
462                    .collect()
463            });
464            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
465                .rows(rows)
466                .into());
467        }
468        ShowObject::Secret { schema } => {
469            let reader = catalog_reader.read_guard();
470            iter_schema_items(&session, &schema, &reader, |schema| {
471                schema.iter_secret().map(|t| t.name.clone()).collect()
472            })
473        }
474        ShowObject::Columns { table } => {
475            let Ok(columns) = get_columns_from_table(&session, table.clone())
476                .or(get_columns_from_sink(&session, table.clone()))
477                .or(get_columns_from_view(&session, table.clone()))
478            else {
479                return Err(CatalogError::NotFound(
480                    "table, source, sink or view",
481                    table.to_string(),
482                )
483                .into());
484            };
485
486            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
487                .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
488                .into());
489        }
490        ShowObject::Indexes { table } => {
491            let indexes = get_indexes_from_table(&session, table)?;
492
493            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
494                .rows(indexes.into_iter().map(ShowIndexRow::from))
495                .into());
496        }
497        ShowObject::Connection { schema } => {
498            let reader = catalog_reader.read_guard();
499            let rows = iter_schema_items(&session, &schema, &reader, |schema| {
500                schema.iter_connections()
501                .map(|c| {
502                    let name = c.name.clone();
503                    let r#type = match &c.info {
504                        connection::Info::PrivateLinkService(_) => {
505                            PRIVATELINK_CONNECTION.to_owned()
506                        },
507                        connection::Info::ConnectionParams(params) => {
508                            params.get_connection_type().unwrap().as_str_name().to_owned()
509                        }
510                    };
511                    let source_names = schema
512                        .get_source_ids_by_connection(c.id)
513                        .unwrap_or_default()
514                        .into_iter()
515                        .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
516                        .collect_vec();
517                    let sink_names = schema
518                        .get_sink_ids_by_connection(c.id)
519                        .unwrap_or_default()
520                        .into_iter()
521                        .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
522                        .collect_vec();
523                    let properties = match &c.info {
524                        connection::Info::PrivateLinkService(i) => {
525                            format!(
526                                "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
527                                i.get_provider().unwrap().as_str_name(),
528                                i.service_name,
529                                i.endpoint_id,
530                                serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
531                                serde_json::to_string(&source_names).unwrap(),
532                                serde_json::to_string(&sink_names).unwrap(),
533                            )
534                        }
535                        connection::Info::ConnectionParams(params) => {
536                            // todo: show dep relations
537                            print_connection_params(params, schema)
538                        }
539                    };
540                    ShowConnectionRow {
541                        name,
542                        r#type,
543                        properties,
544                    }
545                }).collect_vec()
546            });
547            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
548                .rows(rows)
549                .into());
550        }
551        ShowObject::Function { schema } => {
552            let reader = catalog_reader.read_guard();
553            let rows = iter_schema_items(&session, &schema, &reader, |schema| {
554                schema
555                    .iter_function()
556                    .map(|t| ShowFunctionRow {
557                        name: t.name.clone(),
558                        arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
559                        return_type: t.return_type.to_string(),
560                        language: t.language.clone(),
561                        link: t.link.clone(),
562                    })
563                    .collect()
564            });
565            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
566                .rows(rows)
567                .into());
568        }
569        ShowObject::Cluster => {
570            let workers = session.env().meta_client().list_all_nodes().await?;
571            let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
572                let addr: HostAddr = worker.host.as_ref().unwrap().into();
573                let property = worker.property.as_ref();
574                ShowClusterRow {
575                    id: worker.id as _,
576                    addr: addr.to_string(),
577                    r#type: worker.get_type().unwrap().as_str_name().into(),
578                    state: worker.get_state().unwrap().as_str_name().to_owned(),
579                    parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
580                    is_streaming: property.map(|p| p.is_streaming),
581                    is_serving: property.map(|p| p.is_serving),
582                    is_unschedulable: property.map(|p| p.is_unschedulable),
583                    started_at: worker
584                        .started_at
585                        .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
586                }
587            });
588            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
589                .rows(rows)
590                .into());
591        }
592        ShowObject::Jobs => {
593            let resp = session.env().meta_client().get_ddl_progress().await?;
594            let rows = resp.into_iter().map(|job| ShowJobRow {
595                id: job.id as i64,
596                statement: job.statement,
597                progress: job.progress,
598            });
599            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
600                .rows(rows)
601                .into());
602        }
603        ShowObject::ProcessList => {
604            let sessions_map = session.env().sessions_map().read();
605            let rows = sessions_map.values().map(|s| {
606                ShowProcessListRow {
607                    // Since process id and the secret id in the session id are the same in RisingWave, just display the process id.
608                    id: format!("{}", s.id().0),
609                    user: s.user_name(),
610                    host: format!("{}", s.peer_addr()),
611                    database: s.database(),
612                    time: s
613                        .elapse_since_running_sql()
614                        .map(|mills| format!("{}ms", mills)),
615                    info: s
616                        .running_sql()
617                        .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
618                }
619            });
620
621            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
622                .rows(rows)
623                .into());
624        }
625        ShowObject::Cursor => {
626            let sessions = session
627                .env()
628                .sessions_map()
629                .read()
630                .values()
631                .cloned()
632                .collect_vec();
633            let mut rows = vec![];
634            for s in sessions {
635                let session_id = format!("{}", s.id().0);
636                let user = s.user_name();
637                let host = format!("{}", s.peer_addr());
638                let database = s.database();
639
640                s.get_cursor_manager()
641                    .iter_query_cursors(|cursor_name: &String, _| {
642                        rows.push(ShowCursorRow {
643                            session_id: session_id.clone(),
644                            user: user.clone(),
645                            host: host.clone(),
646                            database: database.clone(),
647                            cursor_name: cursor_name.to_owned(),
648                        });
649                    })
650                    .await;
651            }
652            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
653                .rows(rows)
654                .into());
655        }
656        ShowObject::SubscriptionCursor => {
657            let sessions = session
658                .env()
659                .sessions_map()
660                .read()
661                .values()
662                .cloned()
663                .collect_vec();
664            let mut rows = vec![];
665            for s in sessions {
666                let session_id = format!("{}", s.id().0);
667                let user = s.user_name();
668                let host = format!("{}", s.peer_addr());
669                let database = s.database().to_owned();
670
671                s.get_cursor_manager()
672                    .iter_subscription_cursors(
673                        |cursor_name: &String, cursor: &SubscriptionCursor| {
674                            rows.push(ShowSubscriptionCursorRow {
675                                session_id: session_id.clone(),
676                                user: user.clone(),
677                                host: host.clone(),
678                                database: database.clone(),
679                                cursor_name: cursor_name.to_owned(),
680                                subscription_name: cursor.subscription_name().to_owned(),
681                                state: cursor.state_info_string(),
682                                idle_duration_ms: cursor.idle_duration().as_millis() as i64,
683                            });
684                        },
685                    )
686                    .await;
687            }
688
689            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
690                .rows(rows)
691                .into());
692        }
693    };
694
695    let rows = names
696        .into_iter()
697        .filter(|arg| match &filter {
698            Some(ShowStatementFilter::Like(pattern)) => like_default(arg, pattern),
699            Some(ShowStatementFilter::ILike(pattern)) => i_like_default(arg, pattern),
700            Some(ShowStatementFilter::Where(..)) => unreachable!(),
701            None => true,
702        })
703        .map(|name| ShowObjectRow { name });
704
705    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
706        .rows(rows)
707        .into())
708}
709
710pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
711    fields_to_descriptors(ShowCreateObjectRow::fields())
712}
713
714pub fn handle_show_create_object(
715    handle_args: HandlerArgs,
716    show_create_type: ShowCreateType,
717    name: ObjectName,
718) -> Result<RwPgResponse> {
719    let session = handle_args.session;
720    let catalog_reader = session.env().catalog_reader().read_guard();
721    let database = session.database();
722    let (schema_name, object_name) =
723        Binder::resolve_schema_qualified_name(&database, name.clone())?;
724    let search_path = session.config().search_path();
725    let user_name = &session.user_name();
726    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
727    let user_reader = session.env().user_info_reader().read_guard();
728    let current_user = user_reader
729        .get_user_by_name(user_name)
730        .expect("user not found");
731
732    let (sql, schema_name) = match show_create_type {
733        ShowCreateType::MaterializedView => {
734            let (mv, schema) = schema_path
735                .try_find(|schema_name| {
736                    Ok::<_, RwError>(
737                        catalog_reader
738                            .get_schema_by_name(&database, schema_name)?
739                            .get_created_table_by_name(&object_name)
740                            .filter(|t| {
741                                t.is_mview()
742                                    && has_access_to_object(
743                                        current_user,
744                                        schema_name,
745                                        t.id.table_id,
746                                        t.owner,
747                                    )
748                            }),
749                    )
750                })?
751                .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
752            (mv.create_sql(), schema)
753        }
754        ShowCreateType::View => {
755            let (view, schema) =
756                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
757            if !view.is_system_view()
758                && !has_access_to_object(current_user, schema, view.id, view.owner)
759            {
760                return Err(CatalogError::NotFound("view", name.to_string()).into());
761            }
762            (view.create_sql(schema.to_owned()), schema)
763        }
764        ShowCreateType::Table => {
765            let (table, schema) = schema_path
766                .try_find(|schema_name| {
767                    Ok::<_, RwError>(
768                        catalog_reader
769                            .get_schema_by_name(&database, schema_name)?
770                            .get_created_table_by_name(&object_name)
771                            .filter(|t| {
772                                t.is_user_table()
773                                    && has_access_to_object(
774                                        current_user,
775                                        schema_name,
776                                        t.id.table_id,
777                                        t.owner,
778                                    )
779                            }),
780                    )
781                })?
782                .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
783
784            (table.create_sql_purified(), schema)
785        }
786        ShowCreateType::Sink => {
787            let (sink, schema) =
788                catalog_reader.get_sink_by_name(&database, schema_path, &object_name)?;
789            if !has_access_to_object(current_user, schema, sink.id.sink_id, sink.owner.user_id) {
790                return Err(CatalogError::NotFound("sink", name.to_string()).into());
791            }
792            (sink.create_sql(), schema)
793        }
794        ShowCreateType::Source => {
795            let (source, schema) = schema_path
796                .try_find(|schema_name| {
797                    Ok::<_, RwError>(
798                        catalog_reader
799                            .get_schema_by_name(&database, schema_name)?
800                            .get_source_by_name(&object_name)
801                            .filter(|s| {
802                                s.associated_table_id.is_none()
803                                    && has_access_to_object(
804                                        current_user,
805                                        schema_name,
806                                        s.id,
807                                        s.owner,
808                                    )
809                            }),
810                    )
811                })?
812                .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
813            (source.create_sql_purified(), schema)
814        }
815        ShowCreateType::Index => {
816            let (index, schema) = schema_path
817                .try_find(|schema_name| {
818                    Ok::<_, RwError>(
819                        catalog_reader
820                            .get_schema_by_name(&database, schema_name)?
821                            .get_created_table_by_name(&object_name)
822                            .filter(|t| {
823                                t.is_index()
824                                    && has_access_to_object(
825                                        current_user,
826                                        schema_name,
827                                        t.id.table_id,
828                                        t.owner,
829                                    )
830                            }),
831                    )
832                })?
833                .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
834            (index.create_sql(), schema)
835        }
836        ShowCreateType::Function => {
837            bail_not_implemented!("show create on: {}", show_create_type);
838        }
839        ShowCreateType::Subscription => {
840            let (subscription, schema) =
841                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
842            if !has_access_to_object(
843                current_user,
844                schema,
845                subscription.id.subscription_id,
846                subscription.owner.user_id,
847            ) {
848                return Err(CatalogError::NotFound("subscription", name.to_string()).into());
849            }
850            (subscription.create_sql(), schema)
851        }
852    };
853    let name = format!("{}.{}", schema_name, object_name);
854
855    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
856        .rows([ShowCreateObjectRow {
857            name,
858            create_sql: sql,
859        }])
860        .into())
861}
862
#[cfg(test)]
mod tests {
    use futures_async_stream::for_await;

    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// `SHOW SOURCES` lists a source once it has been created.
    #[tokio::test]
    async fn test_show_source() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let create_source = r#"CREATE SOURCE t1 (column1 varchar)
        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
        FORMAT PLAIN ENCODE JSON"#;
        frontend.run_sql(create_source).await.unwrap();

        // Sort so the comparison does not depend on the order rows come back in.
        let mut listed = frontend.query_formatted_result("SHOW SOURCES").await;
        listed.sort();

        let expected = vec![r#"Row([Some(b"t1")])"#.to_owned()];
        assert_eq!(listed, expected);
    }

    /// `SHOW COLUMNS` on a protobuf-encoded source yields one `(name, type)` pair per row,
    /// which is compared against the inline snapshot at the end of the test.
    #[tokio::test]
    async fn test_show_column() {
        // Keep the generated proto file bound to a local while the source is being created.
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();

        let mut pg_response = frontend.run_sql("show columns from t").await.unwrap();

        // Decode the first two cells of every returned row as UTF-8 text.
        let mut columns = Vec::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            columns.extend(row_set.unwrap().into_iter().map(|row| {
                let cell_text = |idx: usize| {
                    std::str::from_utf8(row[idx].as_ref().unwrap())
                        .unwrap()
                        .to_owned()
                };
                (cell_text(0), cell_text(1))
            }));
        }

        expect_test::expect![[r#"
            [
                (
                    "id",
                    "integer",
                ),
                (
                    "country",
                    "struct",
                ),
                (
                    "country.address",
                    "character varying",
                ),
                (
                    "country.city",
                    "struct",
                ),
                (
                    "country.city.address",
                    "character varying",
                ),
                (
                    "country.city.zipcode",
                    "character varying",
                ),
                (
                    "country.zipcode",
                    "character varying",
                ),
                (
                    "zipcode",
                    "bigint",
                ),
                (
                    "rate",
                    "real",
                ),
                (
                    "_rw_kafka_timestamp",
                    "timestamp with time zone",
                ),
                (
                    "_rw_kafka_partition",
                    "character varying",
                ),
                (
                    "_rw_kafka_offset",
                    "character varying",
                ),
                (
                    "_row_id",
                    "serial",
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }
}