risingwave_frontend/handler/
show.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
23use risingwave_common::acl::AclMode;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::{
33    GetAllCursorsRequest, GetAllSubCursorsRequest, GetRunningSqlsRequest,
34};
35use risingwave_pb::id::WorkerId;
36use risingwave_rpc_client::FrontendClientPoolRef;
37use risingwave_sqlparser::ast::{
38    Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
39};
40use thiserror_ext::AsReport;
41
42use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
43use crate::binder::{Binder, Relation};
44use crate::catalog::catalog_service::CatalogReadGuard;
45use crate::catalog::root_catalog::SchemaPath;
46use crate::catalog::schema_catalog::SchemaCatalog;
47use crate::catalog::{CatalogError, IndexCatalog};
48use crate::error::{ErrorCode, Result, RwError};
49use crate::handler::HandlerArgs;
50use crate::handler::create_connection::print_connection_params_with_secret_visibility;
51use crate::handler::privilege::ObjectCheckItem;
52use crate::session::{SessionImpl, WorkerProcessId};
53use crate::user::user_catalog::UserCatalog;
54use crate::user::{has_access_to_object, has_schema_usage_privilege};
55
56pub fn get_columns_from_table(
57    session: &SessionImpl,
58    table_name: ObjectName,
59) -> Result<Vec<ColumnCatalog>> {
60    let mut binder = Binder::new_for_batch(session);
61    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
62    let column_catalogs = match relation {
63        Relation::Source(s) => s.catalog.columns,
64        Relation::BaseTable(t) => t.table_catalog.columns.clone(),
65        Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
66        _ => {
67            return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
68        }
69    };
70
71    Ok(column_catalogs)
72}
73
74pub fn get_columns_from_sink(
75    session: &SessionImpl,
76    sink_name: ObjectName,
77) -> Result<Vec<ColumnCatalog>> {
78    let binder = Binder::new_for_system(session);
79    let sink = binder.bind_sink_by_name(sink_name)?;
80
81    let catalog_reader = session.env().catalog_reader().read_guard();
82    let schema = catalog_reader
83        .get_schema_by_id(sink.sink_catalog.database_id, sink.sink_catalog.schema_id)?;
84    session.check_privileges(&[
85        ObjectCheckItem::new(schema.owner, AclMode::Usage, schema.name(), schema.id()),
86        ObjectCheckItem::new(
87            sink.sink_catalog.owner,
88            AclMode::Select,
89            sink.sink_catalog.name.clone(),
90            sink.sink_catalog.id,
91        ),
92    ])?;
93
94    Ok(sink.sink_catalog.full_columns().to_vec())
95}
96
97pub fn get_columns_from_view(
98    session: &SessionImpl,
99    view_name: ObjectName,
100) -> Result<Vec<ColumnCatalog>> {
101    let _authorized_relation =
102        Binder::new_for_batch(session).bind_relation_by_name(&view_name, None, None, false)?;
103    let binder = Binder::new_for_system(session);
104    let view = binder.bind_view_by_name(view_name)?;
105
106    Ok(view
107        .view_catalog
108        .columns
109        .iter()
110        .enumerate()
111        .map(|(idx, field)| ColumnCatalog {
112            column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
113            is_hidden: false,
114        })
115        .collect())
116}
117
118pub fn get_indexes_from_table(
119    session: &SessionImpl,
120    table_name: ObjectName,
121) -> Result<Vec<Arc<IndexCatalog>>> {
122    let mut binder = Binder::new_for_batch(session);
123    let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
124    let indexes = match relation {
125        Relation::BaseTable(t) => t.table_indexes,
126        _ => {
127            return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
128        }
129    };
130
131    Ok(indexes)
132}
133
134fn schema_or_search_path(
135    session: &Arc<SessionImpl>,
136    schema: &Option<Ident>,
137    search_path: &SearchPath,
138) -> Vec<String> {
139    if let Some(s) = schema {
140        vec![s.real_value()]
141    } else {
142        search_path
143            .real_path()
144            .iter()
145            .map(|s| {
146                if s.eq(USER_NAME_WILD_CARD) {
147                    session.user_name()
148                } else {
149                    s.clone()
150                }
151            })
152            .collect()
153    }
154}
155
156fn iter_schema_items<F, T>(
157    session: &Arc<SessionImpl>,
158    schema: &Option<Ident>,
159    reader: &CatalogReadGuard,
160    current_user: &UserCatalog,
161    mut f: F,
162) -> Vec<T>
163where
164    F: FnMut(&SchemaCatalog) -> Vec<T>,
165{
166    let search_path = session.config().search_path();
167
168    schema_or_search_path(session, schema, &search_path)
169        .into_iter()
170        .filter_map(|schema| {
171            if let Ok(schema_catalog) =
172                reader.get_schema_by_name(&session.database(), schema.as_ref())
173                && (has_schema_usage_privilege(
174                    current_user,
175                    schema_catalog.id(),
176                    schema_catalog.owner,
177                ))
178            {
179                Some(schema_catalog)
180            } else {
181                None
182            }
183        })
184        .flat_map(|s| f(s).into_iter())
185        .collect()
186}
187
188/// Wrapper for `ObjectName` to be used as a field in a row.
189// TODO: replace remaining `name: String` with `name: ObjectNameField`
190struct ObjectNameField(ObjectName);
191
192fn with_schema_name(schema_name: &str, name: &str) -> ObjectNameField {
193    if schema_name.is_empty() {
194        ObjectNameField(ObjectName(vec![name.into()]))
195    } else {
196        ObjectNameField(ObjectName(vec![schema_name.into(), name.into()]))
197    }
198}
199
200impl WithDataType for ObjectNameField {
201    fn default_data_type() -> DataType {
202        DataType::Varchar
203    }
204}
205
206impl ToOwnedDatum for ObjectNameField {
207    fn to_owned_datum(self) -> Datum {
208        Some(self.0.to_string().into())
209    }
210}
211
212#[derive(Fields)]
213#[fields(style = "Title Case")]
214struct ShowObjectRow {
215    name: ObjectNameField,
216}
217
218impl ShowObjectRow {
219    fn base_name(&self) -> String {
220        self.name.0.base_name()
221    }
222}
223
224fn is_permission_denied(err: &RwError) -> bool {
225    matches!(err.inner(), ErrorCode::PermissionDenied(_))
226}
227
228#[derive(Fields)]
229#[fields(style = "Title Case")]
230struct ShowDatabaseRow {
231    name: String,
232}
233
234#[derive(Fields)]
235#[fields(style = "Title Case")]
236struct ShowSchemaRow {
237    name: String,
238}
239
240#[derive(Fields)]
241#[fields(style = "Title Case")]
242pub struct ShowColumnRow {
243    pub name: ShowColumnName,
244    pub r#type: String,
245    pub is_hidden: Option<String>, // XXX: why not bool?
246    pub description: Option<String>,
247}
248
249#[derive(Clone, Debug)]
250enum ShowColumnNameSegment {
251    Field(Ident),
252    ListElement,
253}
254
255impl ShowColumnNameSegment {
256    pub fn field(name: &str) -> Self {
257        ShowColumnNameSegment::Field(Ident::from_real_value(name))
258    }
259}
260
261/// The name of a column in the output of `SHOW COLUMNS` or `DESCRIBE`.
262#[derive(Clone, Debug)]
263pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
264
265impl ShowColumnName {
266    /// Create a special column name without quoting. Used only for extra information like `primary key`
267    /// in the output of `DESCRIBE`.
268    pub fn special(name: &str) -> Self {
269        ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
270            name,
271        ))])
272    }
273}
274
275impl WithDataType for ShowColumnName {
276    fn default_data_type() -> DataType {
277        DataType::Varchar
278    }
279}
280
281impl ToOwnedDatum for ShowColumnName {
282    fn to_owned_datum(self) -> Datum {
283        use std::fmt::Write;
284
285        let mut s = String::new();
286        for segment in self.0 {
287            match segment {
288                ShowColumnNameSegment::Field(ident) => {
289                    if !s.is_empty() {
290                        // TODO: shall we add parentheses, so that it's valid field access SQL?
291                        s.push('.');
292                    }
293                    write!(s, "{ident}").unwrap();
294                }
295                ShowColumnNameSegment::ListElement => {
296                    s.push_str("[1]");
297                }
298            }
299        }
300        s.to_owned_datum()
301    }
302}
303
304impl ShowColumnRow {
305    /// Create a row with the given information. If the data type is a struct or list,
306    /// flatten the data type to also generate rows for its fields.
307    fn flatten(
308        name: ShowColumnName,
309        data_type: DataType,
310        is_hidden: bool,
311        description: Option<String>,
312    ) -> Vec<Self> {
313        // TODO(struct): use struct's type name once supported.
314        let r#type = match &data_type {
315            DataType::Struct(_) => "struct".to_owned(),
316            DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
317            d => d.to_string(),
318        };
319
320        let mut rows = vec![ShowColumnRow {
321            name: name.clone(),
322            r#type,
323            is_hidden: Some(is_hidden.to_string()),
324            description,
325        }];
326
327        match data_type {
328            DataType::Struct(st) => {
329                rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
330                    let mut name = name.clone();
331                    name.0.push(ShowColumnNameSegment::field(field_name));
332                    Self::flatten(name, field_data_type.clone(), is_hidden, None)
333                }));
334            }
335
336            DataType::List(list) if let DataType::Struct(_) = list.elem() => {
337                let mut name = name.clone();
338                name.0.push(ShowColumnNameSegment::ListElement);
339                rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
340            }
341
342            _ => {}
343        }
344
345        rows
346    }
347
348    pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
349        Self::flatten(
350            ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
351            col.column_desc.data_type,
352            col.is_hidden,
353            col.column_desc.description,
354        )
355    }
356}
357
358#[derive(Fields)]
359#[fields(style = "Title Case")]
360struct ShowConnectionRow {
361    name: ObjectNameField,
362    r#type: String,
363    properties: String,
364}
365
366#[derive(Fields)]
367#[fields(style = "Title Case")]
368struct ShowFunctionRow {
369    name: ObjectNameField,
370    arguments: String,
371    return_type: String,
372    language: String,
373    link: Option<String>,
374}
375
376#[derive(Fields)]
377#[fields(style = "Title Case")]
378struct ShowIndexRow {
379    name: String,
380    on: String,
381    key: String,
382    include: String,
383    distributed_by: String,
384}
385
386impl From<Arc<IndexCatalog>> for ShowIndexRow {
387    fn from(index: Arc<IndexCatalog>) -> Self {
388        let index_display = index.display();
389        ShowIndexRow {
390            name: index.name.clone(),
391            on: index.primary_table.name.clone(),
392            key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
393            include: display_comma_separated(&index_display.include_columns).to_string(),
394            distributed_by: display_comma_separated(&index_display.distributed_by_columns)
395                .to_string(),
396        }
397    }
398}
399
400#[derive(Fields)]
401#[fields(style = "Title Case")]
402struct ShowClusterRow {
403    id: WorkerId,
404    addr: String,
405    r#type: String,
406    state: String,
407    parallelism: Option<i32>,
408    is_streaming: Option<bool>,
409    is_serving: Option<bool>,
410    started_at: Option<Timestamptz>,
411}
412
413#[derive(Fields)]
414#[fields(style = "Title Case")]
415struct ShowJobRow {
416    id: i64,
417    statement: String,
418    create_type: String,
419    progress: String,
420}
421
422#[derive(Fields)]
423#[fields(style = "Title Case")]
424struct ShowProcessListRow {
425    worker_id: String,
426    id: String,
427    user: String,
428    host: String,
429    database: String,
430    time: Option<String>,
431    info: Option<String>,
432}
433
434#[derive(Fields)]
435#[fields(style = "Title Case")]
436struct ShowCreateObjectRow {
437    name: String,
438    create_sql: String,
439}
440
441#[derive(Fields)]
442#[fields(style = "Title Case")]
443struct ShowSubscriptionRow {
444    name: ObjectNameField,
445    retention_seconds: i64,
446}
447
448#[derive(Fields)]
449#[fields(style = "Title Case")]
450struct ShowCursorRow {
451    worker_id: String,
452    session_id: String,
453    user: String,
454    host: String,
455    database: String,
456    cursor_name: String,
457    info: Option<String>,
458}
459
460#[derive(Fields)]
461#[fields(style = "Title Case")]
462struct ShowSubscriptionCursorRow {
463    worker_id: String,
464    session_id: String,
465    user: String,
466    host: String,
467    database: String,
468    cursor_name: String,
469    subscription_name: String,
470    state: String,
471    idle_duration_ms: i64,
472    info: Option<String>,
473}
474
475/// Infer the row description for different show objects.
476pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
477    fields_to_descriptors(match objects {
478        ShowObject::Database => ShowDatabaseRow::fields(),
479        ShowObject::Schema => ShowSchemaRow::fields(),
480        ShowObject::Columns { .. } => ShowColumnRow::fields(),
481        ShowObject::Connection { .. } => ShowConnectionRow::fields(),
482        ShowObject::Function { .. } => ShowFunctionRow::fields(),
483        ShowObject::Indexes { .. } => ShowIndexRow::fields(),
484        ShowObject::Cluster => ShowClusterRow::fields(),
485        ShowObject::Jobs => ShowJobRow::fields(),
486        ShowObject::ProcessList => ShowProcessListRow::fields(),
487        ShowObject::Cursor => ShowCursorRow::fields(),
488        ShowObject::SubscriptionCursor => ShowSubscriptionCursorRow::fields(),
489        ShowObject::Subscription { .. } => ShowSubscriptionRow::fields(),
490
491        ShowObject::Table { .. }
492        | ShowObject::InternalTable { .. }
493        | ShowObject::View { .. }
494        | ShowObject::MaterializedView { .. }
495        | ShowObject::Source { .. }
496        | ShowObject::Sink { .. }
497        | ShowObject::Secret { .. } => ShowObjectRow::fields(),
498    })
499}
500
501pub async fn handle_show_object(
502    handler_args: HandlerArgs,
503    command: ShowObject,
504    filter: Option<ShowStatementFilter>,
505) -> Result<RwPgResponse> {
506    let session = handler_args.session;
507
508    if let Some(ShowStatementFilter::Where(..)) = filter {
509        bail_not_implemented!("WHERE clause in SHOW statement");
510    }
511
512    let catalog_reader = session.env().catalog_reader();
513    let user_reader = session.env().user_info_reader();
514    let get_catalog_reader = || {
515        let reader = catalog_reader.read_guard();
516        let user_reader = user_reader.read_guard();
517        let current_user = user_reader
518            .get_user_by_name(&session.user_name())
519            .expect("user not found")
520            .clone();
521        (reader, current_user)
522    };
523
524    let rows: Vec<ShowObjectRow> = match command {
525        ShowObject::Table { schema } => {
526            let (reader, current_user) = get_catalog_reader();
527            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
528                schema
529                    .iter_user_table_with_acl(&current_user)
530                    .map(|t| with_schema_name(&schema.name, &t.name))
531                    .map(|name| ShowObjectRow { name })
532                    .collect()
533            })
534        }
535        ShowObject::InternalTable { schema } => {
536            let (reader, current_user) = get_catalog_reader();
537            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
538                schema
539                    .iter_internal_table_with_acl(&current_user)
540                    .map(|t| with_schema_name(&schema.name, &t.name))
541                    .map(|name| ShowObjectRow { name })
542                    .collect()
543            })
544        }
545        ShowObject::Database => {
546            let reader = catalog_reader.read_guard();
547            let rows = reader
548                .get_all_database_names()
549                .into_iter()
550                .map(|name| ShowDatabaseRow { name });
551            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
552                .rows(rows)
553                .into());
554        }
555        ShowObject::Schema => {
556            let reader = catalog_reader.read_guard();
557            let rows = reader
558                .get_all_schema_names(&session.database())?
559                .into_iter()
560                .map(|name| ShowSchemaRow { name });
561            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
562                .rows(rows)
563                .into());
564        }
565        ShowObject::View { schema } => {
566            let (reader, current_user) = get_catalog_reader();
567            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
568                schema
569                    .iter_view_with_acl(&current_user)
570                    .map(|t| with_schema_name(&schema.name, &t.name))
571                    .map(|name| ShowObjectRow { name })
572                    .collect()
573            })
574        }
575        ShowObject::MaterializedView { schema } => {
576            let (reader, current_user) = get_catalog_reader();
577            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
578                schema
579                    .iter_created_mvs_with_acl(&current_user)
580                    .map(|t| with_schema_name(&schema.name, &t.name))
581                    .map(|name| ShowObjectRow { name })
582                    .collect()
583            })
584        }
585        ShowObject::Source { schema } => {
586            let (reader, current_user) = get_catalog_reader();
587            let mut sources =
588                iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
589                    schema
590                        .iter_source_with_acl(&current_user)
591                        .map(|t| with_schema_name(&schema.name, &t.name))
592                        .map(|name| ShowObjectRow { name })
593                        .collect()
594                });
595            sources.extend(
596                session
597                    .temporary_source_manager()
598                    .keys()
599                    .into_iter()
600                    .map(|t| with_schema_name("", &t))
601                    .map(|name| ShowObjectRow { name }),
602            );
603            sources
604        }
605        ShowObject::Sink { schema } => {
606            let (reader, current_user) = get_catalog_reader();
607            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
608                schema
609                    .iter_sink_with_acl(&current_user)
610                    .map(|t| with_schema_name(&schema.name, &t.name))
611                    .map(|name| ShowObjectRow { name })
612                    .collect()
613            })
614        }
615        ShowObject::Subscription { schema } => {
616            let (reader, current_user) = get_catalog_reader();
617            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
618                schema
619                    .iter_subscription_with_acl(&current_user)
620                    .map(|t| ShowSubscriptionRow {
621                        name: with_schema_name(&schema.name, &t.name),
622                        retention_seconds: t.retention_seconds as i64,
623                    })
624                    .collect()
625            });
626            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
627                .rows(rows)
628                .into());
629        }
630        ShowObject::Secret { schema } => {
631            let (reader, current_user) = get_catalog_reader();
632            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
633                schema
634                    .iter_secret_with_acl(&current_user)
635                    .map(|t| with_schema_name(&schema.name, &t.name))
636                    .map(|name| ShowObjectRow { name })
637                    .collect()
638            })
639        }
640        ShowObject::Columns { table } => {
641            let columns = match get_columns_from_table(&session, table.clone()) {
642                Ok(columns) => columns,
643                Err(err) if is_permission_denied(&err) => {
644                    return Err(err);
645                }
646                Err(_) => match get_columns_from_sink(&session, table.clone()) {
647                    Ok(columns) => columns,
648                    Err(err) if is_permission_denied(&err) => {
649                        return Err(err);
650                    }
651                    Err(_) => match get_columns_from_view(&session, table.clone()) {
652                        Ok(columns) => columns,
653                        Err(err) if is_permission_denied(&err) => {
654                            return Err(err);
655                        }
656                        Err(_) => {
657                            return Err(CatalogError::not_found(
658                                "table, source, sink or view",
659                                table.to_string(),
660                            )
661                            .into());
662                        }
663                    },
664                },
665            };
666
667            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
668                .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
669                .into());
670        }
671        ShowObject::Indexes { table } => {
672            let indexes = get_indexes_from_table(&session, table)?;
673
674            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
675                .rows(indexes.into_iter().map(ShowIndexRow::from))
676                .into());
677        }
678        ShowObject::Connection { schema } => {
679            let (reader, current_user) = get_catalog_reader();
680            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
681                schema.iter_connections_with_acl(&current_user)
682                .map(|c| {
683                    let name = c.name.clone();
684                    let r#type = match &c.info {
685                        #[expect(deprecated)]
686                        connection::Info::PrivateLinkService(_) => {
687                            PRIVATELINK_CONNECTION.to_owned()
688                        },
689                        connection::Info::ConnectionParams(params) => {
690                            params.get_connection_type().unwrap().as_str_name().to_owned()
691                        }
692                    };
693                    let source_names = schema
694                        .get_source_ids_by_connection(c.id)
695                        .unwrap_or_default()
696                        .into_iter()
697                        .filter_map(|sid| {
698                            schema.get_source_by_id(sid).and_then(|catalog| {
699                                has_access_to_object(&current_user, catalog.id, catalog.owner)
700                                    .then_some(catalog.name.as_str())
701                            })
702                        })
703                        .collect_vec();
704                    let sink_names = schema
705                        .get_sink_ids_by_connection(c.id)
706                        .unwrap_or_default()
707                        .into_iter()
708                        .filter_map(|sid| {
709                            schema.get_sink_by_id(sid).and_then(|catalog| {
710                                has_access_to_object(&current_user, catalog.id, catalog.owner)
711                                    .then_some(catalog.name.as_str())
712                            })
713                        })
714                        .collect_vec();
715                    let properties = match &c.info {
716                        #[expect(deprecated)]
717                        connection::Info::PrivateLinkService(i) => {
718                            format!(
719                                "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
720                                i.get_provider().unwrap().as_str_name(),
721                                i.service_name,
722                                i.endpoint_id,
723                                serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
724                                serde_json::to_string(&source_names).unwrap(),
725                                serde_json::to_string(&sink_names).unwrap(),
726                            )
727                        }
728                        connection::Info::ConnectionParams(params) => {
729                            // todo: show dep relations
730                            print_connection_params_with_secret_visibility(
731                                &session.database(),
732                                params,
733                                &reader,
734                                &current_user,
735                            )
736                        }
737                    };
738                    ShowConnectionRow {
739                        name: with_schema_name(&schema.name, &name),
740                        r#type,
741                        properties,
742                    }
743                }).collect_vec()
744            });
745            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
746                .rows(rows)
747                .into());
748        }
749        ShowObject::Function { schema } => {
750            let (reader, current_user) = get_catalog_reader();
751            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
752                schema
753                    .iter_function_with_acl(&current_user)
754                    .map(|t| ShowFunctionRow {
755                        name: with_schema_name(&schema.name, &t.name),
756                        arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
757                        return_type: t.return_type.to_string(),
758                        language: t.language.clone(),
759                        link: t.link.clone(),
760                    })
761                    .collect()
762            });
763            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
764                .rows(rows)
765                .into());
766        }
767        ShowObject::Cluster => {
768            let workers = session.env().meta_client().list_all_nodes().await?;
769            let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
770                let addr: HostAddr = worker.host.as_ref().unwrap().into();
771                let property = worker.property.as_ref();
772                ShowClusterRow {
773                    id: worker.id,
774                    addr: addr.to_string(),
775                    r#type: worker.get_type().unwrap().as_str_name().into(),
776                    state: worker.get_state().unwrap().as_str_name().to_owned(),
777                    parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
778                    is_streaming: property.map(|p| p.is_streaming),
779                    is_serving: property.map(|p| p.is_serving),
780                    started_at: worker
781                        .started_at
782                        .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
783                }
784            });
785            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
786                .rows(rows)
787                .into());
788        }
789        ShowObject::Jobs => {
790            let resp = session.env().meta_client().get_ddl_progress().await?;
791            let rows = resp.into_iter().map(|job| ShowJobRow {
792                id: job.id as i64,
793                statement: job.statement,
794                create_type: job.create_type,
795                progress: job.progress,
796            });
797            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
798                .rows(rows)
799                .into());
800        }
801        ShowObject::ProcessList => {
802            let rows = show_process_list_impl(
803                session.env().frontend_client_pool(),
804                session.env().worker_node_manager_ref(),
805            )
806            .await;
807            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
808                .rows(rows)
809                .into());
810        }
811        ShowObject::Cursor => {
812            let rows = show_all_cursors_impl(
813                session.env().frontend_client_pool(),
814                session.env().worker_node_manager_ref(),
815            )
816            .await;
817            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
818                .rows(rows)
819                .into());
820        }
821        ShowObject::SubscriptionCursor => {
822            let rows = show_all_sub_cursors_impl(
823                session.env().frontend_client_pool(),
824                session.env().worker_node_manager_ref(),
825            )
826            .await;
827
828            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
829                .rows(rows)
830                .into());
831        }
832    };
833
834    // Apply filters.
835    let rows = rows.into_iter().filter(|row| match &filter {
836        Some(ShowStatementFilter::Like(pattern)) => like_default(&row.base_name(), pattern),
837        Some(ShowStatementFilter::ILike(pattern)) => i_like_default(&row.base_name(), pattern),
838        Some(ShowStatementFilter::Where(..)) => unreachable!(),
839        None => true,
840    });
841
842    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
843        .rows(rows)
844        .into())
845}
846
847pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
848    fields_to_descriptors(ShowCreateObjectRow::fields())
849}
850
851pub fn handle_show_create_object(
852    handle_args: HandlerArgs,
853    show_create_type: ShowCreateType,
854    name: ObjectName,
855) -> Result<RwPgResponse> {
856    let session = handle_args.session;
857    let catalog_reader = session.env().catalog_reader().read_guard();
858    let database = session.database();
859    let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
860    let search_path = session.config().search_path();
861    let user_name = &session.user_name();
862    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
863    let user_reader = session.env().user_info_reader().read_guard();
864    let current_user = user_reader
865        .get_user_by_name(user_name)
866        .expect("user not found");
867
868    let (sql, schema_name) = match show_create_type {
869        ShowCreateType::MaterializedView => {
870            let (mv, schema) = schema_path
871                .try_find(|schema_name| {
872                    Ok::<_, RwError>(
873                        catalog_reader
874                            .get_schema_by_name(&database, schema_name)?
875                            .get_created_table_by_name(&object_name)
876                            .filter(|t| {
877                                t.is_mview() && has_access_to_object(current_user, t.id, t.owner)
878                            }),
879                    )
880                })?
881                .ok_or_else(|| CatalogError::not_found("materialized view", name.to_string()))?;
882            (mv.create_sql(), schema)
883        }
884        ShowCreateType::View => {
885            let (view, schema) =
886                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
887            if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
888                return Err(CatalogError::not_found("view", name.to_string()).into());
889            }
890            (view.create_sql(schema.to_owned()), schema)
891        }
892        ShowCreateType::Table => {
893            let (table, schema) = schema_path
894                .try_find(|schema_name| {
895                    Ok::<_, RwError>(
896                        catalog_reader
897                            .get_schema_by_name(&database, schema_name)?
898                            .get_created_table_by_name(&object_name)
899                            .filter(|t| {
900                                t.is_user_table()
901                                    && has_access_to_object(current_user, t.id, t.owner)
902                            }),
903                    )
904                })?
905                .ok_or_else(|| CatalogError::not_found("table", name.to_string()))?;
906
907            (table.create_sql_purified(), schema)
908        }
909        ShowCreateType::Sink => {
910            let (sink, schema) =
911                catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
912            if !has_access_to_object(current_user, sink.id, sink.owner) {
913                return Err(CatalogError::not_found("sink", name.to_string()).into());
914            }
915            (sink.create_sql(), schema)
916        }
917        ShowCreateType::Source => {
918            let (source, schema) = schema_path
919                .try_find(|schema_name| {
920                    Ok::<_, RwError>(
921                        catalog_reader
922                            .get_schema_by_name(&database, schema_name)?
923                            .get_source_by_name(&object_name)
924                            .filter(|s| {
925                                s.associated_table_id.is_none()
926                                    && has_access_to_object(current_user, s.id, s.owner)
927                            }),
928                    )
929                })?
930                .ok_or_else(|| CatalogError::not_found("source", name.to_string()))?;
931            (source.create_sql_purified(), schema)
932        }
933        ShowCreateType::Index => {
934            let (index, schema) = schema_path
935                .try_find(|schema_name| {
936                    Ok::<_, RwError>(
937                        catalog_reader
938                            .get_schema_by_name(&database, schema_name)?
939                            .get_created_table_by_name(&object_name)
940                            .filter(|t| {
941                                t.is_index() && has_access_to_object(current_user, t.id, t.owner)
942                            }),
943                    )
944                })?
945                .ok_or_else(|| CatalogError::not_found("index", name.to_string()))?;
946            (index.create_sql(), schema)
947        }
948        ShowCreateType::Function => {
949            bail_not_implemented!("show create on: {}", show_create_type);
950        }
951        ShowCreateType::Subscription => {
952            let (subscription, schema) =
953                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
954            if !has_access_to_object(current_user, subscription.id, subscription.owner) {
955                return Err(CatalogError::not_found("subscription", name.to_string()).into());
956            }
957            (subscription.create_sql(), schema)
958        }
959    };
960    let name = format!("{}.{}", schema_name, object_name);
961
962    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
963        .rows([ShowCreateObjectRow {
964            name,
965            create_sql: sql,
966        }])
967        .into())
968}
969
970async fn show_all_sub_cursors_impl(
971    frontend_client_pool: FrontendClientPoolRef,
972    worker_node_manager: WorkerNodeManagerRef,
973) -> Vec<ShowSubscriptionCursorRow> {
974    fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowSubscriptionCursorRow> {
975        vec![ShowSubscriptionCursorRow {
976            worker_id: format!("{}", worker_id),
977            session_id: "".to_owned(),
978            user: "".to_owned(),
979            host: "".to_owned(),
980            database: "".to_owned(),
981            cursor_name: "".to_owned(),
982            subscription_name: "".to_owned(),
983            state: "".to_owned(),
984            idle_duration_ms: 0,
985            info: Some(format!(
986                "Failed to show subscription cursor from worker {worker_id} due to: {err_msg}"
987            )),
988        }]
989    }
990
991    let futures = worker_node_manager
992        .list_frontend_nodes()
993        .into_iter()
994        .map(|worker| {
995            let frontend_client_pool_ = frontend_client_pool.clone();
996            async move {
997                let client = match frontend_client_pool_.get(&worker).await {
998                    Ok(client) => client,
999                    Err(e) => {
1000                        return on_error(worker.id, format!("{}", e.as_report()));
1001                    }
1002                };
1003
1004                let resp = match client.get_all_sub_cursors(GetAllSubCursorsRequest {}).await {
1005                    Ok(resp) => resp,
1006                    Err(e) => {
1007                        return on_error(worker.id, format!("{}", e.as_report()));
1008                    }
1009                };
1010
1011                resp.into_inner()
1012                    .subscription_cursors
1013                    .into_iter()
1014                    .flat_map(|sub_cursor| {
1015                        let worker_id = worker.id.to_string();
1016                        let session_id = sub_cursor.session_id.to_string();
1017
1018                        let user = sub_cursor.user_name;
1019                        let host = sub_cursor.peer_addr;
1020                        let database = sub_cursor.database;
1021
1022                        let size = sub_cursor.states.len();
1023                        let mut sub_cursors = vec![];
1024                        for index in 0..size {
1025                            sub_cursors.push(ShowSubscriptionCursorRow {
1026                                worker_id: worker_id.clone(),
1027                                session_id: session_id.clone(),
1028                                user: user.clone(),
1029                                host: host.clone(),
1030                                database: database.clone(),
1031                                cursor_name: sub_cursor.cursor_names[index].clone(),
1032                                subscription_name: sub_cursor.subscription_names[index].clone(),
1033                                state: sub_cursor.states[index].clone(),
1034                                idle_duration_ms: sub_cursor.idle_durations[index] as i64,
1035                                info: None,
1036                            })
1037                        }
1038                        sub_cursors
1039                    })
1040                    .collect_vec()
1041            }
1042        })
1043        .collect_vec();
1044    join_all(futures).await.into_iter().flatten().collect()
1045}
1046
1047async fn show_all_cursors_impl(
1048    frontend_client_pool: FrontendClientPoolRef,
1049    worker_node_manager: WorkerNodeManagerRef,
1050) -> Vec<ShowCursorRow> {
1051    fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowCursorRow> {
1052        vec![ShowCursorRow {
1053            worker_id: format!("{}", worker_id),
1054            session_id: "".to_owned(),
1055            user: "".to_owned(),
1056            host: "".to_owned(),
1057            database: "".to_owned(),
1058            cursor_name: "".to_owned(),
1059            info: Some(format!(
1060                "Failed to show cursor from worker {worker_id} due to: {err_msg}"
1061            )),
1062        }]
1063    }
1064    let futures = worker_node_manager
1065        .list_frontend_nodes()
1066        .into_iter()
1067        .map(|worker| {
1068            let frontend_client_pool_ = frontend_client_pool.clone();
1069            async move {
1070                let client = match frontend_client_pool_.get(&worker).await {
1071                    Ok(client) => client,
1072                    Err(e) => {
1073                        return on_error(worker.id, format!("{}", e.as_report()));
1074                    }
1075                };
1076
1077                let resp = match client.get_all_cursors(GetAllCursorsRequest {}).await {
1078                    Ok(resp) => resp,
1079                    Err(e) => {
1080                        return on_error(worker.id, format!("{}", e.as_report()));
1081                    }
1082                };
1083
1084                resp.into_inner()
1085                    .all_cursors
1086                    .into_iter()
1087                    .flat_map(|cursors| {
1088                        let worker_id = worker.id.to_string();
1089                        let session_id = cursors.session_id.to_string();
1090
1091                        let user = cursors.user_name;
1092                        let host = cursors.peer_addr;
1093                        let database = cursors.database;
1094
1095                        cursors
1096                            .cursor_names
1097                            .into_iter()
1098                            .map(|cursor_name| ShowCursorRow {
1099                                worker_id: worker_id.clone(),
1100                                session_id: session_id.clone(),
1101                                user: user.clone(),
1102                                host: host.clone(),
1103                                database: database.clone(),
1104                                cursor_name,
1105                                info: None,
1106                            })
1107                            .collect_vec()
1108                    })
1109                    .collect_vec()
1110            }
1111        })
1112        .collect_vec();
1113    join_all(futures).await.into_iter().flatten().collect()
1114}
1115
1116async fn show_process_list_impl(
1117    frontend_client_pool: FrontendClientPoolRef,
1118    worker_node_manager: WorkerNodeManagerRef,
1119) -> Vec<ShowProcessListRow> {
1120    // Create a placeholder row for the worker in case of any errors while fetching its running SQLs.
1121    fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowProcessListRow> {
1122        vec![ShowProcessListRow {
1123            worker_id: format!("{}", worker_id),
1124            id: "".to_owned(),
1125            user: "".to_owned(),
1126            host: "".to_owned(),
1127            database: "".to_owned(),
1128            time: None,
1129            info: Some(format!(
1130                "Failed to show process list from worker {worker_id} due to: {err_msg}"
1131            )),
1132        }]
1133    }
1134    let futures = worker_node_manager
1135        .list_frontend_nodes()
1136        .into_iter()
1137        .map(|worker| {
1138            let frontend_client_pool_ = frontend_client_pool.clone();
1139            async move {
1140                let client = match frontend_client_pool_.get(&worker).await {
1141                    Ok(client) => client,
1142                    Err(e) => {
1143                        return on_error(worker.id, format!("{}", e.as_report()));
1144                    }
1145                };
1146                let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
1147                    Ok(resp) => resp,
1148                    Err(e) => {
1149                        return on_error(worker.id, format!("{}", e.as_report()));
1150                    }
1151                };
1152                resp.into_inner()
1153                    .running_sqls
1154                    .into_iter()
1155                    .map(|sql| ShowProcessListRow {
1156                        worker_id: format!("{}", worker.id),
1157                        id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
1158                        user: sql.user_name,
1159                        host: sql.peer_addr,
1160                        database: sql.database,
1161                        time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
1162                        info: sql
1163                            .sql
1164                            .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
1165                    })
1166                    .collect_vec()
1167            }
1168        })
1169        .collect_vec();
1170    join_all(futures).await.into_iter().flatten().collect()
1171}
1172
1173#[cfg(test)]
1174mod tests {
1175    use std::ops::Index;
1176
1177    use futures_async_stream::for_await;
1178
1179    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
1180
1181    #[tokio::test]
1182    async fn test_show_source() {
1183        let frontend = LocalFrontend::new(Default::default()).await;
1184
1185        let sql = r#"CREATE SOURCE t1 (column1 varchar)
1186        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
1187        FORMAT PLAIN ENCODE JSON"#;
1188        frontend.run_sql(sql).await.unwrap();
1189
1190        let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
1191        rows.sort();
1192        assert_eq!(rows, vec!["Row([Some(b\"public.t1\")])".to_owned(),]);
1193    }
1194
1195    #[tokio::test]
1196    async fn test_show_column() {
1197        let proto_file = create_proto_file(PROTO_FILE_DATA);
1198        let sql = format!(
1199            r#"CREATE SOURCE t
1200    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
1201    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
1202            proto_file.path().to_str().unwrap()
1203        );
1204        let frontend = LocalFrontend::new(Default::default()).await;
1205        frontend.run_sql(sql).await.unwrap();
1206
1207        let sql = "show columns from t";
1208        let mut pg_response = frontend.run_sql(sql).await.unwrap();
1209
1210        let mut columns = Vec::new();
1211        #[for_await]
1212        for row_set in pg_response.values_stream() {
1213            let row_set = row_set.unwrap();
1214            for row in row_set {
1215                columns.push((
1216                    std::str::from_utf8(row.index(0).as_ref().unwrap())
1217                        .unwrap()
1218                        .to_owned(),
1219                    std::str::from_utf8(row.index(1).as_ref().unwrap())
1220                        .unwrap()
1221                        .to_owned(),
1222                ));
1223            }
1224        }
1225
1226        expect_test::expect![[r#"
1227            [
1228                (
1229                    "id",
1230                    "integer",
1231                ),
1232                (
1233                    "country",
1234                    "struct",
1235                ),
1236                (
1237                    "country.address",
1238                    "character varying",
1239                ),
1240                (
1241                    "country.city",
1242                    "struct",
1243                ),
1244                (
1245                    "country.city.address",
1246                    "character varying",
1247                ),
1248                (
1249                    "country.city.zipcode",
1250                    "character varying",
1251                ),
1252                (
1253                    "country.zipcode",
1254                    "character varying",
1255                ),
1256                (
1257                    "zipcode",
1258                    "bigint",
1259                ),
1260                (
1261                    "rate",
1262                    "real",
1263                ),
1264                (
1265                    "_rw_kafka_timestamp",
1266                    "timestamp with time zone",
1267                ),
1268                (
1269                    "_rw_kafka_partition",
1270                    "character varying",
1271                ),
1272                (
1273                    "_rw_kafka_offset",
1274                    "character varying",
1275                ),
1276                (
1277                    "_row_id",
1278                    "serial",
1279                ),
1280            ]
1281        "#]]
1282        .assert_debug_eq(&columns);
1283    }
1284}