1use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use pgwire::pg_server::Session;
23use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::GetRunningSqlsRequest;
33use risingwave_rpc_client::FrontendClientPoolRef;
34use risingwave_sqlparser::ast::{
35 Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
36};
37use thiserror_ext::AsReport;
38
39use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
40use crate::binder::{Binder, Relation};
41use crate::catalog::catalog_service::CatalogReadGuard;
42use crate::catalog::root_catalog::SchemaPath;
43use crate::catalog::schema_catalog::SchemaCatalog;
44use crate::catalog::{CatalogError, IndexCatalog};
45use crate::error::{Result, RwError};
46use crate::handler::HandlerArgs;
47use crate::handler::create_connection::print_connection_params;
48use crate::session::cursor_manager::SubscriptionCursor;
49use crate::session::{SessionImpl, WorkerProcessId};
50use crate::user::has_access_to_object;
51use crate::user::user_catalog::UserCatalog;
52
53pub fn get_columns_from_table(
54 session: &SessionImpl,
55 table_name: ObjectName,
56) -> Result<Vec<ColumnCatalog>> {
57 let mut binder = Binder::new_for_system(session);
58 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
59 let column_catalogs = match relation {
60 Relation::Source(s) => s.catalog.columns,
61 Relation::BaseTable(t) => t.table_catalog.columns.clone(),
62 Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
63 _ => {
64 return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
65 }
66 };
67
68 Ok(column_catalogs)
69}
70
71pub fn get_columns_from_sink(
72 session: &SessionImpl,
73 sink_name: ObjectName,
74) -> Result<Vec<ColumnCatalog>> {
75 let binder = Binder::new_for_system(session);
76 let sink = binder.bind_sink_by_name(sink_name)?;
77 Ok(sink.sink_catalog.full_columns().to_vec())
78}
79
80pub fn get_columns_from_view(
81 session: &SessionImpl,
82 view_name: ObjectName,
83) -> Result<Vec<ColumnCatalog>> {
84 let binder = Binder::new_for_system(session);
85 let view = binder.bind_view_by_name(view_name)?;
86
87 Ok(view
88 .view_catalog
89 .columns
90 .iter()
91 .enumerate()
92 .map(|(idx, field)| ColumnCatalog {
93 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
94 is_hidden: false,
95 })
96 .collect())
97}
98
99pub fn get_indexes_from_table(
100 session: &SessionImpl,
101 table_name: ObjectName,
102) -> Result<Vec<Arc<IndexCatalog>>> {
103 let mut binder = Binder::new_for_system(session);
104 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
105 let indexes = match relation {
106 Relation::BaseTable(t) => t.table_indexes,
107 _ => {
108 return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
109 }
110 };
111
112 Ok(indexes)
113}
114
115fn schema_or_search_path(
116 session: &Arc<SessionImpl>,
117 schema: &Option<Ident>,
118 search_path: &SearchPath,
119) -> Vec<String> {
120 if let Some(s) = schema {
121 vec![s.real_value()]
122 } else {
123 search_path
124 .real_path()
125 .iter()
126 .map(|s| {
127 if s.eq(USER_NAME_WILD_CARD) {
128 session.user_name()
129 } else {
130 s.clone()
131 }
132 })
133 .collect()
134 }
135}
136
137fn iter_schema_items<F, T>(
138 session: &Arc<SessionImpl>,
139 schema: &Option<Ident>,
140 reader: &CatalogReadGuard,
141 current_user: &UserCatalog,
142 mut f: F,
143) -> Vec<T>
144where
145 F: FnMut(&SchemaCatalog) -> Vec<T>,
146{
147 let search_path = session.config().search_path();
148
149 schema_or_search_path(session, schema, &search_path)
150 .into_iter()
151 .filter_map(|schema| {
152 if let Ok(schema_catalog) =
153 reader.get_schema_by_name(&session.database(), schema.as_ref())
154 && (current_user.is_super
155 || current_user.has_schema_usage_privilege(schema_catalog.id()))
156 {
157 Some(schema_catalog)
158 } else {
159 None
160 }
161 })
162 .flat_map(|s| f(s).into_iter())
163 .collect()
164}
165
/// Newtype around [`ObjectName`] so that an object name can be used as a row
/// field; the trait impls in this file expose it as a varchar holding the
/// name's string form.
struct ObjectNameField(ObjectName);
169
170fn with_schema_name(schema_name: &str, name: &str) -> ObjectNameField {
171 if schema_name.is_empty() {
172 ObjectNameField(ObjectName(vec![name.into()]))
173 } else {
174 ObjectNameField(ObjectName(vec![schema_name.into(), name.into()]))
175 }
176}
177
/// Object names are exposed as varchar values.
impl WithDataType for ObjectNameField {
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
183
184impl ToOwnedDatum for ObjectNameField {
185 fn to_owned_datum(self) -> Datum {
186 Some(self.0.to_string().into())
187 }
188}
189
/// Single-column row used by the `SHOW` variants that only list object names
/// (tables, internal tables, views, materialized views, sources, sinks and
/// secrets — see `infer_show_object`).
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowObjectRow {
    // Object name, schema-qualified when built via `with_schema_name` with a
    // non-empty schema.
    name: ObjectNameField,
}
195
196impl ShowObjectRow {
197 fn base_name(&self) -> String {
198 self.name.0.base_name()
199 }
200}
201
/// Row of `SHOW DATABASES` output: one database name per row.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowDatabaseRow {
    name: String,
}
207
/// Row of `SHOW SCHEMAS` output: one schema name per row.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSchemaRow {
    name: String,
}
213
/// Row of `SHOW COLUMNS` output. Nested struct fields are reported as their
/// own rows (see `ShowColumnRow::flatten`).
#[derive(Fields)]
#[fields(style = "Title Case")]
pub struct ShowColumnRow {
    // Possibly nested column path, e.g. `country.city.zipcode`.
    pub name: ShowColumnName,
    // Display form of the data type; struct-typed columns show `struct` / `struct[]`.
    pub r#type: String,
    // String form of the hidden flag (`flatten` stores `is_hidden.to_string()`).
    pub is_hidden: Option<String>,
    // Column description from the catalog; `flatten` leaves it `None` for nested rows.
    pub description: Option<String>,
}
222
/// One step in a (possibly nested) column path shown by `SHOW COLUMNS`.
#[derive(Clone, Debug)]
enum ShowColumnNameSegment {
    // A named column or struct field.
    Field(Ident),
    // Descent into the element of a list; rendered as `[1]`.
    ListElement,
}
228
229impl ShowColumnNameSegment {
230 pub fn field(name: &str) -> Self {
231 ShowColumnNameSegment::Field(Ident::from_real_value(name))
232 }
233}
234
/// Column path shown by `SHOW COLUMNS`: an ordered list of segments that is
/// rendered with `.` between fields and `[1]` for list elements.
#[derive(Clone, Debug)]
pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
238
239impl ShowColumnName {
240 pub fn special(name: &str) -> Self {
243 ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
244 name,
245 ))])
246 }
247}
248
/// Column paths are exposed as varchar values.
impl WithDataType for ShowColumnName {
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
254
255impl ToOwnedDatum for ShowColumnName {
256 fn to_owned_datum(self) -> Datum {
257 use std::fmt::Write;
258
259 let mut s = String::new();
260 for segment in self.0 {
261 match segment {
262 ShowColumnNameSegment::Field(ident) => {
263 if !s.is_empty() {
264 s.push('.');
266 }
267 write!(s, "{ident}").unwrap();
268 }
269 ShowColumnNameSegment::ListElement => {
270 s.push_str("[1]");
271 }
272 }
273 }
274 s.to_owned_datum()
275 }
276}
277
278impl ShowColumnRow {
279 fn flatten(
282 name: ShowColumnName,
283 data_type: DataType,
284 is_hidden: bool,
285 description: Option<String>,
286 ) -> Vec<Self> {
287 let r#type = match &data_type {
289 DataType::Struct(_) => "struct".to_owned(),
290 DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
291 d => d.to_string(),
292 };
293
294 let mut rows = vec![ShowColumnRow {
295 name: name.clone(),
296 r#type,
297 is_hidden: Some(is_hidden.to_string()),
298 description,
299 }];
300
301 match data_type {
302 DataType::Struct(st) => {
303 rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
304 let mut name = name.clone();
305 name.0.push(ShowColumnNameSegment::field(field_name));
306 Self::flatten(name, field_data_type.clone(), is_hidden, None)
307 }));
308 }
309
310 DataType::List(list) if let DataType::Struct(_) = list.elem() => {
311 let mut name = name.clone();
312 name.0.push(ShowColumnNameSegment::ListElement);
313 rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
314 }
315
316 _ => {}
317 }
318
319 rows
320 }
321
322 pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
323 Self::flatten(
324 ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
325 col.column_desc.data_type,
326 col.is_hidden,
327 col.column_desc.description,
328 )
329 }
330}
331
/// Row of `SHOW CONNECTIONS` output.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowConnectionRow {
    // Schema-qualified connection name.
    name: ObjectNameField,
    // Connection kind (private link, or the connection-params type name).
    r#type: String,
    // Human-readable rendering of the connection's properties.
    properties: String,
}
339
/// Row of `SHOW FUNCTIONS` output.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowFunctionRow {
    // Schema-qualified function name.
    name: ObjectNameField,
    // Argument types joined with ", ".
    arguments: String,
    return_type: String,
    language: String,
    link: Option<String>,
}
349
/// Row of `SHOW INDEXES` output.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowIndexRow {
    // Index name.
    name: String,
    // Name of the table the index is defined on.
    on: String,
    // Comma-separated index columns with their ordering.
    key: String,
    // Comma-separated include columns.
    include: String,
    // Comma-separated distribution columns.
    distributed_by: String,
}
359
360impl From<Arc<IndexCatalog>> for ShowIndexRow {
361 fn from(index: Arc<IndexCatalog>) -> Self {
362 let index_display = index.display();
363 ShowIndexRow {
364 name: index.name.clone(),
365 on: index.primary_table.name.clone(),
366 key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
367 include: display_comma_separated(&index_display.include_columns).to_string(),
368 distributed_by: display_comma_separated(&index_display.distributed_by_columns)
369 .to_string(),
370 }
371 }
372}
373
/// Row of `SHOW CLUSTER` output: one worker node per row.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowClusterRow {
    // Worker id.
    id: i32,
    // Host address of the worker.
    addr: String,
    // Worker type name.
    r#type: String,
    // Worker state name.
    state: String,
    parallelism: Option<i32>,
    // The three flags below are `None` when the worker reports no property.
    is_streaming: Option<bool>,
    is_serving: Option<bool>,
    is_unschedulable: Option<bool>,
    started_at: Option<Timestamptz>,
}
387
/// Row of `SHOW JOBS` output, filled from the meta service's DDL progress list.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowJobRow {
    id: i64,
    statement: String,
    create_type: String,
    progress: String,
}
396
/// Row of `SHOW PROCESSLIST` output: one running SQL per row, or one error row
/// for a frontend worker that could not be queried.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowProcessListRow {
    // Id of the frontend worker that reported the process.
    worker_id: String,
    // Display form of the worker/process id pair; empty on error rows.
    id: String,
    user: String,
    host: String,
    database: String,
    // Elapsed time formatted as `<millis>ms`, when reported.
    time: Option<String>,
    // The SQL text (truncated), or the failure message on error rows.
    info: Option<String>,
}
408
/// Row of `SHOW CREATE ...` output.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCreateObjectRow {
    // Object name in `<schema>.<object>` form.
    name: String,
    // The statement that creates the object.
    create_sql: String,
}
415
/// Row of `SHOW SUBSCRIPTIONS` output.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionRow {
    // Schema-qualified subscription name.
    name: ObjectNameField,
    retention_seconds: i64,
}
422
/// Row of `SHOW CURSORS` output: one query cursor of one session per row.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCursorRow {
    session_id: String,
    user: String,
    // Peer address of the owning session.
    host: String,
    database: String,
    cursor_name: String,
}
432
/// Row of `SHOW SUBSCRIPTION CURSORS` output: one subscription cursor of one
/// session per row.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionCursorRow {
    session_id: String,
    user: String,
    // Peer address of the owning session.
    host: String,
    database: String,
    cursor_name: String,
    subscription_name: String,
    // Textual cursor state as reported by the cursor itself.
    state: String,
    // Idle time of the cursor, in milliseconds.
    idle_duration_ms: i64,
}
445
446pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
448 fields_to_descriptors(match objects {
449 ShowObject::Database => ShowDatabaseRow::fields(),
450 ShowObject::Schema => ShowSchemaRow::fields(),
451 ShowObject::Columns { .. } => ShowColumnRow::fields(),
452 ShowObject::Connection { .. } => ShowConnectionRow::fields(),
453 ShowObject::Function { .. } => ShowFunctionRow::fields(),
454 ShowObject::Indexes { .. } => ShowIndexRow::fields(),
455 ShowObject::Cluster => ShowClusterRow::fields(),
456 ShowObject::Jobs => ShowJobRow::fields(),
457 ShowObject::ProcessList => ShowProcessListRow::fields(),
458 ShowObject::Cursor => ShowCursorRow::fields(),
459 ShowObject::SubscriptionCursor => ShowSubscriptionCursorRow::fields(),
460 ShowObject::Subscription { .. } => ShowSubscriptionRow::fields(),
461
462 ShowObject::Table { .. }
463 | ShowObject::InternalTable { .. }
464 | ShowObject::View { .. }
465 | ShowObject::MaterializedView { .. }
466 | ShowObject::Source { .. }
467 | ShowObject::Sink { .. }
468 | ShowObject::Secret { .. } => ShowObjectRow::fields(),
469 })
470}
471
472pub async fn handle_show_object(
473 handler_args: HandlerArgs,
474 command: ShowObject,
475 filter: Option<ShowStatementFilter>,
476) -> Result<RwPgResponse> {
477 let session = handler_args.session;
478
479 if let Some(ShowStatementFilter::Where(..)) = filter {
480 bail_not_implemented!("WHERE clause in SHOW statement");
481 }
482
483 let catalog_reader = session.env().catalog_reader();
484 let user_reader = session.env().user_info_reader();
485 let get_catalog_reader = || {
486 let reader = catalog_reader.read_guard();
487 let user_reader = user_reader.read_guard();
488 let current_user = user_reader
489 .get_user_by_name(&session.user_name())
490 .expect("user not found")
491 .clone();
492 (reader, current_user)
493 };
494
495 let rows: Vec<ShowObjectRow> = match command {
496 ShowObject::Table { schema } => {
497 let (reader, current_user) = get_catalog_reader();
498 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
499 schema
500 .iter_user_table()
501 .map(|t| with_schema_name(&schema.name, &t.name))
502 .map(|name| ShowObjectRow { name })
503 .collect()
504 })
505 }
506 ShowObject::InternalTable { schema } => {
507 let (reader, current_user) = get_catalog_reader();
508 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
509 schema
510 .iter_internal_table()
511 .map(|t| with_schema_name(&schema.name, &t.name))
512 .map(|name| ShowObjectRow { name })
513 .collect()
514 })
515 }
516 ShowObject::Database => {
517 let reader = catalog_reader.read_guard();
518 let rows = reader
519 .get_all_database_names()
520 .into_iter()
521 .map(|name| ShowDatabaseRow { name });
522 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
523 .rows(rows)
524 .into());
525 }
526 ShowObject::Schema => {
527 let reader = catalog_reader.read_guard();
528 let rows = reader
529 .get_all_schema_names(&session.database())?
530 .into_iter()
531 .map(|name| ShowSchemaRow { name });
532 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
533 .rows(rows)
534 .into());
535 }
536 ShowObject::View { schema } => {
537 let (reader, current_user) = get_catalog_reader();
538 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
539 schema
540 .iter_view()
541 .map(|t| with_schema_name(&schema.name, &t.name))
542 .map(|name| ShowObjectRow { name })
543 .collect()
544 })
545 }
546 ShowObject::MaterializedView { schema } => {
547 let (reader, current_user) = get_catalog_reader();
548 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
549 schema
550 .iter_created_mvs()
551 .map(|t| with_schema_name(&schema.name, &t.name))
552 .map(|name| ShowObjectRow { name })
553 .collect()
554 })
555 }
556 ShowObject::Source { schema } => {
557 let (reader, current_user) = get_catalog_reader();
558 let mut sources =
559 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
560 schema
561 .iter_source()
562 .map(|t| with_schema_name(&schema.name, &t.name))
563 .map(|name| ShowObjectRow { name })
564 .collect()
565 });
566 sources.extend(
567 session
568 .temporary_source_manager()
569 .keys()
570 .into_iter()
571 .map(|t| with_schema_name("", &t))
572 .map(|name| ShowObjectRow { name }),
573 );
574 sources
575 }
576 ShowObject::Sink { schema } => {
577 let (reader, current_user) = get_catalog_reader();
578 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
579 schema
580 .iter_sink()
581 .map(|t| with_schema_name(&schema.name, &t.name))
582 .map(|name| ShowObjectRow { name })
583 .collect()
584 })
585 }
586 ShowObject::Subscription { schema } => {
587 let (reader, current_user) = get_catalog_reader();
588 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
589 schema
590 .iter_subscription()
591 .map(|t| ShowSubscriptionRow {
592 name: with_schema_name(&schema.name, &t.name),
593 retention_seconds: t.retention_seconds as i64,
594 })
595 .collect()
596 });
597 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
598 .rows(rows)
599 .into());
600 }
601 ShowObject::Secret { schema } => {
602 let (reader, current_user) = get_catalog_reader();
603 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
604 schema
605 .iter_secret()
606 .map(|t| with_schema_name(&schema.name, &t.name))
607 .map(|name| ShowObjectRow { name })
608 .collect()
609 })
610 }
611 ShowObject::Columns { table } => {
612 let Ok(columns) = get_columns_from_table(&session, table.clone())
613 .or(get_columns_from_sink(&session, table.clone()))
614 .or(get_columns_from_view(&session, table.clone()))
615 else {
616 return Err(CatalogError::NotFound(
617 "table, source, sink or view",
618 table.to_string(),
619 )
620 .into());
621 };
622
623 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
624 .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
625 .into());
626 }
627 ShowObject::Indexes { table } => {
628 let indexes = get_indexes_from_table(&session, table)?;
629
630 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
631 .rows(indexes.into_iter().map(ShowIndexRow::from))
632 .into());
633 }
634 ShowObject::Connection { schema } => {
635 let (reader, current_user) = get_catalog_reader();
636 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
637 schema.iter_connections()
638 .map(|c| {
639 let name = c.name.clone();
640 let r#type = match &c.info {
641 connection::Info::PrivateLinkService(_) => {
642 PRIVATELINK_CONNECTION.to_owned()
643 },
644 connection::Info::ConnectionParams(params) => {
645 params.get_connection_type().unwrap().as_str_name().to_owned()
646 }
647 };
648 let source_names = schema
649 .get_source_ids_by_connection(c.id)
650 .unwrap_or_default()
651 .into_iter()
652 .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
653 .collect_vec();
654 let sink_names = schema
655 .get_sink_ids_by_connection(c.id)
656 .unwrap_or_default()
657 .into_iter()
658 .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
659 .collect_vec();
660 let properties = match &c.info {
661 connection::Info::PrivateLinkService(i) => {
662 format!(
663 "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
664 i.get_provider().unwrap().as_str_name(),
665 i.service_name,
666 i.endpoint_id,
667 serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
668 serde_json::to_string(&source_names).unwrap(),
669 serde_json::to_string(&sink_names).unwrap(),
670 )
671 }
672 connection::Info::ConnectionParams(params) => {
673 print_connection_params(&session.database(), params, &reader)
675 }
676 };
677 ShowConnectionRow {
678 name: with_schema_name(&schema.name, &name),
679 r#type,
680 properties,
681 }
682 }).collect_vec()
683 });
684 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
685 .rows(rows)
686 .into());
687 }
688 ShowObject::Function { schema } => {
689 let (reader, current_user) = get_catalog_reader();
690 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
691 schema
692 .iter_function()
693 .map(|t| ShowFunctionRow {
694 name: with_schema_name(&schema.name, &t.name),
695 arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
696 return_type: t.return_type.to_string(),
697 language: t.language.clone(),
698 link: t.link.clone(),
699 })
700 .collect()
701 });
702 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
703 .rows(rows)
704 .into());
705 }
706 ShowObject::Cluster => {
707 let workers = session.env().meta_client().list_all_nodes().await?;
708 let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
709 let addr: HostAddr = worker.host.as_ref().unwrap().into();
710 let property = worker.property.as_ref();
711 ShowClusterRow {
712 id: worker.id as _,
713 addr: addr.to_string(),
714 r#type: worker.get_type().unwrap().as_str_name().into(),
715 state: worker.get_state().unwrap().as_str_name().to_owned(),
716 parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
717 is_streaming: property.map(|p| p.is_streaming),
718 is_serving: property.map(|p| p.is_serving),
719 is_unschedulable: property.map(|p| p.is_unschedulable),
720 started_at: worker
721 .started_at
722 .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
723 }
724 });
725 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
726 .rows(rows)
727 .into());
728 }
729 ShowObject::Jobs => {
730 let resp = session.env().meta_client().get_ddl_progress().await?;
731 let rows = resp.into_iter().map(|job| ShowJobRow {
732 id: job.id as i64,
733 statement: job.statement,
734 create_type: job.create_type,
735 progress: job.progress,
736 });
737 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
738 .rows(rows)
739 .into());
740 }
741 ShowObject::ProcessList => {
742 let rows = show_process_list_impl(
743 session.env().frontend_client_pool(),
744 session.env().worker_node_manager_ref(),
745 )
746 .await;
747 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
748 .rows(rows)
749 .into());
750 }
751 ShowObject::Cursor => {
752 let sessions = session
753 .env()
754 .sessions_map()
755 .read()
756 .values()
757 .cloned()
758 .collect_vec();
759 let mut rows = vec![];
760 for s in sessions {
761 let session_id = format!("{}", s.id().0);
762 let user = s.user_name();
763 let host = format!("{}", s.peer_addr());
764 let database = s.database();
765
766 s.get_cursor_manager()
767 .iter_query_cursors(|cursor_name: &String, _| {
768 rows.push(ShowCursorRow {
769 session_id: session_id.clone(),
770 user: user.clone(),
771 host: host.clone(),
772 database: database.clone(),
773 cursor_name: cursor_name.to_owned(),
774 });
775 })
776 .await;
777 }
778 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
779 .rows(rows)
780 .into());
781 }
782 ShowObject::SubscriptionCursor => {
783 let sessions = session
784 .env()
785 .sessions_map()
786 .read()
787 .values()
788 .cloned()
789 .collect_vec();
790 let mut rows = vec![];
791 for s in sessions {
792 let session_id = format!("{}", s.id().0);
793 let user = s.user_name();
794 let host = format!("{}", s.peer_addr());
795 let database = s.database().clone();
796
797 s.get_cursor_manager()
798 .iter_subscription_cursors(
799 |cursor_name: &String, cursor: &SubscriptionCursor| {
800 rows.push(ShowSubscriptionCursorRow {
801 session_id: session_id.clone(),
802 user: user.clone(),
803 host: host.clone(),
804 database: database.clone(),
805 cursor_name: cursor_name.to_owned(),
806 subscription_name: cursor.subscription_name().to_owned(),
807 state: cursor.state_info_string(),
808 idle_duration_ms: cursor.idle_duration().as_millis() as i64,
809 });
810 },
811 )
812 .await;
813 }
814
815 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
816 .rows(rows)
817 .into());
818 }
819 };
820
821 let rows = rows.into_iter().filter(|row| match &filter {
823 Some(ShowStatementFilter::Like(pattern)) => like_default(&row.base_name(), pattern),
824 Some(ShowStatementFilter::ILike(pattern)) => i_like_default(&row.base_name(), pattern),
825 Some(ShowStatementFilter::Where(..)) => unreachable!(),
826 None => true,
827 });
828
829 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
830 .rows(rows)
831 .into())
832}
833
834pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
835 fields_to_descriptors(ShowCreateObjectRow::fields())
836}
837
838pub fn handle_show_create_object(
839 handle_args: HandlerArgs,
840 show_create_type: ShowCreateType,
841 name: ObjectName,
842) -> Result<RwPgResponse> {
843 let session = handle_args.session;
844 let catalog_reader = session.env().catalog_reader().read_guard();
845 let database = session.database();
846 let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
847 let search_path = session.config().search_path();
848 let user_name = &session.user_name();
849 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
850 let user_reader = session.env().user_info_reader().read_guard();
851 let current_user = user_reader
852 .get_user_by_name(user_name)
853 .expect("user not found");
854
855 let (sql, schema_name) = match show_create_type {
856 ShowCreateType::MaterializedView => {
857 let (mv, schema) = schema_path
858 .try_find(|schema_name| {
859 Ok::<_, RwError>(
860 catalog_reader
861 .get_schema_by_name(&database, schema_name)?
862 .get_created_table_by_name(&object_name)
863 .filter(|t| {
864 t.is_mview()
865 && has_access_to_object(current_user, t.id.table_id, t.owner)
866 }),
867 )
868 })?
869 .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
870 (mv.create_sql(), schema)
871 }
872 ShowCreateType::View => {
873 let (view, schema) =
874 catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
875 if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
876 return Err(CatalogError::NotFound("view", name.to_string()).into());
877 }
878 (view.create_sql(schema.to_owned()), schema)
879 }
880 ShowCreateType::Table => {
881 let (table, schema) = schema_path
882 .try_find(|schema_name| {
883 Ok::<_, RwError>(
884 catalog_reader
885 .get_schema_by_name(&database, schema_name)?
886 .get_created_table_by_name(&object_name)
887 .filter(|t| {
888 t.is_user_table()
889 && has_access_to_object(current_user, t.id.table_id, t.owner)
890 }),
891 )
892 })?
893 .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
894
895 (table.create_sql_purified(), schema)
896 }
897 ShowCreateType::Sink => {
898 let (sink, schema) =
899 catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
900 if !has_access_to_object(current_user, sink.id.sink_id, sink.owner.user_id) {
901 return Err(CatalogError::NotFound("sink", name.to_string()).into());
902 }
903 (sink.create_sql(), schema)
904 }
905 ShowCreateType::Source => {
906 let (source, schema) = schema_path
907 .try_find(|schema_name| {
908 Ok::<_, RwError>(
909 catalog_reader
910 .get_schema_by_name(&database, schema_name)?
911 .get_source_by_name(&object_name)
912 .filter(|s| {
913 s.associated_table_id.is_none()
914 && has_access_to_object(current_user, s.id, s.owner)
915 }),
916 )
917 })?
918 .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
919 (source.create_sql_purified(), schema)
920 }
921 ShowCreateType::Index => {
922 let (index, schema) = schema_path
923 .try_find(|schema_name| {
924 Ok::<_, RwError>(
925 catalog_reader
926 .get_schema_by_name(&database, schema_name)?
927 .get_created_table_by_name(&object_name)
928 .filter(|t| {
929 t.is_index()
930 && has_access_to_object(current_user, t.id.table_id, t.owner)
931 }),
932 )
933 })?
934 .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
935 (index.create_sql(), schema)
936 }
937 ShowCreateType::Function => {
938 bail_not_implemented!("show create on: {}", show_create_type);
939 }
940 ShowCreateType::Subscription => {
941 let (subscription, schema) =
942 catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
943 if !has_access_to_object(
944 current_user,
945 subscription.id.subscription_id,
946 subscription.owner.user_id,
947 ) {
948 return Err(CatalogError::NotFound("subscription", name.to_string()).into());
949 }
950 (subscription.create_sql(), schema)
951 }
952 };
953 let name = format!("{}.{}", schema_name, object_name);
954
955 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
956 .rows([ShowCreateObjectRow {
957 name,
958 create_sql: sql,
959 }])
960 .into())
961}
962
963async fn show_process_list_impl(
964 frontend_client_pool: FrontendClientPoolRef,
965 worker_node_manager: WorkerNodeManagerRef,
966) -> Vec<ShowProcessListRow> {
967 fn on_error(worker_id: u32, err_msg: String) -> Vec<ShowProcessListRow> {
969 vec![ShowProcessListRow {
970 worker_id: format!("{}", worker_id),
971 id: "".to_owned(),
972 user: "".to_owned(),
973 host: "".to_owned(),
974 database: "".to_owned(),
975 time: None,
976 info: Some(format!(
977 "Failed to show process list from worker {worker_id} due to: {err_msg}"
978 )),
979 }]
980 }
981 let futures = worker_node_manager
982 .list_frontend_nodes()
983 .into_iter()
984 .map(|worker| {
985 let frontend_client_pool_ = frontend_client_pool.clone();
986 async move {
987 let client = match frontend_client_pool_.get(&worker).await {
988 Ok(client) => client,
989 Err(e) => {
990 return on_error(worker.id, format!("{}", e.as_report()));
991 }
992 };
993 let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
994 Ok(resp) => resp,
995 Err(e) => {
996 return on_error(worker.id, format!("{}", e.as_report()));
997 }
998 };
999 resp.into_inner()
1000 .running_sqls
1001 .into_iter()
1002 .map(|sql| ShowProcessListRow {
1003 worker_id: format!("{}", worker.id),
1004 id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
1005 user: sql.user_name,
1006 host: sql.peer_addr,
1007 database: sql.database,
1008 time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
1009 info: sql
1010 .sql
1011 .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
1012 })
1013 .collect_vec()
1014 }
1015 })
1016 .collect_vec();
1017 join_all(futures).await.into_iter().flatten().collect()
1018}
1019
/// End-to-end tests that run `SHOW` statements against a local frontend.
#[cfg(test)]
mod tests {
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// `SHOW SOURCES` lists a freshly created source with its schema-qualified name.
    #[tokio::test]
    async fn test_show_source() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = r#"CREATE SOURCE t1 (column1 varchar)
        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
        FORMAT PLAIN ENCODE JSON"#;
        frontend.run_sql(sql).await.unwrap();

