1use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
23use risingwave_common::bail_not_implemented;
24use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
25use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
26use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
27use risingwave_common::util::addr::HostAddr;
28use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
29use risingwave_expr::scalar::like::{i_like_default, like_default};
30use risingwave_pb::catalog::connection;
31use risingwave_pb::frontend_service::{
32 GetAllCursorsRequest, GetAllSubCursorsRequest, GetRunningSqlsRequest,
33};
34use risingwave_pb::id::WorkerId;
35use risingwave_rpc_client::FrontendClientPoolRef;
36use risingwave_sqlparser::ast::{
37 Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
38};
39use thiserror_ext::AsReport;
40
41use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
42use crate::binder::{Binder, Relation};
43use crate::catalog::catalog_service::CatalogReadGuard;
44use crate::catalog::root_catalog::SchemaPath;
45use crate::catalog::schema_catalog::SchemaCatalog;
46use crate::catalog::{CatalogError, IndexCatalog};
47use crate::error::{Result, RwError};
48use crate::handler::HandlerArgs;
49use crate::handler::create_connection::print_connection_params;
50use crate::session::{SessionImpl, WorkerProcessId};
51use crate::user::has_access_to_object;
52use crate::user::user_catalog::UserCatalog;
53
54pub fn get_columns_from_table(
55 session: &SessionImpl,
56 table_name: ObjectName,
57) -> Result<Vec<ColumnCatalog>> {
58 let mut binder = Binder::new_for_system(session);
59 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
60 let column_catalogs = match relation {
61 Relation::Source(s) => s.catalog.columns,
62 Relation::BaseTable(t) => t.table_catalog.columns.clone(),
63 Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
64 _ => {
65 return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
66 }
67 };
68
69 Ok(column_catalogs)
70}
71
72pub fn get_columns_from_sink(
73 session: &SessionImpl,
74 sink_name: ObjectName,
75) -> Result<Vec<ColumnCatalog>> {
76 let binder = Binder::new_for_system(session);
77 let sink = binder.bind_sink_by_name(sink_name)?;
78 Ok(sink.sink_catalog.full_columns().to_vec())
79}
80
81pub fn get_columns_from_view(
82 session: &SessionImpl,
83 view_name: ObjectName,
84) -> Result<Vec<ColumnCatalog>> {
85 let binder = Binder::new_for_system(session);
86 let view = binder.bind_view_by_name(view_name)?;
87
88 Ok(view
89 .view_catalog
90 .columns
91 .iter()
92 .enumerate()
93 .map(|(idx, field)| ColumnCatalog {
94 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
95 is_hidden: false,
96 })
97 .collect())
98}
99
100pub fn get_indexes_from_table(
101 session: &SessionImpl,
102 table_name: ObjectName,
103) -> Result<Vec<Arc<IndexCatalog>>> {
104 let mut binder = Binder::new_for_system(session);
105 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
106 let indexes = match relation {
107 Relation::BaseTable(t) => t.table_indexes,
108 _ => {
109 return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
110 }
111 };
112
113 Ok(indexes)
114}
115
116fn schema_or_search_path(
117 session: &Arc<SessionImpl>,
118 schema: &Option<Ident>,
119 search_path: &SearchPath,
120) -> Vec<String> {
121 if let Some(s) = schema {
122 vec![s.real_value()]
123 } else {
124 search_path
125 .real_path()
126 .iter()
127 .map(|s| {
128 if s.eq(USER_NAME_WILD_CARD) {
129 session.user_name()
130 } else {
131 s.clone()
132 }
133 })
134 .collect()
135 }
136}
137
138fn iter_schema_items<F, T>(
139 session: &Arc<SessionImpl>,
140 schema: &Option<Ident>,
141 reader: &CatalogReadGuard,
142 current_user: &UserCatalog,
143 mut f: F,
144) -> Vec<T>
145where
146 F: FnMut(&SchemaCatalog) -> Vec<T>,
147{
148 let search_path = session.config().search_path();
149
150 schema_or_search_path(session, schema, &search_path)
151 .into_iter()
152 .filter_map(|schema| {
153 if let Ok(schema_catalog) =
154 reader.get_schema_by_name(&session.database(), schema.as_ref())
155 && (current_user.is_super
156 || current_user.has_schema_usage_privilege(schema_catalog.id()))
157 {
158 Some(schema_catalog)
159 } else {
160 None
161 }
162 })
163 .flat_map(|s| f(s).into_iter())
164 .collect()
165}
166
/// Newtype around [`ObjectName`] used as a column value in `SHOW` result rows.
///
/// It is exposed as a varchar column and rendered through the wrapped name's
/// `to_string()` (see the `WithDataType` / `ToOwnedDatum` impls in this file).
struct ObjectNameField(ObjectName);
170
171fn with_schema_name(schema_name: &str, name: &str) -> ObjectNameField {
172 if schema_name.is_empty() {
173 ObjectNameField(ObjectName(vec![name.into()]))
174 } else {
175 ObjectNameField(ObjectName(vec![schema_name.into(), name.into()]))
176 }
177}
178
/// Declares that an [`ObjectNameField`] column has the varchar data type.
impl WithDataType for ObjectNameField {
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
184
185impl ToOwnedDatum for ObjectNameField {
186 fn to_owned_datum(self) -> Datum {
187 Some(self.0.to_string().into())
188 }
189}
190
/// Generic single-column row used by `SHOW` statements that only list object names
/// (tables, internal tables, views, materialized views, sources, sinks and secrets,
/// as selected in `infer_show_object`).
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowObjectRow {
    // Object name, schema-qualified when a schema name is available.
    name: ObjectNameField,
}
196
impl ShowObjectRow {
    /// Returns `base_name()` of the wrapped object name.
    ///
    /// `handle_show_object` matches `LIKE` / `ILIKE` patterns against this value.
    fn base_name(&self) -> String {
        self.name.0.base_name()
    }
}
202
/// Row of `SHOW DATABASES`: one database name per row.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowDatabaseRow {
    name: String,
}
208
/// Row of `SHOW SCHEMAS`: one schema name of the session's database per row.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSchemaRow {
    name: String,
}
214
/// Row of `SHOW COLUMNS`: one row per column, plus one row per nested struct field
/// (see `ShowColumnRow::flatten`).
#[derive(Fields)]
#[fields(style = "Title Case")]
pub struct ShowColumnRow {
    // Possibly nested column name, e.g. `a.b` or `a[1].b` once rendered.
    pub name: ShowColumnName,
    // Textual type name; struct columns are shown as `struct` / `struct[]`.
    pub r#type: String,
    // Textual boolean; `flatten` fills it with `bool::to_string`.
    pub is_hidden: Option<String>,
    // Column description taken from the catalog; `None` for nested fields.
    pub description: Option<String>,
}
223
/// One component of a (possibly nested) column name shown by `SHOW COLUMNS`.
#[derive(Clone, Debug)]
enum ShowColumnNameSegment {
    // A named column or struct field.
    Field(Ident),
    // An element of a list; rendered as the literal `[1]` by `ShowColumnName`.
    ListElement,
}
229
impl ShowColumnNameSegment {
    /// Builds a `Field` segment from `name` via `Ident::from_real_value`.
    pub fn field(name: &str) -> Self {
        ShowColumnNameSegment::Field(Ident::from_real_value(name))
    }
}
235
/// A column name made of one or more segments, used as the `name` column of
/// `SHOW COLUMNS`. Rendered as a varchar by its `ToOwnedDatum` impl.
#[derive(Clone, Debug)]
pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
239
impl ShowColumnName {
    /// Builds a single-segment name from `name` using `Ident::new_unchecked`
    /// (unlike `ShowColumnNameSegment::field`, which uses `Ident::from_real_value`).
    ///
    /// NOTE(review): presumably intended for synthetic names that should be
    /// displayed verbatim — confirm against `Ident::new_unchecked`.
    pub fn special(name: &str) -> Self {
        ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
            name,
        ))])
    }
}
249
/// Declares that a [`ShowColumnName`] column has the varchar data type.
impl WithDataType for ShowColumnName {
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
255
256impl ToOwnedDatum for ShowColumnName {
257 fn to_owned_datum(self) -> Datum {
258 use std::fmt::Write;
259
260 let mut s = String::new();
261 for segment in self.0 {
262 match segment {
263 ShowColumnNameSegment::Field(ident) => {
264 if !s.is_empty() {
265 s.push('.');
267 }
268 write!(s, "{ident}").unwrap();
269 }
270 ShowColumnNameSegment::ListElement => {
271 s.push_str("[1]");
272 }
273 }
274 }
275 s.to_owned_datum()
276 }
277}
278
279impl ShowColumnRow {
280 fn flatten(
283 name: ShowColumnName,
284 data_type: DataType,
285 is_hidden: bool,
286 description: Option<String>,
287 ) -> Vec<Self> {
288 let r#type = match &data_type {
290 DataType::Struct(_) => "struct".to_owned(),
291 DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
292 d => d.to_string(),
293 };
294
295 let mut rows = vec![ShowColumnRow {
296 name: name.clone(),
297 r#type,
298 is_hidden: Some(is_hidden.to_string()),
299 description,
300 }];
301
302 match data_type {
303 DataType::Struct(st) => {
304 rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
305 let mut name = name.clone();
306 name.0.push(ShowColumnNameSegment::field(field_name));
307 Self::flatten(name, field_data_type.clone(), is_hidden, None)
308 }));
309 }
310
311 DataType::List(list) if let DataType::Struct(_) = list.elem() => {
312 let mut name = name.clone();
313 name.0.push(ShowColumnNameSegment::ListElement);
314 rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
315 }
316
317 _ => {}
318 }
319
320 rows
321 }
322
323 pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
324 Self::flatten(
325 ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
326 col.column_desc.data_type,
327 col.is_hidden,
328 col.column_desc.description,
329 )
330 }
331}
332
/// Row of `SHOW CONNECTIONS`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowConnectionRow {
    // Schema-qualified connection name.
    name: ObjectNameField,
    // Either the private-link constant or the connection type's string name.
    r#type: String,
    // Human-readable, pre-formatted description of the connection's properties.
    properties: String,
}
340
/// Row of `SHOW FUNCTIONS`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowFunctionRow {
    // Schema-qualified function name.
    name: ObjectNameField,
    // Argument types joined with ", ".
    arguments: String,
    return_type: String,
    language: String,
    link: Option<String>,
}
350
/// Row of `SHOW INDEXES`; built from an `IndexCatalog` by the `From` impl in this file.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowIndexRow {
    // Index name.
    name: String,
    // Name of the index's primary table.
    on: String,
    // Comma-separated index columns with their ordering.
    key: String,
    // Comma-separated include columns.
    include: String,
    // Comma-separated distribution columns.
    distributed_by: String,
}
360
361impl From<Arc<IndexCatalog>> for ShowIndexRow {
362 fn from(index: Arc<IndexCatalog>) -> Self {
363 let index_display = index.display();
364 ShowIndexRow {
365 name: index.name.clone(),
366 on: index.primary_table.name.clone(),
367 key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
368 include: display_comma_separated(&index_display.include_columns).to_string(),
369 distributed_by: display_comma_separated(&index_display.distributed_by_columns)
370 .to_string(),
371 }
372 }
373}
374
/// Row of `SHOW CLUSTER`: one row per worker node reported by the meta client.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowClusterRow {
    id: WorkerId,
    // Worker host address rendered as a string.
    addr: String,
    // String name of the worker type.
    r#type: String,
    // String name of the worker state.
    state: String,
    parallelism: Option<i32>,
    // The three flags below are `None` when the worker reports no property.
    is_streaming: Option<bool>,
    is_serving: Option<bool>,
    is_unschedulable: Option<bool>,
    // Built from the worker's `started_at` seconds value, when present.
    started_at: Option<Timestamptz>,
}
388
/// Row of `SHOW JOBS`: one row per entry returned by the meta client's DDL progress
/// call.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowJobRow {
    id: i64,
    statement: String,
    create_type: String,
    progress: String,
}
397
/// Row of `SHOW PROCESSLIST`: one row per running SQL on each frontend worker, or a
/// single error row for a worker that could not be queried.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowProcessListRow {
    worker_id: String,
    // Rendered `WorkerProcessId`; empty on error rows.
    id: String,
    user: String,
    host: String,
    database: String,
    // Elapsed time formatted as `<millis>ms`, when reported.
    time: Option<String>,
    // The running SQL (truncated), or the failure message on error rows.
    info: Option<String>,
}
409
/// Row of `SHOW CREATE ...`: the object's qualified name and its creation SQL.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCreateObjectRow {
    // Formatted as `<schema>.<object>` by `handle_show_create_object`.
    name: String,
    create_sql: String,
}
416
/// Row of `SHOW SUBSCRIPTIONS`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionRow {
    // Schema-qualified subscription name.
    name: ObjectNameField,
    // The catalog's retention value cast to `i64`.
    retention_seconds: i64,
}
423
/// Row of `SHOW CURSORS`: one row per cursor on each frontend worker, or a single
/// error row for a worker that could not be queried.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCursorRow {
    worker_id: String,
    session_id: String,
    user: String,
    host: String,
    database: String,
    cursor_name: String,
    // `None` for regular rows; the failure message on error rows.
    info: Option<String>,
}
435
/// Row of `SHOW SUBSCRIPTION CURSORS`: one row per subscription cursor on each
/// frontend worker, or a single error row for a worker that could not be queried.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionCursorRow {
    worker_id: String,
    session_id: String,
    user: String,
    host: String,
    database: String,
    cursor_name: String,
    subscription_name: String,
    state: String,
    // Idle duration reported by the worker, cast to `i64`; `0` on error rows.
    idle_duration_ms: i64,
    // `None` for regular rows; the failure message on error rows.
    info: Option<String>,
}
450
451pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
453 fields_to_descriptors(match objects {
454 ShowObject::Database => ShowDatabaseRow::fields(),
455 ShowObject::Schema => ShowSchemaRow::fields(),
456 ShowObject::Columns { .. } => ShowColumnRow::fields(),
457 ShowObject::Connection { .. } => ShowConnectionRow::fields(),
458 ShowObject::Function { .. } => ShowFunctionRow::fields(),
459 ShowObject::Indexes { .. } => ShowIndexRow::fields(),
460 ShowObject::Cluster => ShowClusterRow::fields(),
461 ShowObject::Jobs => ShowJobRow::fields(),
462 ShowObject::ProcessList => ShowProcessListRow::fields(),
463 ShowObject::Cursor => ShowCursorRow::fields(),
464 ShowObject::SubscriptionCursor => ShowSubscriptionCursorRow::fields(),
465 ShowObject::Subscription { .. } => ShowSubscriptionRow::fields(),
466
467 ShowObject::Table { .. }
468 | ShowObject::InternalTable { .. }
469 | ShowObject::View { .. }
470 | ShowObject::MaterializedView { .. }
471 | ShowObject::Source { .. }
472 | ShowObject::Sink { .. }
473 | ShowObject::Secret { .. } => ShowObjectRow::fields(),
474 })
475}
476
477pub async fn handle_show_object(
478 handler_args: HandlerArgs,
479 command: ShowObject,
480 filter: Option<ShowStatementFilter>,
481) -> Result<RwPgResponse> {
482 let session = handler_args.session;
483
484 if let Some(ShowStatementFilter::Where(..)) = filter {
485 bail_not_implemented!("WHERE clause in SHOW statement");
486 }
487
488 let catalog_reader = session.env().catalog_reader();
489 let user_reader = session.env().user_info_reader();
490 let get_catalog_reader = || {
491 let reader = catalog_reader.read_guard();
492 let user_reader = user_reader.read_guard();
493 let current_user = user_reader
494 .get_user_by_name(&session.user_name())
495 .expect("user not found")
496 .clone();
497 (reader, current_user)
498 };
499
500 let rows: Vec<ShowObjectRow> = match command {
501 ShowObject::Table { schema } => {
502 let (reader, current_user) = get_catalog_reader();
503 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
504 schema
505 .iter_user_table()
506 .map(|t| with_schema_name(&schema.name, &t.name))
507 .map(|name| ShowObjectRow { name })
508 .collect()
509 })
510 }
511 ShowObject::InternalTable { schema } => {
512 let (reader, current_user) = get_catalog_reader();
513 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
514 schema
515 .iter_internal_table()
516 .map(|t| with_schema_name(&schema.name, &t.name))
517 .map(|name| ShowObjectRow { name })
518 .collect()
519 })
520 }
521 ShowObject::Database => {
522 let reader = catalog_reader.read_guard();
523 let rows = reader
524 .get_all_database_names()
525 .into_iter()
526 .map(|name| ShowDatabaseRow { name });
527 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
528 .rows(rows)
529 .into());
530 }
531 ShowObject::Schema => {
532 let reader = catalog_reader.read_guard();
533 let rows = reader
534 .get_all_schema_names(&session.database())?
535 .into_iter()
536 .map(|name| ShowSchemaRow { name });
537 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
538 .rows(rows)
539 .into());
540 }
541 ShowObject::View { schema } => {
542 let (reader, current_user) = get_catalog_reader();
543 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
544 schema
545 .iter_view()
546 .map(|t| with_schema_name(&schema.name, &t.name))
547 .map(|name| ShowObjectRow { name })
548 .collect()
549 })
550 }
551 ShowObject::MaterializedView { schema } => {
552 let (reader, current_user) = get_catalog_reader();
553 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
554 schema
555 .iter_created_mvs()
556 .map(|t| with_schema_name(&schema.name, &t.name))
557 .map(|name| ShowObjectRow { name })
558 .collect()
559 })
560 }
561 ShowObject::Source { schema } => {
562 let (reader, current_user) = get_catalog_reader();
563 let mut sources =
564 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
565 schema
566 .iter_source()
567 .map(|t| with_schema_name(&schema.name, &t.name))
568 .map(|name| ShowObjectRow { name })
569 .collect()
570 });
571 sources.extend(
572 session
573 .temporary_source_manager()
574 .keys()
575 .into_iter()
576 .map(|t| with_schema_name("", &t))
577 .map(|name| ShowObjectRow { name }),
578 );
579 sources
580 }
581 ShowObject::Sink { schema } => {
582 let (reader, current_user) = get_catalog_reader();
583 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
584 schema
585 .iter_sink()
586 .map(|t| with_schema_name(&schema.name, &t.name))
587 .map(|name| ShowObjectRow { name })
588 .collect()
589 })
590 }
591 ShowObject::Subscription { schema } => {
592 let (reader, current_user) = get_catalog_reader();
593 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
594 schema
595 .iter_subscription()
596 .map(|t| ShowSubscriptionRow {
597 name: with_schema_name(&schema.name, &t.name),
598 retention_seconds: t.retention_seconds as i64,
599 })
600 .collect()
601 });
602 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
603 .rows(rows)
604 .into());
605 }
606 ShowObject::Secret { schema } => {
607 let (reader, current_user) = get_catalog_reader();
608 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
609 schema
610 .iter_secret()
611 .map(|t| with_schema_name(&schema.name, &t.name))
612 .map(|name| ShowObjectRow { name })
613 .collect()
614 })
615 }
616 ShowObject::Columns { table } => {
617 let Ok(columns) = get_columns_from_table(&session, table.clone())
618 .or(get_columns_from_sink(&session, table.clone()))
619 .or(get_columns_from_view(&session, table.clone()))
620 else {
621 return Err(CatalogError::not_found(
622 "table, source, sink or view",
623 table.to_string(),
624 )
625 .into());
626 };
627
628 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
629 .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
630 .into());
631 }
632 ShowObject::Indexes { table } => {
633 let indexes = get_indexes_from_table(&session, table)?;
634
635 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
636 .rows(indexes.into_iter().map(ShowIndexRow::from))
637 .into());
638 }
639 ShowObject::Connection { schema } => {
640 let (reader, current_user) = get_catalog_reader();
641 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
642 schema.iter_connections()
643 .map(|c| {
644 let name = c.name.clone();
645 let r#type = match &c.info {
646 connection::Info::PrivateLinkService(_) => {
647 PRIVATELINK_CONNECTION.to_owned()
648 },
649 connection::Info::ConnectionParams(params) => {
650 params.get_connection_type().unwrap().as_str_name().to_owned()
651 }
652 };
653 let source_names = schema
654 .get_source_ids_by_connection(c.id)
655 .unwrap_or_default()
656 .into_iter()
657 .filter_map(|sid| schema.get_source_by_id(sid).map(|catalog| catalog.name.as_str()))
658 .collect_vec();
659 let sink_names = schema
660 .get_sink_ids_by_connection(c.id)
661 .unwrap_or_default()
662 .into_iter()
663 .filter_map(|sid| schema.get_sink_by_id(sid).map(|catalog| catalog.name.as_str()))
664 .collect_vec();
665 let properties = match &c.info {
666 connection::Info::PrivateLinkService(i) => {
667 format!(
668 "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
669 i.get_provider().unwrap().as_str_name(),
670 i.service_name,
671 i.endpoint_id,
672 serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
673 serde_json::to_string(&source_names).unwrap(),
674 serde_json::to_string(&sink_names).unwrap(),
675 )
676 }
677 connection::Info::ConnectionParams(params) => {
678 print_connection_params(&session.database(), params, &reader)
680 }
681 };
682 ShowConnectionRow {
683 name: with_schema_name(&schema.name, &name),
684 r#type,
685 properties,
686 }
687 }).collect_vec()
688 });
689 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
690 .rows(rows)
691 .into());
692 }
693 ShowObject::Function { schema } => {
694 let (reader, current_user) = get_catalog_reader();
695 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
696 schema
697 .iter_function()
698 .map(|t| ShowFunctionRow {
699 name: with_schema_name(&schema.name, &t.name),
700 arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
701 return_type: t.return_type.to_string(),
702 language: t.language.clone(),
703 link: t.link.clone(),
704 })
705 .collect()
706 });
707 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
708 .rows(rows)
709 .into());
710 }
711 ShowObject::Cluster => {
712 let workers = session.env().meta_client().list_all_nodes().await?;
713 let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
714 let addr: HostAddr = worker.host.as_ref().unwrap().into();
715 let property = worker.property.as_ref();
716 ShowClusterRow {
717 id: worker.id,
718 addr: addr.to_string(),
719 r#type: worker.get_type().unwrap().as_str_name().into(),
720 state: worker.get_state().unwrap().as_str_name().to_owned(),
721 parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
722 is_streaming: property.map(|p| p.is_streaming),
723 is_serving: property.map(|p| p.is_serving),
724 is_unschedulable: property.map(|p| p.is_unschedulable),
725 started_at: worker
726 .started_at
727 .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
728 }
729 });
730 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
731 .rows(rows)
732 .into());
733 }
734 ShowObject::Jobs => {
735 let resp = session.env().meta_client().get_ddl_progress().await?;
736 let rows = resp.into_iter().map(|job| ShowJobRow {
737 id: job.id as i64,
738 statement: job.statement,
739 create_type: job.create_type,
740 progress: job.progress,
741 });
742 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
743 .rows(rows)
744 .into());
745 }
746 ShowObject::ProcessList => {
747 let rows = show_process_list_impl(
748 session.env().frontend_client_pool(),
749 session.env().worker_node_manager_ref(),
750 )
751 .await;
752 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
753 .rows(rows)
754 .into());
755 }
756 ShowObject::Cursor => {
757 let rows = show_all_cursors_impl(
758 session.env().frontend_client_pool(),
759 session.env().worker_node_manager_ref(),
760 )
761 .await;
762 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
763 .rows(rows)
764 .into());
765 }
766 ShowObject::SubscriptionCursor => {
767 let rows = show_all_sub_cursors_impl(
768 session.env().frontend_client_pool(),
769 session.env().worker_node_manager_ref(),
770 )
771 .await;
772
773 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
774 .rows(rows)
775 .into());
776 }
777 };
778
779 let rows = rows.into_iter().filter(|row| match &filter {
781 Some(ShowStatementFilter::Like(pattern)) => like_default(&row.base_name(), pattern),
782 Some(ShowStatementFilter::ILike(pattern)) => i_like_default(&row.base_name(), pattern),
783 Some(ShowStatementFilter::Where(..)) => unreachable!(),
784 None => true,
785 });
786
787 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
788 .rows(rows)
789 .into())
790}
791
792pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
793 fields_to_descriptors(ShowCreateObjectRow::fields())
794}
795
/// Handles `SHOW CREATE <type> <name>` and returns a single row holding the object's
/// qualified name and its creation SQL.
///
/// The object is looked up in the explicitly given schema, or along the session's
/// search path when `name` is unqualified. An object the current user has no access
/// to is reported as not found, the same as a missing object.
///
/// # Errors
///
/// Returns a not-found catalog error when no matching, accessible object exists, and
/// a not-implemented error for `SHOW CREATE FUNCTION`. Name-resolution and catalog
/// lookup errors are propagated.
///
/// # Panics
///
/// Panics if the session's user is missing from the user catalog.
pub fn handle_show_create_object(
    handle_args: HandlerArgs,
    show_create_type: ShowCreateType,
    name: ObjectName,
) -> Result<RwPgResponse> {
    let session = handle_args.session;
    let catalog_reader = session.env().catalog_reader().read_guard();
    let database = session.database();
    let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();
    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
    let user_reader = session.env().user_info_reader().read_guard();
    let current_user = user_reader
        .get_user_by_name(user_name)
        .expect("user not found");

    // Each arm yields the creation SQL plus the name of the schema the object was
    // actually found in.
    let (sql, schema_name) = match show_create_type {
        ShowCreateType::MaterializedView => {
            let (mv, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            .filter(|t| {
                                t.is_mview() && has_access_to_object(current_user, t.id, t.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::not_found("materialized view", name.to_string()))?;
            (mv.create_sql(), schema)
        }
        ShowCreateType::View => {
            let (view, schema) =
                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
            // System views skip the access check.
            if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
                return Err(CatalogError::not_found("view", name.to_string()).into());
            }
            (view.create_sql(schema.to_owned()), schema)
        }
        ShowCreateType::Table => {
            let (table, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            .filter(|t| {
                                t.is_user_table()
                                    && has_access_to_object(current_user, t.id, t.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::not_found("table", name.to_string()))?;

            (table.create_sql_purified(), schema)
        }
        ShowCreateType::Sink => {
            let (sink, schema) =
                catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
            if !has_access_to_object(current_user, sink.id, sink.owner) {
                return Err(CatalogError::not_found("sink", name.to_string()).into());
            }
            (sink.create_sql(), schema)
        }
        ShowCreateType::Source => {
            let (source, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_source_by_name(&object_name)
                            // Sources that have an associated table are excluded.
                            .filter(|s| {
                                s.associated_table_id.is_none()
                                    && has_access_to_object(current_user, s.id, s.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::not_found("source", name.to_string()))?;
            (source.create_sql_purified(), schema)
        }
        ShowCreateType::Index => {
            // Indexes are looked up among created tables and recognised via `is_index()`.
            let (index, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            .filter(|t| {
                                t.is_index() && has_access_to_object(current_user, t.id, t.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::not_found("index", name.to_string()))?;
            (index.create_sql(), schema)
        }
        ShowCreateType::Function => {
            bail_not_implemented!("show create on: {}", show_create_type);
        }
        ShowCreateType::Subscription => {
            let (subscription, schema) =
                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
            if !has_access_to_object(current_user, subscription.id, subscription.owner) {
                return Err(CatalogError::not_found("subscription", name.to_string()).into());
            }
            (subscription.create_sql(), schema)
        }
    };
    // Report the name qualified with the schema the object was resolved in.
    let name = format!("{}.{}", schema_name, object_name);

    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
        .rows([ShowCreateObjectRow {
            name,
            create_sql: sql,
        }])
        .into())
}
914
915async fn show_all_sub_cursors_impl(
916 frontend_client_pool: FrontendClientPoolRef,
917 worker_node_manager: WorkerNodeManagerRef,
918) -> Vec<ShowSubscriptionCursorRow> {
919 fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowSubscriptionCursorRow> {
920 vec![ShowSubscriptionCursorRow {
921 worker_id: format!("{}", worker_id),
922 session_id: "".to_owned(),
923 user: "".to_owned(),
924 host: "".to_owned(),
925 database: "".to_owned(),
926 cursor_name: "".to_owned(),
927 subscription_name: "".to_owned(),
928 state: "".to_owned(),
929 idle_duration_ms: 0,
930 info: Some(format!(
931 "Failed to show subscription cursor from worker {worker_id} due to: {err_msg}"
932 )),
933 }]
934 }
935
936 let futures = worker_node_manager
937 .list_frontend_nodes()
938 .into_iter()
939 .map(|worker| {
940 let frontend_client_pool_ = frontend_client_pool.clone();
941 async move {
942 let client = match frontend_client_pool_.get(&worker).await {
943 Ok(client) => client,
944 Err(e) => {
945 return on_error(worker.id, format!("{}", e.as_report()));
946 }
947 };
948
949 let resp = match client.get_all_sub_cursors(GetAllSubCursorsRequest {}).await {
950 Ok(resp) => resp,
951 Err(e) => {
952 return on_error(worker.id, format!("{}", e.as_report()));
953 }
954 };
955
956 resp.into_inner()
957 .subscription_cursors
958 .into_iter()
959 .flat_map(|sub_cursor| {
960 let worker_id = worker.id.to_string();
961 let session_id = sub_cursor.session_id.to_string();
962
963 let user = sub_cursor.user_name;
964 let host = sub_cursor.peer_addr;
965 let database = sub_cursor.database;
966
967 let size = sub_cursor.states.len();
968 let mut sub_cursors = vec![];
969 for index in 0..size {
970 sub_cursors.push(ShowSubscriptionCursorRow {
971 worker_id: worker_id.clone(),
972 session_id: session_id.clone(),
973 user: user.clone(),
974 host: host.clone(),
975 database: database.clone(),
976 cursor_name: sub_cursor.cursor_names[index].clone(),
977 subscription_name: sub_cursor.subscription_names[index].clone(),
978 state: sub_cursor.states[index].clone(),
979 idle_duration_ms: sub_cursor.idle_durations[index] as i64,
980 info: None,
981 })
982 }
983 sub_cursors
984 })
985 .collect_vec()
986 }
987 })
988 .collect_vec();
989 join_all(futures).await.into_iter().flatten().collect()
990}
991
992async fn show_all_cursors_impl(
993 frontend_client_pool: FrontendClientPoolRef,
994 worker_node_manager: WorkerNodeManagerRef,
995) -> Vec<ShowCursorRow> {
996 fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowCursorRow> {
997 vec![ShowCursorRow {
998 worker_id: format!("{}", worker_id),
999 session_id: "".to_owned(),
1000 user: "".to_owned(),
1001 host: "".to_owned(),
1002 database: "".to_owned(),
1003 cursor_name: "".to_owned(),
1004 info: Some(format!(
1005 "Failed to show cursor from worker {worker_id} due to: {err_msg}"
1006 )),
1007 }]
1008 }
1009 let futures = worker_node_manager
1010 .list_frontend_nodes()
1011 .into_iter()
1012 .map(|worker| {
1013 let frontend_client_pool_ = frontend_client_pool.clone();
1014 async move {
1015 let client = match frontend_client_pool_.get(&worker).await {
1016 Ok(client) => client,
1017 Err(e) => {
1018 return on_error(worker.id, format!("{}", e.as_report()));
1019 }
1020 };
1021
1022 let resp = match client.get_all_cursors(GetAllCursorsRequest {}).await {
1023 Ok(resp) => resp,
1024 Err(e) => {
1025 return on_error(worker.id, format!("{}", e.as_report()));
1026 }
1027 };
1028
1029 resp.into_inner()
1030 .all_cursors
1031 .into_iter()
1032 .flat_map(|cursors| {
1033 let worker_id = worker.id.to_string();
1034 let session_id = cursors.session_id.to_string();
1035
1036 let user = cursors.user_name;
1037 let host = cursors.peer_addr;
1038 let database = cursors.database;
1039
1040 cursors
1041 .cursor_names
1042 .into_iter()
1043 .map(|cursor_name| ShowCursorRow {
1044 worker_id: worker_id.clone(),
1045 session_id: session_id.clone(),
1046 user: user.clone(),
1047 host: host.clone(),
1048 database: database.clone(),
1049 cursor_name,
1050 info: None,
1051 })
1052 .collect_vec()
1053 })
1054 .collect_vec()
1055 }
1056 })
1057 .collect_vec();
1058 join_all(futures).await.into_iter().flatten().collect()
1059}
1060
1061async fn show_process_list_impl(
1062 frontend_client_pool: FrontendClientPoolRef,
1063 worker_node_manager: WorkerNodeManagerRef,
1064) -> Vec<ShowProcessListRow> {
1065 fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowProcessListRow> {
1067 vec![ShowProcessListRow {
1068 worker_id: format!("{}", worker_id),
1069 id: "".to_owned(),
1070 user: "".to_owned(),
1071 host: "".to_owned(),
1072 database: "".to_owned(),
1073 time: None,
1074 info: Some(format!(
1075 "Failed to show process list from worker {worker_id} due to: {err_msg}"
1076 )),
1077 }]
1078 }
1079 let futures = worker_node_manager
1080 .list_frontend_nodes()
1081 .into_iter()
1082 .map(|worker| {
1083 let frontend_client_pool_ = frontend_client_pool.clone();
1084 async move {
1085 let client = match frontend_client_pool_.get(&worker).await {
1086 Ok(client) => client,
1087 Err(e) => {
1088 return on_error(worker.id, format!("{}", e.as_report()));
1089 }
1090 };
1091 let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
1092 Ok(resp) => resp,
1093 Err(e) => {
1094 return on_error(worker.id, format!("{}", e.as_report()));
1095 }
1096 };
1097 resp.into_inner()
1098 .running_sqls
1099 .into_iter()
1100 .map(|sql| ShowProcessListRow {
1101 worker_id: format!("{}", worker.id),
1102 id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
1103 user: sql.user_name,
1104 host: sql.peer_addr,
1105 database: sql.database,
1106 time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
1107 info: sql
1108 .sql
1109 .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
1110 })
1111 .collect_vec()
1112 }
1113 })
1114 .collect_vec();
1115 join_all(futures).await.into_iter().flatten().collect()
1116}
1117
// End-to-end tests for the `SHOW` handlers, run against an in-process `LocalFrontend`.
#[cfg(test)]
mod tests {
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// `SHOW SOURCES` lists a freshly created source under its schema-qualified name.
    #[tokio::test]
    async fn test_show_source() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = r#"CREATE SOURCE t1 (column1 varchar)
        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
        FORMAT PLAIN ENCODE JSON"#;
        frontend.run_sql(sql).await.unwrap();

        let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
        rows.sort();
        assert_eq!(rows, vec!["Row([Some(b\"public.t1\")])".to_owned(),]);
    }

    /// `SHOW COLUMNS` on a protobuf-encoded source flattens nested struct fields into
    /// dotted names and also reports the additional `_rw_kafka_*` / `_row_id` columns.
    #[tokio::test]
    async fn test_show_column() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "show columns from t";
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        // Collect only the first two result columns (name, type) of every row.
        let mut columns = Vec::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            let row_set = row_set.unwrap();
            for row in row_set {
                columns.push((
                    std::str::from_utf8(row.index(0).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                    std::str::from_utf8(row.index(1).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                ));
            }
        }

        // Snapshot of the expected (name, type) pairs, in output order.
        expect_test::expect![[r#"
            [
                (
                    "id",
                    "integer",
                ),
                (
                    "country",
                    "struct",
                ),
                (
                    "country.address",
                    "character varying",
                ),
                (
                    "country.city",
                    "struct",
                ),
                (
                    "country.city.address",
                    "character varying",
                ),
                (
                    "country.city.zipcode",
                    "character varying",
                ),
                (
                    "country.zipcode",
                    "character varying",
                ),
                (
                    "zipcode",
                    "bigint",
                ),
                (
                    "rate",
                    "real",
                ),
                (
                    "_rw_kafka_timestamp",
                    "timestamp with time zone",
                ),
                (
                    "_rw_kafka_partition",
                    "character varying",
                ),
                (
                    "_rw_kafka_offset",
                    "character varying",
                ),
                (
                    "_row_id",
                    "serial",
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }
}