1use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
23use risingwave_common::acl::AclMode;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::{
33 GetAllCursorsRequest, GetAllSubCursorsRequest, GetRunningSqlsRequest,
34};
35use risingwave_pb::id::WorkerId;
36use risingwave_rpc_client::FrontendClientPoolRef;
37use risingwave_sqlparser::ast::{
38 Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
39};
40use thiserror_ext::AsReport;
41
42use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
43use crate::binder::{Binder, Relation};
44use crate::catalog::catalog_service::CatalogReadGuard;
45use crate::catalog::root_catalog::SchemaPath;
46use crate::catalog::schema_catalog::SchemaCatalog;
47use crate::catalog::{CatalogError, IndexCatalog};
48use crate::error::{ErrorCode, Result, RwError};
49use crate::handler::HandlerArgs;
50use crate::handler::create_connection::print_connection_params_with_secret_visibility;
51use crate::handler::privilege::ObjectCheckItem;
52use crate::session::{SessionImpl, WorkerProcessId};
53use crate::user::user_catalog::UserCatalog;
54use crate::user::{has_access_to_object, has_schema_usage_privilege};
55
56pub fn get_columns_from_table(
57 session: &SessionImpl,
58 table_name: ObjectName,
59) -> Result<Vec<ColumnCatalog>> {
60 let mut binder = Binder::new_for_batch(session);
61 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
62 let column_catalogs = match relation {
63 Relation::Source(s) => s.catalog.columns,
64 Relation::BaseTable(t) => t.table_catalog.columns.clone(),
65 Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
66 _ => {
67 return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
68 }
69 };
70
71 Ok(column_catalogs)
72}
73
74pub fn get_columns_from_sink(
75 session: &SessionImpl,
76 sink_name: ObjectName,
77) -> Result<Vec<ColumnCatalog>> {
78 let binder = Binder::new_for_system(session);
79 let sink = binder.bind_sink_by_name(sink_name)?;
80
81 let catalog_reader = session.env().catalog_reader().read_guard();
82 let schema = catalog_reader
83 .get_schema_by_id(sink.sink_catalog.database_id, sink.sink_catalog.schema_id)?;
84 session.check_privileges(&[
85 ObjectCheckItem::new(schema.owner, AclMode::Usage, schema.name(), schema.id()),
86 ObjectCheckItem::new(
87 sink.sink_catalog.owner,
88 AclMode::Select,
89 sink.sink_catalog.name.clone(),
90 sink.sink_catalog.id,
91 ),
92 ])?;
93
94 Ok(sink.sink_catalog.full_columns().to_vec())
95}
96
97pub fn get_columns_from_view(
98 session: &SessionImpl,
99 view_name: ObjectName,
100) -> Result<Vec<ColumnCatalog>> {
101 let _authorized_relation =
102 Binder::new_for_batch(session).bind_relation_by_name(&view_name, None, None, false)?;
103 let binder = Binder::new_for_system(session);
104 let view = binder.bind_view_by_name(view_name)?;
105
106 Ok(view
107 .view_catalog
108 .columns
109 .iter()
110 .enumerate()
111 .map(|(idx, field)| ColumnCatalog {
112 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
113 is_hidden: false,
114 })
115 .collect())
116}
117
118pub fn get_indexes_from_table(
119 session: &SessionImpl,
120 table_name: ObjectName,
121) -> Result<Vec<Arc<IndexCatalog>>> {
122 let mut binder = Binder::new_for_batch(session);
123 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
124 let indexes = match relation {
125 Relation::BaseTable(t) => t.table_indexes,
126 _ => {
127 return Err(CatalogError::not_found("table or source", table_name.to_string()).into());
128 }
129 };
130
131 Ok(indexes)
132}
133
134fn schema_or_search_path(
135 session: &Arc<SessionImpl>,
136 schema: &Option<Ident>,
137 search_path: &SearchPath,
138) -> Vec<String> {
139 if let Some(s) = schema {
140 vec![s.real_value()]
141 } else {
142 search_path
143 .real_path()
144 .iter()
145 .map(|s| {
146 if s.eq(USER_NAME_WILD_CARD) {
147 session.user_name()
148 } else {
149 s.clone()
150 }
151 })
152 .collect()
153 }
154}
155
156fn iter_schema_items<F, T>(
157 session: &Arc<SessionImpl>,
158 schema: &Option<Ident>,
159 reader: &CatalogReadGuard,
160 current_user: &UserCatalog,
161 mut f: F,
162) -> Vec<T>
163where
164 F: FnMut(&SchemaCatalog) -> Vec<T>,
165{
166 let search_path = session.config().search_path();
167
168 schema_or_search_path(session, schema, &search_path)
169 .into_iter()
170 .filter_map(|schema| {
171 if let Ok(schema_catalog) =
172 reader.get_schema_by_name(&session.database(), schema.as_ref())
173 && (has_schema_usage_privilege(
174 current_user,
175 schema_catalog.id(),
176 schema_catalog.owner,
177 ))
178 {
179 Some(schema_catalog)
180 } else {
181 None
182 }
183 })
184 .flat_map(|s| f(s).into_iter())
185 .collect()
186}
187
/// Newtype around [`ObjectName`] so it can be used as a row field; the trait impls in this
/// file give it a `Varchar` data type and render it through `ObjectName`'s `to_string()`.
struct ObjectNameField(ObjectName);
191
192fn with_schema_name(schema_name: &str, name: &str) -> ObjectNameField {
193 if schema_name.is_empty() {
194 ObjectNameField(ObjectName(vec![name.into()]))
195 } else {
196 ObjectNameField(ObjectName(vec![schema_name.into(), name.into()]))
197 }
198}
199
/// Object names are exposed as `Varchar` columns.
impl WithDataType for ObjectNameField {
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
205
206impl ToOwnedDatum for ObjectNameField {
207 fn to_owned_datum(self) -> Datum {
208 Some(self.0.to_string().into())
209 }
210}
211
/// Single-column row used for `SHOW` on tables, internal tables, views, materialized views,
/// sources, sinks and secrets.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowObjectRow {
    name: ObjectNameField,
}
217
impl ShowObjectRow {
    /// Returns the `base_name()` of the wrapped object name; this is the string that
    /// `LIKE` / `ILIKE` filters are matched against.
    fn base_name(&self) -> String {
        self.name.0.base_name()
    }
}
223
/// Returns `true` when the inner error code of `err` is `ErrorCode::PermissionDenied`.
fn is_permission_denied(err: &RwError) -> bool {
    matches!(err.inner(), ErrorCode::PermissionDenied(_))
}
227
/// Row returned by `SHOW` for databases: just the database name.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowDatabaseRow {
    name: String,
}
233
/// Row returned by `SHOW` for schemas: just the schema name.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSchemaRow {
    name: String,
}
239
/// Row describing one column (or one nested struct field) in the column listing.
#[derive(Fields)]
#[fields(style = "Title Case")]
pub struct ShowColumnRow {
    pub name: ShowColumnName,
    pub r#type: String,
    // Filled with the stringified boolean ("true" / "false") by `ShowColumnRow::flatten`.
    pub is_hidden: Option<String>,
    pub description: Option<String>,
}
248
/// One step in the path of a (possibly nested) column name.
#[derive(Clone, Debug)]
enum ShowColumnNameSegment {
    // A named column or struct field.
    Field(Ident),
    // Descent into the element of a list; rendered as `[1]`.
    ListElement,
}
254
impl ShowColumnNameSegment {
    /// Creates a `Field` segment from `name` via `Ident::from_real_value`.
    pub fn field(name: &str) -> Self {
        ShowColumnNameSegment::Field(Ident::from_real_value(name))
    }
}
260
/// Column name shown in the column listing, stored as a path of segments so that nested
/// struct fields and list elements can be rendered (e.g. `a.b` or `a[1].b`).
#[derive(Clone, Debug)]
pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
264
impl ShowColumnName {
    /// Creates a single-segment name from `name` using `Ident::new_unchecked`, in contrast to
    /// `ShowColumnNameSegment::field`, which goes through `Ident::from_real_value`.
    pub fn special(name: &str) -> Self {
        ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
            name,
        ))])
    }
}
274
/// Column names are exposed as `Varchar` columns.
impl WithDataType for ShowColumnName {
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
280
281impl ToOwnedDatum for ShowColumnName {
282 fn to_owned_datum(self) -> Datum {
283 use std::fmt::Write;
284
285 let mut s = String::new();
286 for segment in self.0 {
287 match segment {
288 ShowColumnNameSegment::Field(ident) => {
289 if !s.is_empty() {
290 s.push('.');
292 }
293 write!(s, "{ident}").unwrap();
294 }
295 ShowColumnNameSegment::ListElement => {
296 s.push_str("[1]");
297 }
298 }
299 }
300 s.to_owned_datum()
301 }
302}
303
304impl ShowColumnRow {
305 fn flatten(
308 name: ShowColumnName,
309 data_type: DataType,
310 is_hidden: bool,
311 description: Option<String>,
312 ) -> Vec<Self> {
313 let r#type = match &data_type {
315 DataType::Struct(_) => "struct".to_owned(),
316 DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
317 d => d.to_string(),
318 };
319
320 let mut rows = vec![ShowColumnRow {
321 name: name.clone(),
322 r#type,
323 is_hidden: Some(is_hidden.to_string()),
324 description,
325 }];
326
327 match data_type {
328 DataType::Struct(st) => {
329 rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
330 let mut name = name.clone();
331 name.0.push(ShowColumnNameSegment::field(field_name));
332 Self::flatten(name, field_data_type.clone(), is_hidden, None)
333 }));
334 }
335
336 DataType::List(list) if let DataType::Struct(_) = list.elem() => {
337 let mut name = name.clone();
338 name.0.push(ShowColumnNameSegment::ListElement);
339 rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
340 }
341
342 _ => {}
343 }
344
345 rows
346 }
347
348 pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
349 Self::flatten(
350 ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
351 col.column_desc.data_type,
352 col.is_hidden,
353 col.column_desc.description,
354 )
355 }
356}
357
/// Row returned by `SHOW` for connections: qualified name, connection type and a textual
/// rendering of the connection's properties.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowConnectionRow {
    name: ObjectNameField,
    r#type: String,
    properties: String,
}
365
/// Row returned by `SHOW` for functions.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowFunctionRow {
    name: ObjectNameField,
    // Argument types joined with ", ".
    arguments: String,
    return_type: String,
    language: String,
    link: Option<String>,
}
375
/// Row returned by `SHOW` for indexes; the column lists are comma-separated strings.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowIndexRow {
    name: String,
    // Name of the index's primary table.
    on: String,
    key: String,
    include: String,
    distributed_by: String,
}
385
386impl From<Arc<IndexCatalog>> for ShowIndexRow {
387 fn from(index: Arc<IndexCatalog>) -> Self {
388 let index_display = index.display();
389 ShowIndexRow {
390 name: index.name.clone(),
391 on: index.primary_table.name.clone(),
392 key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
393 include: display_comma_separated(&index_display.include_columns).to_string(),
394 distributed_by: display_comma_separated(&index_display.distributed_by_columns)
395 .to_string(),
396 }
397 }
398}
399
/// Row returned by `SHOW` for the cluster: one row per worker node.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowClusterRow {
    id: WorkerId,
    addr: String,
    r#type: String,
    state: String,
    parallelism: Option<i32>,
    // `None` when the worker reports no property.
    is_streaming: Option<bool>,
    // `None` when the worker reports no property.
    is_serving: Option<bool>,
    started_at: Option<Timestamptz>,
}
412
/// Row returned by `SHOW` for jobs, filled from the meta client's DDL progress entries.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowJobRow {
    id: i64,
    statement: String,
    create_type: String,
    progress: String,
}
421
/// Row returned by `SHOW` for the process list.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowProcessListRow {
    worker_id: String,
    id: String,
    user: String,
    host: String,
    database: String,
    time: Option<String>,
    info: Option<String>,
}
433
/// Row returned by `SHOW CREATE`: the `schema.object` name and the SQL text.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCreateObjectRow {
    name: String,
    create_sql: String,
}
440
/// Row returned by `SHOW` for subscriptions.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionRow {
    name: ObjectNameField,
    retention_seconds: i64,
}
447
/// Row returned by `SHOW` for cursors: one row per cursor, or one error row per worker that
/// could not be queried.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCursorRow {
    worker_id: String,
    session_id: String,
    user: String,
    host: String,
    database: String,
    cursor_name: String,
    // `None` for regular rows; carries the failure message on error rows.
    info: Option<String>,
}
459
/// Row returned by `SHOW` for subscription cursors: one row per cursor, or one error row per
/// worker that could not be queried.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionCursorRow {
    worker_id: String,
    session_id: String,
    user: String,
    host: String,
    database: String,
    cursor_name: String,
    subscription_name: String,
    state: String,
    idle_duration_ms: i64,
    // `None` for regular rows; carries the failure message on error rows.
    info: Option<String>,
}
474
475pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
477 fields_to_descriptors(match objects {
478 ShowObject::Database => ShowDatabaseRow::fields(),
479 ShowObject::Schema => ShowSchemaRow::fields(),
480 ShowObject::Columns { .. } => ShowColumnRow::fields(),
481 ShowObject::Connection { .. } => ShowConnectionRow::fields(),
482 ShowObject::Function { .. } => ShowFunctionRow::fields(),
483 ShowObject::Indexes { .. } => ShowIndexRow::fields(),
484 ShowObject::Cluster => ShowClusterRow::fields(),
485 ShowObject::Jobs => ShowJobRow::fields(),
486 ShowObject::ProcessList => ShowProcessListRow::fields(),
487 ShowObject::Cursor => ShowCursorRow::fields(),
488 ShowObject::SubscriptionCursor => ShowSubscriptionCursorRow::fields(),
489 ShowObject::Subscription { .. } => ShowSubscriptionRow::fields(),
490
491 ShowObject::Table { .. }
492 | ShowObject::InternalTable { .. }
493 | ShowObject::View { .. }
494 | ShowObject::MaterializedView { .. }
495 | ShowObject::Source { .. }
496 | ShowObject::Sink { .. }
497 | ShowObject::Secret { .. } => ShowObjectRow::fields(),
498 })
499}
500
/// Handles a `SHOW <object>` statement and builds the `SHOW_COMMAND` response.
///
/// * `handler_args` - only its session is used.
/// * `command` - the kind of object to list.
/// * `filter` - optional `LIKE` / `ILIKE` pattern. It is applied only to the kinds that produce
///   [`ShowObjectRow`]s (tables, internal tables, views, materialized views, sources, sinks and
///   secrets); every other kind returns early and ignores it. A `WHERE` filter is rejected up
///   front as not implemented.
///
/// # Panics
///
/// Panics if the session's user cannot be found in the user reader, and on the `unwrap()`
/// calls in the connection and cluster arms (missing connection type / provider, missing
/// worker host, type or state, or a start timestamp `Timestamptz::from_secs` rejects).
pub async fn handle_show_object(
    handler_args: HandlerArgs,
    command: ShowObject,
    filter: Option<ShowStatementFilter>,
) -> Result<RwPgResponse> {
    let session = handler_args.session;

    if let Some(ShowStatementFilter::Where(..)) = filter {
        bail_not_implemented!("WHERE clause in SHOW statement");
    }

    let catalog_reader = session.env().catalog_reader();
    let user_reader = session.env().user_info_reader();
    // Takes a catalog read guard and a clone of the current user's catalog entry. The user
    // read guard is dropped when the closure returns.
    let get_catalog_reader = || {
        let reader = catalog_reader.read_guard();
        let user_reader = user_reader.read_guard();
        let current_user = user_reader
            .get_user_by_name(&session.user_name())
            .expect("user not found")
            .clone();
        (reader, current_user)
    };

    // Arms that yield `ShowObjectRow`s fall through to the LIKE / ILIKE filtering below; all
    // other arms build their own response and return directly.
    let rows: Vec<ShowObjectRow> = match command {
        ShowObject::Table { schema } => {
            let (reader, current_user) = get_catalog_reader();
            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
                schema
                    .iter_user_table_with_acl(&current_user)
                    .map(|t| with_schema_name(&schema.name, &t.name))
                    .map(|name| ShowObjectRow { name })
                    .collect()
            })
        }
        ShowObject::InternalTable { schema } => {
            let (reader, current_user) = get_catalog_reader();
            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
                schema
                    .iter_internal_table_with_acl(&current_user)
                    .map(|t| with_schema_name(&schema.name, &t.name))
                    .map(|name| ShowObjectRow { name })
                    .collect()
            })
        }
        ShowObject::Database => {
            let reader = catalog_reader.read_guard();
            let rows = reader
                .get_all_database_names()
                .into_iter()
                .map(|name| ShowDatabaseRow { name });
            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(rows)
                .into());
        }
        ShowObject::Schema => {
            let reader = catalog_reader.read_guard();
            let rows = reader
                .get_all_schema_names(&session.database())?
                .into_iter()
                .map(|name| ShowSchemaRow { name });
            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(rows)
                .into());
        }
        ShowObject::View { schema } => {
            let (reader, current_user) = get_catalog_reader();
            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
                schema
                    .iter_view_with_acl(&current_user)
                    .map(|t| with_schema_name(&schema.name, &t.name))
                    .map(|name| ShowObjectRow { name })
                    .collect()
            })
        }
        ShowObject::MaterializedView { schema } => {
            let (reader, current_user) = get_catalog_reader();
            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
                schema
                    .iter_created_mvs_with_acl(&current_user)
                    .map(|t| with_schema_name(&schema.name, &t.name))
                    .map(|name| ShowObjectRow { name })
                    .collect()
            })
        }
        ShowObject::Source { schema } => {
            let (reader, current_user) = get_catalog_reader();
            let mut sources =
                iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
                    schema
                        .iter_source_with_acl(&current_user)
                        .map(|t| with_schema_name(&schema.name, &t.name))
                        .map(|name| ShowObjectRow { name })
                        .collect()
                });
            // Temporary sources of this session are appended without a schema qualifier.
            sources.extend(
                session
                    .temporary_source_manager()
                    .keys()
                    .into_iter()
                    .map(|t| with_schema_name("", &t))
                    .map(|name| ShowObjectRow { name }),
            );
            sources
        }
        ShowObject::Sink { schema } => {
            let (reader, current_user) = get_catalog_reader();
            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
                schema
                    .iter_sink_with_acl(&current_user)
                    .map(|t| with_schema_name(&schema.name, &t.name))
                    .map(|name| ShowObjectRow { name })
                    .collect()
            })
        }
        ShowObject::Subscription { schema } => {
            let (reader, current_user) = get_catalog_reader();
            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
                schema
                    .iter_subscription_with_acl(&current_user)
                    .map(|t| ShowSubscriptionRow {
                        name: with_schema_name(&schema.name, &t.name),
                        retention_seconds: t.retention_seconds as i64,
                    })
                    .collect()
            });
            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(rows)
                .into());
        }
        ShowObject::Secret { schema } => {
            let (reader, current_user) = get_catalog_reader();
            iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
                schema
                    .iter_secret_with_acl(&current_user)
                    .map(|t| with_schema_name(&schema.name, &t.name))
                    .map(|name| ShowObjectRow { name })
                    .collect()
            })
        }
        ShowObject::Columns { table } => {
            // Try the name as a table/source, then as a sink, then as a view. A
            // permission-denied error stops the fallback chain and is returned as-is; any
            // other error moves on to the next candidate kind.
            let columns = match get_columns_from_table(&session, table.clone()) {
                Ok(columns) => columns,
                Err(err) if is_permission_denied(&err) => {
                    return Err(err);
                }
                Err(_) => match get_columns_from_sink(&session, table.clone()) {
                    Ok(columns) => columns,
                    Err(err) if is_permission_denied(&err) => {
                        return Err(err);
                    }
                    Err(_) => match get_columns_from_view(&session, table.clone()) {
                        Ok(columns) => columns,
                        Err(err) if is_permission_denied(&err) => {
                            return Err(err);
                        }
                        Err(_) => {
                            return Err(CatalogError::not_found(
                                "table, source, sink or view",
                                table.to_string(),
                            )
                            .into());
                        }
                    },
                },
            };

            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
                .into());
        }
        ShowObject::Indexes { table } => {
            let indexes = get_indexes_from_table(&session, table)?;

            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(indexes.into_iter().map(ShowIndexRow::from))
                .into());
        }
        ShowObject::Connection { schema } => {
            let (reader, current_user) = get_catalog_reader();
            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
                schema.iter_connections_with_acl(&current_user)
                    .map(|c| {
                        let name = c.name.clone();
                        let r#type = match &c.info {
                            #[expect(deprecated)]
                            connection::Info::PrivateLinkService(_) => {
                                PRIVATELINK_CONNECTION.to_owned()
                            },
                            connection::Info::ConnectionParams(params) => {
                                params.get_connection_type().unwrap().as_str_name().to_owned()
                            }
                        };
                        // Only sources the current user can access are listed as dependents.
                        let source_names = schema
                            .get_source_ids_by_connection(c.id)
                            .unwrap_or_default()
                            .into_iter()
                            .filter_map(|sid| {
                                schema.get_source_by_id(sid).and_then(|catalog| {
                                    has_access_to_object(&current_user, catalog.id, catalog.owner)
                                        .then_some(catalog.name.as_str())
                                })
                            })
                            .collect_vec();
                        // Same access filtering for dependent sinks.
                        let sink_names = schema
                            .get_sink_ids_by_connection(c.id)
                            .unwrap_or_default()
                            .into_iter()
                            .filter_map(|sid| {
                                schema.get_sink_by_id(sid).and_then(|catalog| {
                                    has_access_to_object(&current_user, catalog.id, catalog.owner)
                                        .then_some(catalog.name.as_str())
                                })
                            })
                            .collect_vec();
                        // NOTE: `source_names` / `sink_names` are only rendered for the
                        // private-link variant below.
                        let properties = match &c.info {
                            #[expect(deprecated)]
                            connection::Info::PrivateLinkService(i) => {
                                format!(
                                    "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
                                    i.get_provider().unwrap().as_str_name(),
                                    i.service_name,
                                    i.endpoint_id,
                                    serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
                                    serde_json::to_string(&source_names).unwrap(),
                                    serde_json::to_string(&sink_names).unwrap(),
                                )
                            }
                            connection::Info::ConnectionParams(params) => {
                                print_connection_params_with_secret_visibility(
                                    &session.database(),
                                    params,
                                    &reader,
                                    &current_user,
                                )
                            }
                        };
                        ShowConnectionRow {
                            name: with_schema_name(&schema.name, &name),
                            r#type,
                            properties,
                        }
                    }).collect_vec()
            });
            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(rows)
                .into());
        }
        ShowObject::Function { schema } => {
            let (reader, current_user) = get_catalog_reader();
            let rows = iter_schema_items(&session, &schema, &reader, &current_user, |schema| {
                schema
                    .iter_function_with_acl(&current_user)
                    .map(|t| ShowFunctionRow {
                        name: with_schema_name(&schema.name, &t.name),
                        arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
                        return_type: t.return_type.to_string(),
                        language: t.language.clone(),
                        link: t.link.clone(),
                    })
                    .collect()
            });
            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(rows)
                .into());
        }
        ShowObject::Cluster => {
            let workers = session.env().meta_client().list_all_nodes().await?;
            // Workers are listed in ascending id order.
            let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
                let addr: HostAddr = worker.host.as_ref().unwrap().into();
                let property = worker.property.as_ref();
                ShowClusterRow {
                    id: worker.id,
                    addr: addr.to_string(),
                    r#type: worker.get_type().unwrap().as_str_name().into(),
                    state: worker.get_state().unwrap().as_str_name().to_owned(),
                    parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
                    is_streaming: property.map(|p| p.is_streaming),
                    is_serving: property.map(|p| p.is_serving),
                    started_at: worker
                        .started_at
                        .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
                }
            });
            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(rows)
                .into());
        }
        ShowObject::Jobs => {
            let resp = session.env().meta_client().get_ddl_progress().await?;
            let rows = resp.into_iter().map(|job| ShowJobRow {
                id: job.id as i64,
                statement: job.statement,
                create_type: job.create_type,
                progress: job.progress,
            });
            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(rows)
                .into());
        }
        ShowObject::ProcessList => {
            let rows = show_process_list_impl(
                session.env().frontend_client_pool(),
                session.env().worker_node_manager_ref(),
            )
            .await;
            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(rows)
                .into());
        }
        ShowObject::Cursor => {
            let rows = show_all_cursors_impl(
                session.env().frontend_client_pool(),
                session.env().worker_node_manager_ref(),
            )
            .await;
            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(rows)
                .into());
        }
        ShowObject::SubscriptionCursor => {
            let rows = show_all_sub_cursors_impl(
                session.env().frontend_client_pool(),
                session.env().worker_node_manager_ref(),
            )
            .await;

            return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
                .rows(rows)
                .into());
        }
    };

    // Patterns are matched against the base name of each row's object name.
    let rows = rows.into_iter().filter(|row| match &filter {
        Some(ShowStatementFilter::Like(pattern)) => like_default(&row.base_name(), pattern),
        Some(ShowStatementFilter::ILike(pattern)) => i_like_default(&row.base_name(), pattern),
        // Rejected at the top of the function.
        Some(ShowStatementFilter::Where(..)) => unreachable!(),
        None => true,
    });

    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
        .rows(rows)
        .into())
}
846
847pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
848 fields_to_descriptors(ShowCreateObjectRow::fields())
849}
850
/// Handles `SHOW CREATE <type> <name>` and returns a single row holding the `schema.object`
/// name and the SQL text of the object.
///
/// * `handle_args` - only its session is used.
/// * `show_create_type` - the kind of object to look up.
/// * `name` - the possibly schema-qualified object name; without a schema the session's
///   search path is used.
///
/// # Errors
///
/// Returns a not-found catalog error when the object does not exist, has a different kind, or
/// the current user has no access to it (access failures are deliberately reported as
/// "not found"). `ShowCreateType::Function` is rejected as not implemented.
///
/// # Panics
///
/// Panics if the session's user cannot be found in the user reader.
pub fn handle_show_create_object(
    handle_args: HandlerArgs,
    show_create_type: ShowCreateType,
    name: ObjectName,
) -> Result<RwPgResponse> {
    let session = handle_args.session;
    let catalog_reader = session.env().catalog_reader().read_guard();
    let database = session.database();
    let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();
    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
    let user_reader = session.env().user_info_reader().read_guard();
    let current_user = user_reader
        .get_user_by_name(user_name)
        .expect("user not found");

    // Each arm yields the SQL text together with the name of the schema the object was
    // found in.
    let (sql, schema_name) = match show_create_type {
        ShowCreateType::MaterializedView => {
            let (mv, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            .filter(|t| {
                                t.is_mview() && has_access_to_object(current_user, t.id, t.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::not_found("materialized view", name.to_string()))?;
            (mv.create_sql(), schema)
        }
        ShowCreateType::View => {
            let (view, schema) =
                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
            // System views skip the access check.
            if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
                return Err(CatalogError::not_found("view", name.to_string()).into());
            }
            (view.create_sql(schema.to_owned()), schema)
        }
        ShowCreateType::Table => {
            let (table, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            .filter(|t| {
                                t.is_user_table()
                                    && has_access_to_object(current_user, t.id, t.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::not_found("table", name.to_string()))?;

            (table.create_sql_purified(), schema)
        }
        ShowCreateType::Sink => {
            let (sink, schema) =
                catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
            if !has_access_to_object(current_user, sink.id, sink.owner) {
                return Err(CatalogError::not_found("sink", name.to_string()).into());
            }
            (sink.create_sql(), schema)
        }
        ShowCreateType::Source => {
            let (source, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_source_by_name(&object_name)
                            // Sources with an associated table id are excluded here.
                            .filter(|s| {
                                s.associated_table_id.is_none()
                                    && has_access_to_object(current_user, s.id, s.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::not_found("source", name.to_string()))?;
            (source.create_sql_purified(), schema)
        }
        ShowCreateType::Index => {
            // Indexes are looked up through the created-table catalog and filtered by
            // `is_index()`.
            let (index, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            .filter(|t| {
                                t.is_index() && has_access_to_object(current_user, t.id, t.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::not_found("index", name.to_string()))?;
            (index.create_sql(), schema)
        }
        ShowCreateType::Function => {
            bail_not_implemented!("show create on: {}", show_create_type);
        }
        ShowCreateType::Subscription => {
            let (subscription, schema) =
                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
            if !has_access_to_object(current_user, subscription.id, subscription.owner) {
                return Err(CatalogError::not_found("subscription", name.to_string()).into());
            }
            (subscription.create_sql(), schema)
        }
    };
    let name = format!("{}.{}", schema_name, object_name);

    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
        .rows([ShowCreateObjectRow {
            name,
            create_sql: sql,
        }])
        .into())
}
969
970async fn show_all_sub_cursors_impl(
971 frontend_client_pool: FrontendClientPoolRef,
972 worker_node_manager: WorkerNodeManagerRef,
973) -> Vec<ShowSubscriptionCursorRow> {
974 fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowSubscriptionCursorRow> {
975 vec![ShowSubscriptionCursorRow {
976 worker_id: format!("{}", worker_id),
977 session_id: "".to_owned(),
978 user: "".to_owned(),
979 host: "".to_owned(),
980 database: "".to_owned(),
981 cursor_name: "".to_owned(),
982 subscription_name: "".to_owned(),
983 state: "".to_owned(),
984 idle_duration_ms: 0,
985 info: Some(format!(
986 "Failed to show subscription cursor from worker {worker_id} due to: {err_msg}"
987 )),
988 }]
989 }
990
991 let futures = worker_node_manager
992 .list_frontend_nodes()
993 .into_iter()
994 .map(|worker| {
995 let frontend_client_pool_ = frontend_client_pool.clone();
996 async move {
997 let client = match frontend_client_pool_.get(&worker).await {
998 Ok(client) => client,
999 Err(e) => {
1000 return on_error(worker.id, format!("{}", e.as_report()));
1001 }
1002 };
1003
1004 let resp = match client.get_all_sub_cursors(GetAllSubCursorsRequest {}).await {
1005 Ok(resp) => resp,
1006 Err(e) => {
1007 return on_error(worker.id, format!("{}", e.as_report()));
1008 }
1009 };
1010
1011 resp.into_inner()
1012 .subscription_cursors
1013 .into_iter()
1014 .flat_map(|sub_cursor| {
1015 let worker_id = worker.id.to_string();
1016 let session_id = sub_cursor.session_id.to_string();
1017
1018 let user = sub_cursor.user_name;
1019 let host = sub_cursor.peer_addr;
1020 let database = sub_cursor.database;
1021
1022 let size = sub_cursor.states.len();
1023 let mut sub_cursors = vec![];
1024 for index in 0..size {
1025 sub_cursors.push(ShowSubscriptionCursorRow {
1026 worker_id: worker_id.clone(),
1027 session_id: session_id.clone(),
1028 user: user.clone(),
1029 host: host.clone(),
1030 database: database.clone(),
1031 cursor_name: sub_cursor.cursor_names[index].clone(),
1032 subscription_name: sub_cursor.subscription_names[index].clone(),
1033 state: sub_cursor.states[index].clone(),
1034 idle_duration_ms: sub_cursor.idle_durations[index] as i64,
1035 info: None,
1036 })
1037 }
1038 sub_cursors
1039 })
1040 .collect_vec()
1041 }
1042 })
1043 .collect_vec();
1044 join_all(futures).await.into_iter().flatten().collect()
1045}
1046
1047async fn show_all_cursors_impl(
1048 frontend_client_pool: FrontendClientPoolRef,
1049 worker_node_manager: WorkerNodeManagerRef,
1050) -> Vec<ShowCursorRow> {
1051 fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowCursorRow> {
1052 vec![ShowCursorRow {
1053 worker_id: format!("{}", worker_id),
1054 session_id: "".to_owned(),
1055 user: "".to_owned(),
1056 host: "".to_owned(),
1057 database: "".to_owned(),
1058 cursor_name: "".to_owned(),
1059 info: Some(format!(
1060 "Failed to show cursor from worker {worker_id} due to: {err_msg}"
1061 )),
1062 }]
1063 }
1064 let futures = worker_node_manager
1065 .list_frontend_nodes()
1066 .into_iter()
1067 .map(|worker| {
1068 let frontend_client_pool_ = frontend_client_pool.clone();
1069 async move {
1070 let client = match frontend_client_pool_.get(&worker).await {
1071 Ok(client) => client,
1072 Err(e) => {
1073 return on_error(worker.id, format!("{}", e.as_report()));
1074 }
1075 };
1076
1077 let resp = match client.get_all_cursors(GetAllCursorsRequest {}).await {
1078 Ok(resp) => resp,
1079 Err(e) => {
1080 return on_error(worker.id, format!("{}", e.as_report()));
1081 }
1082 };
1083
1084 resp.into_inner()
1085 .all_cursors
1086 .into_iter()
1087 .flat_map(|cursors| {
1088 let worker_id = worker.id.to_string();
1089 let session_id = cursors.session_id.to_string();
1090
1091 let user = cursors.user_name;
1092 let host = cursors.peer_addr;
1093 let database = cursors.database;
1094
1095 cursors
1096 .cursor_names
1097 .into_iter()
1098 .map(|cursor_name| ShowCursorRow {
1099 worker_id: worker_id.clone(),
1100 session_id: session_id.clone(),
1101 user: user.clone(),
1102 host: host.clone(),
1103 database: database.clone(),
1104 cursor_name,
1105 info: None,
1106 })
1107 .collect_vec()
1108 })
1109 .collect_vec()
1110 }
1111 })
1112 .collect_vec();
1113 join_all(futures).await.into_iter().flatten().collect()
1114}
1115
1116async fn show_process_list_impl(
1117 frontend_client_pool: FrontendClientPoolRef,
1118 worker_node_manager: WorkerNodeManagerRef,
1119) -> Vec<ShowProcessListRow> {
1120 fn on_error(worker_id: WorkerId, err_msg: String) -> Vec<ShowProcessListRow> {
1122 vec![ShowProcessListRow {
1123 worker_id: format!("{}", worker_id),
1124 id: "".to_owned(),
1125 user: "".to_owned(),
1126 host: "".to_owned(),
1127 database: "".to_owned(),
1128 time: None,
1129 info: Some(format!(
1130 "Failed to show process list from worker {worker_id} due to: {err_msg}"
1131 )),
1132 }]
1133 }
1134 let futures = worker_node_manager
1135 .list_frontend_nodes()
1136 .into_iter()
1137 .map(|worker| {
1138 let frontend_client_pool_ = frontend_client_pool.clone();
1139 async move {
1140 let client = match frontend_client_pool_.get(&worker).await {
1141 Ok(client) => client,
1142 Err(e) => {
1143 return on_error(worker.id, format!("{}", e.as_report()));
1144 }
1145 };
1146 let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
1147 Ok(resp) => resp,
1148 Err(e) => {
1149 return on_error(worker.id, format!("{}", e.as_report()));
1150 }
1151 };
1152 resp.into_inner()
1153 .running_sqls
1154 .into_iter()
1155 .map(|sql| ShowProcessListRow {
1156 worker_id: format!("{}", worker.id),
1157 id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
1158 user: sql.user_name,
1159 host: sql.peer_addr,
1160 database: sql.database,
1161 time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
1162 info: sql
1163 .sql
1164 .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
1165 })
1166 .collect_vec()
1167 }
1168 })
1169 .collect_vec();
1170 join_all(futures).await.into_iter().flatten().collect()
1171}
1172
#[cfg(test)]
mod tests {
    use futures_async_stream::for_await;

    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// `SHOW SOURCES` lists a newly created source under its schema-qualified name.
    #[tokio::test]
    async fn test_show_source() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend
            .run_sql(
                r#"CREATE SOURCE t1 (column1 varchar)
        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
        FORMAT PLAIN ENCODE JSON"#,
            )
            .await
            .unwrap();

        let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
        rows.sort();

        let expected = vec![r#"Row([Some(b"public.t1")])"#.to_owned()];
        assert_eq!(rows, expected);
    }

    /// `SHOW COLUMNS` on a protobuf-encoded source yields the (name, type) pairs pinned by
    /// the snapshot below, read from the first two cells of every returned row.
    #[tokio::test]
    async fn test_show_column() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let location = proto_file.path().to_str().unwrap();
        let create_source = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{location}')"#
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();

        let mut pg_response = frontend.run_sql("show columns from t").await.unwrap();

        let mut columns = Vec::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            for row in row_set.unwrap() {
                // Cell 0 is pushed first and cell 1 second; both must be present and UTF-8.
                let name = std::str::from_utf8(row[0].as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                let data_type = std::str::from_utf8(row[1].as_ref().unwrap())
                    .unwrap()
                    .to_owned();
                columns.push((name, data_type));
            }
        }

        expect_test::expect![[r#"
            [
                (
                    "id",
                    "integer",
                ),
                (
                    "country",
                    "struct",
                ),
                (
                    "country.address",
                    "character varying",
                ),
                (
                    "country.city",
                    "struct",
                ),
                (
                    "country.city.address",
                    "character varying",
                ),
                (
                    "country.city.zipcode",
                    "character varying",
                ),
                (
                    "country.zipcode",
                    "character varying",
                ),
                (
                    "zipcode",
                    "bigint",
                ),
                (
                    "rate",
                    "real",
                ),
                (
                    "_rw_kafka_timestamp",
                    "timestamp with time zone",
                ),
                (
                    "_rw_kafka_partition",
                    "character varying",
                ),
                (
                    "_rw_kafka_offset",
                    "character varying",
                ),
                (
                    "_row_id",
                    "serial",
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }
}