        let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
        rows.sort();
        assert_eq!(rows, vec!["Row([Some(b\"public.t1\")])".to_owned(),]);
    }

    /// `SHOW COLUMNS` flattens nested struct fields of a protobuf source into
    /// dotted column paths and also lists the connector and row-id columns.
    #[tokio::test]
    async fn test_show_column() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "show columns from t";
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        // Collect (name, type) pairs: the first two columns of every row.
        let mut columns = Vec::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            let row_set = row_set.unwrap();
            for row in row_set {
                columns.push((
                    std::str::from_utf8(row.index(0).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                    std::str::from_utf8(row.index(1).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                ));
            }
        }

        expect_test::expect![[r#"
            [
                (
                    "id",
                    "integer",
                ),
                (
                    "country",
                    "struct",
                ),
                (
                    "country.address",
                    "character varying",
                ),
                (
                    "country.city",
                    "struct",
                ),
                (
                    "country.city.address",
                    "character varying",
                ),
                (
                    "country.city.zipcode",
                    "character varying",
                ),
                (
                    "country.zipcode",
                    "character varying",
                ),
                (
                    "zipcode",
                    "bigint",
                ),
                (
                    "rate",
                    "real",
                ),
                (
                    "_rw_kafka_timestamp",
                    "timestamp with time zone",
                ),
                (
                    "_rw_kafka_partition",
                    "character varying",
                ),
                (
                    "_rw_kafka_offset",
                    "character varying",
                ),
                (
                    "_row_id",
                    "serial",
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }
}