1use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
23use risingwave_common::bail_not_implemented;
24use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
25use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
26use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
27use risingwave_common::util::addr::HostAddr;
28use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
29use risingwave_expr::scalar::like::{i_like_default, like_default};
30use risingwave_pb::catalog::connection;
31use risingwave_pb::frontend_service::{
32 GetAllCursorsRequest, GetAllSubCursorsRequest, GetRunningSqlsRequest,
33};
34use risingwave_pb::id::WorkerId;
35use risingwave_rpc_client::FrontendClientPoolRef;
36use risingwave_sqlparser::ast::{
37 Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
38};
39use thiserror_ext::AsReport;
40
41use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
42use crate::binder::{Binder, Relation};
43use crate::catalog::catalog_service::CatalogReadGuard;
44use crate::catalog::root_catalog::SchemaPath;
45use crate::catalog::schema_catalog::SchemaCatalog;
46use crate::catalog::{CatalogError, IndexCatalog};
47use crate::error::{Result, RwError};
48use crate::handler::HandlerArgs;
49use crate::handler::create_connection::print_connection_params;
50use crate::session::{SessionImpl, WorkerProcessId};
51use crate::user::user_catalog::UserCatalog;
52use crate::user::{has_access_to_object, has_schema_usage_privilege};
53
54pub fn get_columns_from_table(
55 session: &SessionImpl,
56 table_name: ObjectName,
57) -> Result<Vec<ColumnCatalog>> {
58 let mut binder = Binder::new_for_system(session);
59 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
60 let column_catalogs = match relation {
61 Relation::Source(s) => s.catalog.columns,
62 Relation::BaseTable(t) => t.table_catalog.columns.clone(),
63 Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
64 _ => {
65 return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
66 }
67 };
68
69 Ok(column_catalogs)
70}
71
72pub fn get_columns_from_sink(
73 session: &SessionImpl,
74 sink_name: ObjectName,
75) -> Result<Vec<ColumnCatalog>> {
76 let binder = Binder::new_for_system(session);
77 let sink = binder.bind_sink_by_name(sink_name)?;
78 Ok(sink.sink_catalog.full_columns().to_vec())
79}
80
81pub fn get_columns_from_view(
82 session: &SessionImpl,
83 view_name: ObjectName,
84) -> Result<Vec<ColumnCatalog>> {
85 let binder = Binder::new_for_system(session);
86 let view = binder.bind_view_by_name(view_name)?;
87
88 Ok(view
89 .view_catalog
90 .columns
91 .iter()
92 .enumerate()
93 .map(|(idx, field)| ColumnCatalog {
94 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
95 is_hidden: false,
96 })
97 .collect())
98}
99
100pub fn get_indexes_from_table(
101 session: &SessionImpl,
102 table_name: ObjectName,
103) -> Result<Vec<Arc<IndexCatalog>>> {
104 let mut binder = Binder::new_for_system(session);
105 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
106 let indexes = match relation {
107 Relation::BaseTable(t) => t.table_indexes,
108 _ => {
109 return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
110 }
111 };
112
113 Ok(indexes)
114}
115
116fn schema_or_search_path(
117 session: &Arc<SessionImpl>,
118 schema: &Option<Ident>,
119 search_path: &SearchPath,
120) -> Vec<String> {
121 if let Some(s) = schema {
122 vec![s.real_value()]
123 } else {
124 search_path
125 .real_path()
126 .iter()
127 .map(|s| {
128 if s.eq(USER_NAME_WILD_CARD) {
129 session.user_name()
130 } else {
131 s.clone()
132 }
133 })
134 .collect()
135 }
136}
137
138fn iter_schema_items<F, T>(
139 session: &Arc<SessionImpl>,
140 schema: &Option<Ident>,
141 reader: &CatalogReadGuard,
142 current_user: &UserCatalog,
143 mut f: F,
144) -> Vec<T>
145where
146 F: FnMut(&SchemaCatalog) -> Vec<T>,
147{
148 let search_path = session.config().search_path();
149
150 schema_or_search_path(session, schema, &search_path)
151 .into_iter()
152 .filter_map(|schema| {
153 if let Ok(schema_catalog) =
154 reader.get_schema_by_name(&session.database(), schema.as_ref())
155 && (has_schema_usage_privilege(
156 current_user,
157 schema_catalog.id(),
158 schema_catalog.owner,
159 ))
160 {
161 Some(schema_catalog)
162 } else {
163 None
164 }
165 })
166 .flat_map(|s| f(s).into_iter())
167 .collect()
168}
169
170struct ObjectNameField(ObjectName);
173
174fn with_schema_name(schema_name: &str, name: &str) -> ObjectNameField {
175 if schema_name.is_empty() {
176 ObjectNameField(ObjectName(vec![name.into()]))
177 } else {
178 ObjectNameField(ObjectName(vec![schema_name.into(), name.into()]))
179 }
180}
181
182impl WithDataType for ObjectNameField {
183 fn default_data_type() -> DataType {
184 DataType::Varchar
185 }
186}
187
188impl ToOwnedDatum for ObjectNameField {
189 fn to_owned_datum(self) -> Datum {
190 Some(self.0.to_string().into())
191 }
192}
193
194#[derive(Fields)]
195#[fields(style = "Title Case")]
196struct ShowObjectRow {
197 name: ObjectNameField,
198}
199
200impl ShowObjectRow {
201 fn base_name(&self) -> String {
202 self.name.0.base_name()
203 }
204}
205
206#[derive(Fields)]
207#[fields(style = "Title Case")]
208struct ShowDatabaseRow {
209 name: String,
210}
211
212#[derive(Fields)]
213#[fields(style = "Title Case")]
214struct ShowSchemaRow {
215 name: String,
216}
217
218#[derive(Fields)]
219#[fields(style = "Title Case")]
220pub struct ShowColumnRow {
221 pub name: ShowColumnName,
222 pub r#type: String,
223 pub is_hidden: Option<String>, pub description: Option<String>,
225}
226
227#[derive(Clone, Debug)]
228enum ShowColumnNameSegment {
229 Field(Ident),
230 ListElement,
231}
232
233impl ShowColumnNameSegment {
234 pub fn field(name: &str) -> Self {
235 ShowColumnNameSegment::Field(Ident::from_real_value(name))
236 }
237}
238
239#[derive(Clone, Debug)]
241pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
242
243impl ShowColumnName {
244 pub fn special(name: &str) -> Self {
247 ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
248 name,
249 ))])
250 }
251}
252
253impl WithDataType for ShowColumnName {
254 fn default_data_type() -> DataType {
255 DataType::Varchar
256 }
257}
258
259impl ToOwnedDatum for ShowColumnName {
260 fn to_owned_datum(self) -> Datum {
261 use std::fmt::Write;
262
263 let mut s = String::new();
264 for segment in self.0 {
265 match segment {
266 ShowColumnNameSegment::Field(ident) => {
267 if !s.is_empty() {
268 s.push('.');
270 }
271 write!(s, "{ident}").unwrap();
272 }
273 ShowColumnNameSegment::ListElement => {
274 s.push_str("[1]");
275 }
276 }
277 }
278 s.to_owned_datum()
279 }
280}
281
282impl ShowColumnRow {
283 fn flatten(
286 name: ShowColumnName,
287 data_type: DataType,
288 is_hidden: bool,
289 description: Option<String>,
290 ) -> Vec<Self> {
291 let r#type = match &data_type {
293 DataType::Struct(_) => "struct".to_owned(),
294 DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
295 d => d.to_string(),
296 };
297
298 let mut rows = vec![ShowColumnRow {
299 name: name.clone(),
300 r#type,
301 is_hidden: Some(is_hidden.to_string()),
302 description,
303 }];
304
305 match data_type {
306 DataType::Struct(st) => {
307 rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
308 let mut name = name.clone();
309 name.0.push(ShowColumnNameSegment::field(field_name));
310 Self::flatten(name, field_data_type.clone(), is_hidden, None)
311 }));
312 }
313
314 DataType::List(list) if let DataType::Struct(_) = list.elem() => {
315 let mut name = name.clone();
316 name.0.push(ShowColumnNameSegment::ListElement);
317 rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
318 }
319
320 _ => {}
321 }
322
323 rows
324 }
325
326 pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
327 Self::flatten(
328 ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
329 col.column_desc.data_type,
330 col.is_hidden,
331 col.column_desc.description,
332 )
333 }
334}
335
336#[derive(Fields)]
337#[fields(style = "Title Case")]
338struct ShowConnectionRow {
339 name: ObjectNameField,
340 r#type: String,
341 properties: String,
342}
343
344#[derive(Fields)]
345#[fields(style = "Title Case")]
346struct ShowFunctionRow {
347 name: ObjectNameField,
348 arguments: String,
349 return_type: String,
350 language: String,
351 link: Option<String>,
352}
353
354#[derive(Fields)]
355#[fields(style = "Title Case")]
356struct ShowIndexRow {
357 name: String,
358 on: String,
359 key: String,
360 include: String,
361 distributed_by: String,
362}
363
364impl From<Arc<IndexCatalog>> for ShowIndexRow {
365 fn from(index: Arc<IndexCatalog>) -> Self {
366 let index_display = index.display();
367 ShowIndexRow {
368 name: index.name.clone(),
369 on: index.primary_table.name.clone(),
370 key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
371 include: display_comma_separated(&index_display.include_columns).to_string(),
372 distributed_by: display_comma_separated(&index_display.distributed_by_columns)
373 .to_string(),
374 }
375 }
376}
377
378#[derive(Fields)]
379#[fields(style = "Title Case")]
380struct ShowClusterRow {
381 id: WorkerId,
382 addr: String,
383 r#type: String,
384 state: String,
385 parallelism: Option<i32>,
386 is_streaming: Option<bool>,
387 is_serving: Option<bool>,
388 started_at: Option<Timestamptz>,
389}
390
391#[derive(Fields)]
392#[fields(style = "Title Case")]
393struct ShowJobRow {
394 id: i64,
395 statement: String,
396 create_type: String,
397 progress: String,
398}
399
400#[derive(Fields)]
401#[fields(style = "Title Case")]
402struct ShowProcessListRow {
403 worker_id: String,
404 id: String,
405 user: String,
406 host: String,
407 database: String,
408 time: Option<String>,
409 info: Option<String>,
410}
411
412#[derive(Fields)]
413#[fields(style = "Title Case")]
414struct ShowCreateObjectRow {
415 name: String,
416 create_sql: String,
417}
418
419#[derive(Fields)]
420#[fields(style = "Title Case")]
421struct ShowSubscriptionRow {
422 name: ObjectNameField,
423 retention_seconds: i64,
424}
425
426#[derive(Fields)]
427#[fields(style = "Title Case")]
428struct ShowCursorRow {
429 worker_id: String,
430 session_id: String,
431 user: String,
432 host: String,
433 database: String,
434 cursor_name: String,
435 info: Option<String>,
436}
437
438#[derive(Fields)]
439#[fields(style = "Title Case")]
440struct ShowSubscriptionCursorRow {
441 worker_id: String,
442 session_id: String,
443 user: String,
444 host: String,
445 database: String,
446 cursor_name: String,
447 subscription_name: String,
448 state: String,
449 idle_duration_ms: i64,
450 info: Option<String>,
451}
452
453pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
455 fields_to_descriptors(match objects {
456 ShowObject::Database => ShowDatabaseRow::fields(),
457 ShowObject::Schema => ShowSchemaRow::fields(),
458 ShowObject::Columns { .. } => ShowColumnRow::fields(),
459 ShowObject::Connection { .. } => ShowConnectionRow::fields(),
460 ShowObject::Function { .. } => ShowFunctionRow::fields(),
461 ShowObject::Indexes { .. } => ShowIndexRow::fields(),
462 ShowObject::Cluster => ShowClusterRow::fields(),
463 ShowObject::Jobs => ShowJobRow::fields(),
464 ShowObject::ProcessList => ShowProcessListRow::fields(),
465 ShowObject::Cursor => ShowCursorRow::fields(),
466 ShowObject::SubscriptionCursor => ShowSubscriptionCursorRow::fields(),
467 ShowObject::Subscription { .. } => ShowSubscriptionRow::fields(),
468
469 ShowObject::Table { .. }
470 | ShowObject::InternalTable { .. }
471 | ShowObject::View { .. }
472 | ShowObject::MaterializedView { .. }
473 | ShowObject::Source { .. }
474 | ShowObject::Sink { .. }
475 | ShowObject::Secret { .. } => ShowObjectRow::fields(),
476 })
477}
478
479pub async fn handle_show_object(
480 handler_args: HandlerArgs,
481 command: ShowObject,
482 filter: Option<ShowStatementFilter>,
483) -> Result<RwPgResponse> {
484 let session = handler_args.session;
485
486 if let Some(ShowStatementFilter::Where(..)) = filter {
487 bail_not_implemented!("WHERE clause in SHOW statement");
488 }
489
490 let catalog_reader = session.env().catalog_reader();
491 let user_reader = session.env().user_info_reader();
492 let get_catalog_reader = || {
493 let reader = catalog_reader.read_guard();
494 let user_reader = user_reader.read_guard();
495 let current_user = user_reader
496 .get_user_by_name(&session.user_name())
497 .expect("user not found")
498 .clone();
499 (reader, current_user)
500 };
501
502 let rows: Vec<ShowObjectRow> = match command {
503 ShowObject::Table { schema } => {
504 let (reader, current_user) = get_catalog_reader();
505 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
506 schema
507 .iter_user_table()
508 .map(|t| with_schema_name(&schema.name, &t.name))
509 .map(|name| ShowObjectRow { name })
510 .collect()
511 })
512 }
513 ShowObject::InternalTable { schema } => {
514 let (reader, current_user) = get_catalog_reader();
515 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
516 schema
517 .iter_internal_table()
518 .map(|t| with_schema_name(&schema.name, &t.name))
519 .map(|name| ShowObjectRow { name })
520 .collect()
521 })
522 }
523 ShowObject::Database => {
524 let reader = catalog_reader.read_guard();
525 let rows = reader
526 .get_all_database_names()
527 .into_iter()
528 .map(|name| ShowDatabaseRow { name });
529 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
530 .rows(rows)
531 .into());
532 }
533 ShowObject::Schema => {
534 let reader = catalog_reader.read_guard();
535 let rows = reader
536 .get_all_schema_names(&session.database())?
537 .into_iter()
538 .map(|name| ShowSchemaRow { name });
539 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
540 .rows(rows)
541 .into());
542 }
543 ShowObject::View { schema } => {
544 let (reader, current_user) = get_catalog_reader();
545 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
546 schema
547 .iter_view()
548 .map(|t| with_schema_name(&schema.name, &t.name))
549 .map(|name| ShowObjectRow { name })
550 .collect()
551 })
552 }
553 ShowObject::MaterializedView { schema } => {
554 let (reader, current_user) = get_catalog_reader();
555 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
556 schema
557 .iter_created_mvs()
558 .map(|t| with_schema_name(&schema.name, &t.name))
559 .map(|name| ShowObjectRow { name })
560 .collect()
561 })
562 }
563 ShowObject::Source { schema } => {
564 let (reader, current_user) = get_catalog_reader();
565 let mut sources =
566 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
567 schema
568 .iter_source()
569 .map(|t| with_schema_name(&schema.name, &t.name))
570 .map(|name| ShowObjectRow { name })
571 .collect()
572 });
573 sources.extend(
574 session
575 .temporary_source_manager()
576 .keys()
577 .into_iter()
578 .map(|t| with_schema_name("", &t))
579 .map(|name| ShowObjectRow { name }),
580 );
581 sources
582 }
583 ShowObject::Sink { schema } => {
584 let (reader, current_user) = get_catalog_reader();
585 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
586 schema
587 .iter_sink()
588 .map(|t| with_schema_name(&schema.name, &t.name))
589 .map(|name| ShowObjectRow { name })
590 .collect()
591 })
592 }
593 ShowObject::Subscription { schema } => {
594 let (reader, current_user) = get_catalog_reader();
595 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
596 schema
597 .iter_subscription()
598 .map(|t| ShowSubscriptionRow {
599 name: with_schema_name(&schema.name, &t.name),
600 retention_seconds: t.retention_seconds as i64,
601 })
602 .collect()
603 });
604 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
605 .rows(rows)
606 .into());
607 }
608 ShowObject::Secret { schema } => {
609 let (reader, current_user) = get_catalog_reader();
610 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
611 schema
612 .iter_secret()
613 .map(|t| with_schema_name(&schema.name, &t.name))
614 .map(|name| ShowObjectRow { name })
615 .collect()
616 })
617 }
618 ShowObject::Columns { table } => {
619 let Ok(columns) = get_columns_from_table(&session, table.clone())
620 .or(get_columns_from_sink(&session, table.clone()))
621 .or(get_columns_from_view(&session, table.clone()))
622 else {
623 return Err(CatalogError::not_found(
624 "table, source, sink or view",
625 table.to_string(),
626 )
627 .into());
628 };
629
630 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
631 .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
632 .into());
633 }
634 ShowObject::Indexes { table } => {
635 let indexes = get_indexes_from_table(&session, table)?;
636
637 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
638 .rows(indexes.into_iter().map(ShowIndexRow::from))
639 .into());
640 }
641 ShowObject::Connection { schema } => {
642 let (reader, current_user) = get_catalog_reader();
643 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
644 schema.iter_connections()
645 .map(|c| {
646 let name = c.name.clone();
647 let r#type = match &c.info {
648 #[expect(deprecated)]
649 connection::Info::PrivateLinkService(_) => {
650 PRIVATELINK_CONNECTION.to_owned()
651 },
652 connection::Info::ConnectionParams(params) => {
653 params.get_connection_type().unwrap().as_str_name().to_owned()
654 }
655 };
656 let source_names = schema
657 .get_source_ids_by_connection(c.id)
658 .unwrap_or_default()
659 .into_iter()
660 .filter_map(|sid| schema.get_source_by_id(sid).map(|catalog| catalog.name.as_str()))
661 .collect_vec();
662 let sink_names = schema
663 .get_sink_ids_by_connection(c.id)
664 .unwrap_or_default()
665 .into_iter()
666 .filter_map(|sid| schema.get_sink_by_id(sid).map(|catalog| catalog.name.as_str()))
667 .collect_vec();
668 let properties = match &c.info {
669 #[expect(deprecated)]
670 connection::Info::PrivateLinkService(i) => {
671 format!(
672 "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
673 i.get_provider().unwrap().as_str_name(),
674 i.service_name,
675 i.endpoint_id,
676 serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
677 serde_json::to_string(&source_names).unwrap(),
678 serde_json::to_string(&sink_names).unwrap(),
679 )
680 }
681 connection::Info::ConnectionParams(params) => {
682 print_connection_params(&session.database(), params, &reader)
684 }
685 };
686 ShowConnectionRow {
687 name: with_schema_name(&schema.name, &name),
688 r#type,
689 properties,
690 }
691 }).collect_vec()
692 });
693 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
694 .rows(rows)
695 .into());
696 }
697 ShowObject::Function { schema } => {
698 let (reader, current_user) = get_catalog_reader();
699 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
700 schema
701 .iter_function()
702 .map(|t| ShowFunctionRow {
703 name: with_schema_name(&schema.name, &t.name),
704 arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
705 return_type: t.return_type.to_string(),
706 language: t.language.clone(),
707 link: t.link.clone(),
708 })
709 .collect()
710 });
711 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
712 .rows(rows)
713 .into());
714 }
715 ShowObject::Cluster => {
716 let workers = session.env().meta_client().list_all_nodes().await?;
717 let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
718 let addr: HostAddr = worker.host.as_ref().unwrap().into();
719 let property = worker.property.as_ref();
720 ShowClusterRow {
721 id: worker.id,
722 addr: addr.to_string(),
723 r#type: worker.get_type().unwrap().as_str_name().into(),
724 state: worker.get_state().unwrap().as_str_name().to_owned(),
725 parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
726 is_streaming: property.map(|p| p.is_streaming),
727 is_serving: property.map(|p| p.is_serving),
728 started_at: worker
729 .started_at
730 .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
731 }
732 });
733 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
734 .rows(rows)
735 .into());
736 }
737 ShowObject::Jobs => {
738 let resp = session.env().meta_client().get_ddl_progress().await?;
739 let rows = resp.into_iter().map(|job| ShowJobRow {
740 id: job.id as i64,
741 statement: job.statement,
742 create_type: job.create_type,
743 progress: job.progress,
744 });
745 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
746 .rows(rows)
747 .into());
748 }
749 ShowObject::ProcessList => {
750 let rows = show_process_list_impl(
751 session.env().frontend_client_pool(),
752 session.env().worker_node_manager_ref(),
753 )
754 .await;
755 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
756 .rows(rows)
757 .into());
758 }
759 ShowObject::Cursor => {
760 let rows = show_all_cursors_impl(
761 session.env().frontend_client_pool(),
762 session.env().worker_node_manager_ref(),
763 )
764 .await;
765 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
766 .rows(rows)
767 .into());
768 }
769 ShowObject::SubscriptionCursor => {
770 let rows = show_all_sub_cursors_impl(
771 session.env().frontend_client_pool(),
772 session.env().worker_node_manager_ref(),
773 )
774 .await;
775
776 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
777 .rows(rows)
778 .into());
779 }
780 };
781
782 let rows = rows.into_iter().filter(|row| match &filter {
784 Some(ShowStatementFilter::Like(pattern)) => like_default(&row.base_name(), pattern),
785 Some(ShowStatementFilter::ILike(pattern)) => i_like_default(&row.base_name(), pattern),
786 Some(ShowStatementFilter::Where(..)) => unreachable!(),
787 None => true,
788 });
789
790 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
791 .rows(rows)
792 .into())
793}
794
795pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
796 fields_to_descriptors(ShowCreateObjectRow::fields())
797}
798
799pub fn handle_show_create_object(
800 handle_args: HandlerArgs,
801 show_create_type: ShowCreateType,
802 name: ObjectName,
803) -> Result<RwPgResponse> {
804 let session = handle_args.session;
805 let catalog_reader = session.env().catalog_reader().read_guard();
806 let database = session.database();
807 let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
808 let search_path = session.config().search_path();
809 let user_name = &session.user_name();
810 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
811 let user_reader = session.env().user_info_reader().read_guard();
812 let current_user = user_reader
813 .get_user_by_name(user_name)
814 .expect("user not found");
815
816 let (sql, schema_name) = match show_create_type {
817 ShowCreateType::MaterializedView => {
818 let (mv, schema) = schema_path
819 .try_find(|schema_name| {
820 Ok::<_, RwError>(
821 catalog_reader
822 .get_schema_by_name(&database, schema_name)?
823 .get_created_table_by_name(&object_name)
824 .filter(|t| {
825 t.is_mview() && has_access_to_object(current_user, t.id, t.owner)
826 }),
827 )
828 })?
829 .ok_or_else(|| CatalogError::not_found("materialized view", name.to_string()))?;
830 (mv.create_sql(), schema)
831 }
832 ShowCreateType::View => {
833 let (view, schema) =
834 catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
835 if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
836 return Err(CatalogError::not_found("view", name.to_string()).into());
837 }
838 (view.create_sql(schema.to_owned()), schema)
839 }
840 ShowCreateType::Table => {
841 let (table, schema) = schema_path
842 .try_find(|schema_name| {
843 Ok::<_, RwError>(
844 catalog_reader
845 .get_schema_by_name(&database, schema_name)?
846 .get_created_table_by_name(&object_name)
847 .filter(|t| {
848 t.is_user_table()
849 && has_access_to_object(current_user, t.id, t.owner)
850 }),
851 )
852 })?
853 .ok_or_else(|| CatalogError::not_found("table", name.to_string()))?;
854
855 (table.create_sql_purified(), schema)
856 }
857 ShowCreateType::Sink => {
858 let (sink, schema) =
859 catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
860 if !has_access_to_object(current_user, sink.id, sink.owner) {
861 return Err(CatalogError::not_found("sink", name.to_string()).into());
862 }
863 (sink.create_sql(), schema)
864 }
865 ShowCreateType::Source => {
866 let (source, schema) = schema_path
867 .try_find(|schema_name| {
868 Ok::<_, RwError>(
869 catalog_reader
870 .get_schema_by_name(&database, schema_name)?
871 .get_source_by_name(&object_name)
872 .filter(|s| {
873 s.associated_table_id.is_none()
874 && has_access_to_object(current_user, s.id, s.owner)
875 }),
876 )
877 })?
878 .ok_or_else(|| CatalogError::not_found("source", name.to_string()))?;
879 (source.create_sql_purified(), schema)
880 }
881 ShowCreateType::Index => {
882 let (index, schema) = schema_path
883 .try_find(|schema_name| {
884 Ok::<_, RwError>(
885 catalog_reader
886 .get_schema_by_name(&database, schema_name)?
887 .get_created_table_by_name(&object_name)
888 .filter(|t| {
889 t.is_index() && has_access_to_object(current_user, t.id, t.owner)
890 }),
891 )
892 })?
893 .ok_or_else(|| CatalogError::not_found("index", name.to_string()))?;
894 (index.create_sql(), schema)
895 }
896 ShowCreateType::Function => {
897 bail_not_implemented!("show create on: {}", show_create_type);
898 }
899 ShowCreateType::Subscription => {
900 let (subscription, schema) =
901 catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
902 if !has_access_to_object(current_user, subscription.id, subscription.owner) {
903 return Err(CatalogError::not_found("subscription", name.to_string()).into());
904 }
905 (subscription.create_sql(), schema)
906 }
907 };
908 let name = format!("{}.{}", schema_name, object_name);
909
910 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
911 .rows([ShowCreateObjectRow {
912 name,
913 create_sql: sql,
914 }])
915 .into())
916}
917
918async fn show_all_sub_cursors_impl(
919 frontend_client_pool: FrontendClientPoolRef,
920 worker_node_manager: WorkerNodeManagerRef,
921) -> Vec<ShowSubscriptionCursorRow> {
922 fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowSubscriptionCursorRow> {
923 vec![ShowSubscriptionCursorRow {
924 worker_id: format!("{}", worker_id),
925 session_id: "".to_owned(),
926 user: "".to_owned(),
927 host: "".to_owned(),
928 database: "".to_owned(),
929 cursor_name: "".to_owned(),
930 subscription_name: "".to_owned(),
931 state: "".to_owned(),
932 idle_duration_ms: 0,
933 info: Some(format!(
934 "Failed to show subscription cursor from worker {worker_id} due to: {err_msg}"
935 )),
936 }]
937 }
938
939 let futures = worker_node_manager
940 .list_frontend_nodes()
941 .into_iter()
942 .map(|worker| {
943 let frontend_client_pool_ = frontend_client_pool.clone();
944 async move {
945 let client = match frontend_client_pool_.get(&worker).await {
946 Ok(client) => client,
947 Err(e) => {
948 return on_error(worker.id, format!("{}", e.as_report()));
949 }
950 };
951
952 let resp = match client.get_all_sub_cursors(GetAllSubCursorsRequest {}).await {
953 Ok(resp) => resp,
954 Err(e) => {
955 return on_error(worker.id, format!("{}", e.as_report()));
956 }
957 };
958
959 resp.into_inner()
960 .subscription_cursors
961 .into_iter()
962 .flat_map(|sub_cursor| {
963 let worker_id = worker.id.to_string();
964 let session_id = sub_cursor.session_id.to_string();
965
966 let user = sub_cursor.user_name;
967 let host = sub_cursor.peer_addr;
968 let database = sub_cursor.database;
969
970 let size = sub_cursor.states.len();
971 let mut sub_cursors = vec![];
972 for index in 0..size {
973 sub_cursors.push(ShowSubscriptionCursorRow {
974 worker_id: worker_id.clone(),
975 session_id: session_id.clone(),
976 user: user.clone(),
977 host: host.clone(),
978 database: database.clone(),
979 cursor_name: sub_cursor.cursor_names[index].clone(),
980 subscription_name: sub_cursor.subscription_names[index].clone(),
981 state: sub_cursor.states[index].clone(),
982 idle_duration_ms: sub_cursor.idle_durations[index] as i64,
983 info: None,
984 })
985 }
986 sub_cursors
987 })
988 .collect_vec()
989 }
990 })
991 .collect_vec();
992 join_all(futures).await.into_iter().flatten().collect()
993}
994
995async fn show_all_cursors_impl(
996 frontend_client_pool: FrontendClientPoolRef,
997 worker_node_manager: WorkerNodeManagerRef,
998) -> Vec<ShowCursorRow> {
999 fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowCursorRow> {
1000 vec![ShowCursorRow {
1001 worker_id: format!("{}", worker_id),
1002 session_id: "".to_owned(),
1003 user: "".to_owned(),
1004 host: "".to_owned(),
1005 database: "".to_owned(),
1006 cursor_name: "".to_owned(),
1007 info: Some(format!(
1008 "Failed to show cursor from worker {worker_id} due to: {err_msg}"
1009 )),
1010 }]
1011 }
1012 let futures = worker_node_manager
1013 .list_frontend_nodes()
1014 .into_iter()
1015 .map(|worker| {
1016 let frontend_client_pool_ = frontend_client_pool.clone();
1017 async move {
1018 let client = match frontend_client_pool_.get(&worker).await {
1019 Ok(client) => client,
1020 Err(e) => {
1021 return on_error(worker.id, format!("{}", e.as_report()));
1022 }
1023 };
1024
1025 let resp = match client.get_all_cursors(GetAllCursorsRequest {}).await {
1026 Ok(resp) => resp,
1027 Err(e) => {
1028 return on_error(worker.id, format!("{}", e.as_report()));
1029 }
1030 };
1031
1032 resp.into_inner()
1033 .all_cursors
1034 .into_iter()
1035 .flat_map(|cursors| {
1036 let worker_id = worker.id.to_string();
1037 let session_id = cursors.session_id.to_string();
1038
1039 let user = cursors.user_name;
1040 let host = cursors.peer_addr;
1041 let database = cursors.database;
1042
1043 cursors
1044 .cursor_names
1045 .into_iter()
1046 .map(|cursor_name| ShowCursorRow {
1047 worker_id: worker_id.clone(),
1048 session_id: session_id.clone(),
1049 user: user.clone(),
1050 host: host.clone(),
1051 database: database.clone(),
1052 cursor_name,
1053 info: None,
1054 })
1055 .collect_vec()
1056 })
1057 .collect_vec()
1058 }
1059 })
1060 .collect_vec();
1061 join_all(futures).await.into_iter().flatten().collect()
1062}
1063
1064async fn show_process_list_impl(
1065 frontend_client_pool: FrontendClientPoolRef,
1066 worker_node_manager: WorkerNodeManagerRef,
1067) -> Vec<ShowProcessListRow> {
1068 fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowProcessListRow> {
1070 vec![ShowProcessListRow {
1071 worker_id: format!("{}", worker_id),
1072 id: "".to_owned(),
1073 user: "".to_owned(),
1074 host: "".to_owned(),
1075 database: "".to_owned(),
1076 time: None,
1077 info: Some(format!(
1078 "Failed to show process list from worker {worker_id} due to: {err_msg}"
1079 )),
1080 }]
1081 }
1082 let futures = worker_node_manager
1083 .list_frontend_nodes()
1084 .into_iter()
1085 .map(|worker| {
1086 let frontend_client_pool_ = frontend_client_pool.clone();
1087 async move {
1088 let client = match frontend_client_pool_.get(&worker).await {
1089 Ok(client) => client,
1090 Err(e) => {
1091 return on_error(worker.id, format!("{}", e.as_report()));
1092 }
1093 };
1094 let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
1095 Ok(resp) => resp,
1096 Err(e) => {
1097 return on_error(worker.id, format!("{}", e.as_report()));
1098 }
1099 };
1100 resp.into_inner()
1101 .running_sqls
1102 .into_iter()
1103 .map(|sql| ShowProcessListRow {
1104 worker_id: format!("{}", worker.id),
1105 id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
1106 user: sql.user_name,
1107 host: sql.peer_addr,
1108 database: sql.database,
1109 time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
1110 info: sql
1111 .sql
1112 .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
1113 })
1114 .collect_vec()
1115 }
1116 })
1117 .collect_vec();
1118 join_all(futures).await.into_iter().flatten().collect()
1119}
1120
1121#[cfg(test)]
1122mod tests {
1123 use std::ops::Index;
1124
1125 use futures_async_stream::for_await;
1126
1127 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
1128
1129 #[tokio::test]
1130 async fn test_show_source() {
1131 let frontend = LocalFrontend::new(Default::default()).await;
1132
1133 let sql = r#"CREATE SOURCE t1 (column1 varchar)
1134 WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
1135 FORMAT PLAIN ENCODE JSON"#;
1136 frontend.run_sql(sql).await.unwrap();
1137
1138 let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
1139 rows.sort();
1140 assert_eq!(rows, vec!["Row([Some(b\"public.t1\")])".to_owned(),]);
1141 }
1142
1143 #[tokio::test]
1144 async fn test_show_column() {
1145 let proto_file = create_proto_file(PROTO_FILE_DATA);
1146 let sql = format!(
1147 r#"CREATE SOURCE t
1148 WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
1149 FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
1150 proto_file.path().to_str().unwrap()
1151 );
1152 let frontend = LocalFrontend::new(Default::default()).await;
1153 frontend.run_sql(sql).await.unwrap();
1154
1155 let sql = "show columns from t";
1156 let mut pg_response = frontend.run_sql(sql).await.unwrap();
1157
1158 let mut columns = Vec::new();
1159 #[for_await]
1160 for row_set in pg_response.values_stream() {
1161 let row_set = row_set.unwrap();
1162 for row in row_set {
1163 columns.push((
1164 std::str::from_utf8(row.index(0).as_ref().unwrap())
1165 .unwrap()
1166 .to_owned(),
1167 std::str::from_utf8(row.index(1).as_ref().unwrap())
1168 .unwrap()
1169 .to_owned(),
1170 ));
1171 }
1172 }
1173
1174 expect_test::expect![[r#"
1175 [
1176 (
1177 "id",
1178 "integer",
1179 ),
1180 (
1181 "country",
1182 "struct",
1183 ),
1184 (
1185 "country.address",
1186 "character varying",
1187 ),
1188 (
1189 "country.city",
1190 "struct",
1191 ),
1192 (
1193 "country.city.address",
1194 "character varying",
1195 ),
1196 (
1197 "country.city.zipcode",
1198 "character varying",
1199 ),
1200 (
1201 "country.zipcode",
1202 "character varying",
1203 ),
1204 (
1205 "zipcode",
1206 "bigint",
1207 ),
1208 (
1209 "rate",
1210 "real",
1211 ),
1212 (
1213 "_rw_kafka_timestamp",
1214 "timestamp with time zone",
1215 ),
1216 (
1217 "_rw_kafka_partition",
1218 "character varying",
1219 ),
1220 (
1221 "_rw_kafka_offset",
1222 "character varying",
1223 ),
1224 (
1225 "_row_id",
1226 "serial",
1227 ),
1228 ]
1229 "#]]
1230 .assert_debug_eq(&columns);
1231 }
1232}