1use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use pgwire::pg_server::Session;
23use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::GetRunningSqlsRequest;
33use risingwave_rpc_client::FrontendClientPoolRef;
34use risingwave_sqlparser::ast::{
35 Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
36};
37use thiserror_ext::AsReport;
38
39use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
40use crate::binder::{Binder, Relation};
41use crate::catalog::catalog_service::CatalogReadGuard;
42use crate::catalog::root_catalog::SchemaPath;
43use crate::catalog::schema_catalog::SchemaCatalog;
44use crate::catalog::{CatalogError, IndexCatalog};
45use crate::error::{Result, RwError};
46use crate::handler::HandlerArgs;
47use crate::handler::create_connection::print_connection_params;
48use crate::session::cursor_manager::SubscriptionCursor;
49use crate::session::{SessionImpl, WorkerProcessId};
50use crate::user::has_access_to_object;
51use crate::user::user_catalog::UserCatalog;
52
53pub fn get_columns_from_table(
54 session: &SessionImpl,
55 table_name: ObjectName,
56) -> Result<Vec<ColumnCatalog>> {
57 let mut binder = Binder::new_for_system(session);
58 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
59 let column_catalogs = match relation {
60 Relation::Source(s) => s.catalog.columns,
61 Relation::BaseTable(t) => t.table_catalog.columns.clone(),
62 Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
63 _ => {
64 return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
65 }
66 };
67
68 Ok(column_catalogs)
69}
70
71pub fn get_columns_from_sink(
72 session: &SessionImpl,
73 sink_name: ObjectName,
74) -> Result<Vec<ColumnCatalog>> {
75 let binder = Binder::new_for_system(session);
76 let sink = binder.bind_sink_by_name(sink_name.clone())?;
77 Ok(sink.sink_catalog.full_columns().to_vec())
78}
79
80pub fn get_columns_from_view(
81 session: &SessionImpl,
82 view_name: ObjectName,
83) -> Result<Vec<ColumnCatalog>> {
84 let binder = Binder::new_for_system(session);
85 let view = binder.bind_view_by_name(view_name.clone())?;
86
87 Ok(view
88 .view_catalog
89 .columns
90 .iter()
91 .enumerate()
92 .map(|(idx, field)| ColumnCatalog {
93 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
94 is_hidden: false,
95 })
96 .collect())
97}
98
99pub fn get_indexes_from_table(
100 session: &SessionImpl,
101 table_name: ObjectName,
102) -> Result<Vec<Arc<IndexCatalog>>> {
103 let mut binder = Binder::new_for_system(session);
104 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
105 let indexes = match relation {
106 Relation::BaseTable(t) => t.table_indexes,
107 _ => {
108 return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
109 }
110 };
111
112 Ok(indexes)
113}
114
115fn schema_or_search_path(
116 session: &Arc<SessionImpl>,
117 schema: &Option<Ident>,
118 search_path: &SearchPath,
119) -> Vec<String> {
120 if let Some(s) = schema {
121 vec![s.real_value()]
122 } else {
123 search_path
124 .real_path()
125 .iter()
126 .map(|s| {
127 if s.eq(USER_NAME_WILD_CARD) {
128 session.user_name()
129 } else {
130 s.clone()
131 }
132 })
133 .collect()
134 }
135}
136
137fn iter_schema_items<F, T>(
138 session: &Arc<SessionImpl>,
139 schema: &Option<Ident>,
140 reader: &CatalogReadGuard,
141 current_user: &UserCatalog,
142 mut f: F,
143) -> Vec<T>
144where
145 F: FnMut(&SchemaCatalog) -> Vec<T>,
146{
147 let search_path = session.config().search_path();
148
149 schema_or_search_path(session, schema, &search_path)
150 .into_iter()
151 .filter_map(|schema| {
152 if let Ok(schema_catalog) =
153 reader.get_schema_by_name(&session.database(), schema.as_ref())
154 && (current_user.is_super
155 || current_user.has_schema_usage_privilege(schema_catalog.id()))
156 {
157 Some(schema_catalog)
158 } else {
159 None
160 }
161 })
162 .flat_map(|s| f(s).into_iter())
163 .collect()
164}
165
/// Single-column row used by `SHOW` commands that only list object names.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowObjectRow {
    // Name of the listed object.
    name: String,
}
171
/// One output row describing a column (or a nested field of a column).
#[derive(Fields)]
#[fields(style = "Title Case")]
pub struct ShowColumnRow {
    // Possibly nested column name, rendered by `ShowColumnName::to_owned_datum`.
    pub name: ShowColumnName,
    // Textual type name; structs are shown as `struct` / `struct[]` by `flatten`.
    pub r#type: String,
    // Stringified boolean; `flatten` always fills it with `Some(..)`.
    pub is_hidden: Option<String>,
    // Column description taken from the catalog; `None` for flattened nested fields.
    pub description: Option<String>,
}
180
/// One path segment of a (possibly nested) column name.
#[derive(Clone, Debug)]
enum ShowColumnNameSegment {
    /// A named column or struct field; rendered via the identifier's `Display`.
    Field(Ident),
    /// The element of a list column; rendered as `[1]`.
    ListElement,
}
186
187impl ShowColumnNameSegment {
188 pub fn field(name: &str) -> Self {
189 ShowColumnNameSegment::Field(Ident::from_real_value(name))
190 }
191}
192
/// A column name as shown in column listings: a path of segments that is rendered into a
/// single string such as `a.b[1].c` when converted to a datum.
#[derive(Clone, Debug)]
pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
196
197impl ShowColumnName {
198 pub fn special(name: &str) -> Self {
201 ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
202 name,
203 ))])
204 }
205}
206
/// Column names are exposed as `varchar`, matching the string produced by
/// `ToOwnedDatum::to_owned_datum`.
impl WithDataType for ShowColumnName {
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
212
213impl ToOwnedDatum for ShowColumnName {
214 fn to_owned_datum(self) -> Datum {
215 use std::fmt::Write;
216
217 let mut s = String::new();
218 for segment in self.0 {
219 match segment {
220 ShowColumnNameSegment::Field(ident) => {
221 if !s.is_empty() {
222 s.push('.');
224 }
225 write!(s, "{ident}").unwrap();
226 }
227 ShowColumnNameSegment::ListElement => {
228 s.push_str("[1]");
229 }
230 }
231 }
232 s.to_owned_datum()
233 }
234}
235
236impl ShowColumnRow {
237 fn flatten(
240 name: ShowColumnName,
241 data_type: DataType,
242 is_hidden: bool,
243 description: Option<String>,
244 ) -> Vec<Self> {
245 let r#type = match &data_type {
247 DataType::Struct(_) => "struct".to_owned(),
248 DataType::List(list) if let DataType::Struct(_) = list.elem() => "struct[]".to_owned(),
249 d => d.to_string(),
250 };
251
252 let mut rows = vec![ShowColumnRow {
253 name: name.clone(),
254 r#type,
255 is_hidden: Some(is_hidden.to_string()),
256 description,
257 }];
258
259 match data_type {
260 DataType::Struct(st) => {
261 rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
262 let mut name = name.clone();
263 name.0.push(ShowColumnNameSegment::field(field_name));
264 Self::flatten(name, field_data_type.clone(), is_hidden, None)
265 }));
266 }
267
268 DataType::List(list) if let DataType::Struct(_) = list.elem() => {
269 let mut name = name.clone();
270 name.0.push(ShowColumnNameSegment::ListElement);
271 rows.extend(Self::flatten(name, list.into_elem(), is_hidden, None));
272 }
273
274 _ => {}
275 }
276
277 rows
278 }
279
280 pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
281 Self::flatten(
282 ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
283 col.column_desc.data_type,
284 col.is_hidden,
285 col.column_desc.description,
286 )
287 }
288}
289
/// Output row of `SHOW CONNECTIONS`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowConnectionRow {
    // Connection name.
    name: String,
    // Connection kind: the private-link constant or the connection-params type name.
    r#type: String,
    // Human-readable, multi-line description of the connection's settings.
    properties: String,
}
297
/// Output row of `SHOW FUNCTIONS`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowFunctionRow {
    // Function name.
    name: String,
    // Argument types joined with ", ".
    arguments: String,
    // Return type rendered as text.
    return_type: String,
    // Implementation language as stored in the catalog.
    language: String,
    // Optional link stored in the catalog.
    link: Option<String>,
}
307
/// Output row of `SHOW INDEXES`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowIndexRow {
    // Index name.
    name: String,
    // Name of the table the index is built on.
    on: String,
    // Comma-separated index columns, including their ordering.
    key: String,
    // Comma-separated included columns.
    include: String,
    // Comma-separated distribution columns.
    distributed_by: String,
}
317
318impl From<Arc<IndexCatalog>> for ShowIndexRow {
319 fn from(index: Arc<IndexCatalog>) -> Self {
320 let index_display = index.display();
321 ShowIndexRow {
322 name: index.name.clone(),
323 on: index.primary_table.name.clone(),
324 key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
325 include: display_comma_separated(&index_display.include_columns).to_string(),
326 distributed_by: display_comma_separated(&index_display.distributed_by_columns)
327 .to_string(),
328 }
329 }
330}
331
/// Output row of `SHOW CLUSTER`, one per worker node.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowClusterRow {
    // Worker id.
    id: i32,
    // Worker host address rendered as text.
    addr: String,
    // Worker type name.
    r#type: String,
    // Worker state name.
    state: String,
    // Worker parallelism, if reported.
    parallelism: Option<i32>,
    // The three flags below are `None` when the worker has no property record.
    is_streaming: Option<bool>,
    is_serving: Option<bool>,
    is_unschedulable: Option<bool>,
    // Start time converted from the worker's `started_at` seconds value, if present.
    started_at: Option<Timestamptz>,
}
345
/// Output row of `SHOW JOBS`, built from the meta service's DDL progress report.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowJobRow {
    // Job id.
    id: i64,
    // Statement text reported for the job.
    statement: String,
    // Create type reported for the job.
    create_type: String,
    // Progress text reported for the job.
    progress: String,
}
354
/// Output row of `SHOW PROCESSLIST`, one per running SQL (or one error row per unreachable
/// frontend worker).
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowProcessListRow {
    // Id of the frontend worker that reported the row.
    worker_id: String,
    // Process id rendered from `WorkerProcessId`; empty on error rows.
    id: String,
    // User running the statement; empty on error rows.
    user: String,
    // Peer address of the client; empty on error rows.
    host: String,
    // Database of the session; empty on error rows.
    database: String,
    // Elapsed time formatted as `<n>ms`, if reported.
    time: Option<String>,
    // Truncated SQL text, or the failure message on error rows.
    info: Option<String>,
}
366
/// Output row of `SHOW CREATE ...`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCreateObjectRow {
    // Schema-qualified object name (`schema.object`).
    name: String,
    // The object's `CREATE` statement.
    create_sql: String,
}
373
/// Output row of `SHOW SUBSCRIPTIONS`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionRow {
    // Subscription name.
    name: String,
    // Retention of the subscription, taken from the catalog's `retention_seconds`.
    retention_seconds: i64,
}
380
/// Output row of `SHOW CURSORS`, one per query cursor across all sessions.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCursorRow {
    // Id of the session owning the cursor.
    session_id: String,
    // User of that session.
    user: String,
    // Peer address of that session.
    host: String,
    // Database of that session.
    database: String,
    // Name of the cursor.
    cursor_name: String,
}
390
/// Output row of `SHOW SUBSCRIPTION CURSORS`, one per subscription cursor across all sessions.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionCursorRow {
    // Id of the session owning the cursor.
    session_id: String,
    // User of that session.
    user: String,
    // Peer address of that session.
    host: String,
    // Database of that session.
    database: String,
    // Name of the cursor.
    cursor_name: String,
    // Name of the subscription the cursor reads from.
    subscription_name: String,
    // State description produced by the cursor's `state_info_string`.
    state: String,
    // Idle duration of the cursor in milliseconds.
    idle_duration_ms: i64,
}
403
404pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
406 fields_to_descriptors(match objects {
407 ShowObject::Columns { .. } => ShowColumnRow::fields(),
408 ShowObject::Connection { .. } => ShowConnectionRow::fields(),
409 ShowObject::Function { .. } => ShowFunctionRow::fields(),
410 ShowObject::Indexes { .. } => ShowIndexRow::fields(),
411 ShowObject::Cluster => ShowClusterRow::fields(),
412 ShowObject::Jobs => ShowJobRow::fields(),
413 ShowObject::ProcessList => ShowProcessListRow::fields(),
414 _ => ShowObjectRow::fields(),
415 })
416}
417
418pub async fn handle_show_object(
419 handler_args: HandlerArgs,
420 command: ShowObject,
421 filter: Option<ShowStatementFilter>,
422) -> Result<RwPgResponse> {
423 let session = handler_args.session;
424
425 if let Some(ShowStatementFilter::Where(..)) = filter {
426 bail_not_implemented!("WHERE clause in SHOW statement");
427 }
428
429 let catalog_reader = session.env().catalog_reader();
430 let user_reader = session.env().user_info_reader();
431 let get_catalog_reader = || {
432 let reader = catalog_reader.read_guard();
433 let user_reader = user_reader.read_guard();
434 let current_user = user_reader
435 .get_user_by_name(&session.user_name())
436 .expect("user not found")
437 .clone();
438 (reader, current_user)
439 };
440
441 let names = match command {
442 ShowObject::Table { schema } => {
443 let (reader, current_user) = get_catalog_reader();
444 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
445 schema.iter_user_table().map(|t| t.name.clone()).collect()
446 })
447 }
448 ShowObject::InternalTable { schema } => {
449 let (reader, current_user) = get_catalog_reader();
450 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
451 schema
452 .iter_internal_table()
453 .map(|t| t.name.clone())
454 .collect()
455 })
456 }
457 ShowObject::Database => {
458 let reader = catalog_reader.read_guard();
459 reader.get_all_database_names()
460 }
461 ShowObject::Schema => {
462 let reader = catalog_reader.read_guard();
463 reader.get_all_schema_names(&session.database())?
464 }
465 ShowObject::View { schema } => {
466 let (reader, current_user) = get_catalog_reader();
467 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
468 schema.iter_view().map(|t| t.name.clone()).collect()
469 })
470 }
471 ShowObject::MaterializedView { schema } => {
472 let (reader, current_user) = get_catalog_reader();
473 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
474 schema.iter_created_mvs().map(|t| t.name.clone()).collect()
475 })
476 }
477 ShowObject::Source { schema } => {
478 let (reader, current_user) = get_catalog_reader();
479 let mut sources =
480 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
481 schema.iter_source().map(|t| t.name.clone()).collect()
482 });
483 sources.extend(session.temporary_source_manager().keys());
484 sources
485 }
486 ShowObject::Sink { schema } => {
487 let (reader, current_user) = get_catalog_reader();
488 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
489 schema.iter_sink().map(|t| t.name.clone()).collect()
490 })
491 }
492 ShowObject::Subscription { schema } => {
493 let (reader, current_user) = get_catalog_reader();
494 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
495 schema
496 .iter_subscription()
497 .map(|t| ShowSubscriptionRow {
498 name: t.name.clone(),
499 retention_seconds: t.retention_seconds as i64,
500 })
501 .collect()
502 });
503 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
504 .rows(rows)
505 .into());
506 }
507 ShowObject::Secret { schema } => {
508 let (reader, current_user) = get_catalog_reader();
509 iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
510 schema.iter_secret().map(|t| t.name.clone()).collect()
511 })
512 }
513 ShowObject::Columns { table } => {
514 let Ok(columns) = get_columns_from_table(&session, table.clone())
515 .or(get_columns_from_sink(&session, table.clone()))
516 .or(get_columns_from_view(&session, table.clone()))
517 else {
518 return Err(CatalogError::NotFound(
519 "table, source, sink or view",
520 table.to_string(),
521 )
522 .into());
523 };
524
525 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
526 .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
527 .into());
528 }
529 ShowObject::Indexes { table } => {
530 let indexes = get_indexes_from_table(&session, table)?;
531
532 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
533 .rows(indexes.into_iter().map(ShowIndexRow::from))
534 .into());
535 }
536 ShowObject::Connection { schema } => {
537 let (reader, current_user) = get_catalog_reader();
538 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
539 schema.iter_connections()
540 .map(|c| {
541 let name = c.name.clone();
542 let r#type = match &c.info {
543 connection::Info::PrivateLinkService(_) => {
544 PRIVATELINK_CONNECTION.to_owned()
545 },
546 connection::Info::ConnectionParams(params) => {
547 params.get_connection_type().unwrap().as_str_name().to_owned()
548 }
549 };
550 let source_names = schema
551 .get_source_ids_by_connection(c.id)
552 .unwrap_or_default()
553 .into_iter()
554 .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
555 .collect_vec();
556 let sink_names = schema
557 .get_sink_ids_by_connection(c.id)
558 .unwrap_or_default()
559 .into_iter()
560 .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
561 .collect_vec();
562 let properties = match &c.info {
563 connection::Info::PrivateLinkService(i) => {
564 format!(
565 "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
566 i.get_provider().unwrap().as_str_name(),
567 i.service_name,
568 i.endpoint_id,
569 serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
570 serde_json::to_string(&source_names).unwrap(),
571 serde_json::to_string(&sink_names).unwrap(),
572 )
573 }
574 connection::Info::ConnectionParams(params) => {
575 print_connection_params(&session.database(), params, &reader)
577 }
578 };
579 ShowConnectionRow {
580 name,
581 r#type,
582 properties,
583 }
584 }).collect_vec()
585 });
586 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
587 .rows(rows)
588 .into());
589 }
590 ShowObject::Function { schema } => {
591 let (reader, current_user) = get_catalog_reader();
592 let rows = iter_schema_items(&session, &schema, &reader, ¤t_user, |schema| {
593 schema
594 .iter_function()
595 .map(|t| ShowFunctionRow {
596 name: t.name.clone(),
597 arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
598 return_type: t.return_type.to_string(),
599 language: t.language.clone(),
600 link: t.link.clone(),
601 })
602 .collect()
603 });
604 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
605 .rows(rows)
606 .into());
607 }
608 ShowObject::Cluster => {
609 let workers = session.env().meta_client().list_all_nodes().await?;
610 let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
611 let addr: HostAddr = worker.host.as_ref().unwrap().into();
612 let property = worker.property.as_ref();
613 ShowClusterRow {
614 id: worker.id as _,
615 addr: addr.to_string(),
616 r#type: worker.get_type().unwrap().as_str_name().into(),
617 state: worker.get_state().unwrap().as_str_name().to_owned(),
618 parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
619 is_streaming: property.map(|p| p.is_streaming),
620 is_serving: property.map(|p| p.is_serving),
621 is_unschedulable: property.map(|p| p.is_unschedulable),
622 started_at: worker
623 .started_at
624 .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
625 }
626 });
627 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
628 .rows(rows)
629 .into());
630 }
631 ShowObject::Jobs => {
632 let resp = session.env().meta_client().get_ddl_progress().await?;
633 let rows = resp.into_iter().map(|job| ShowJobRow {
634 id: job.id as i64,
635 statement: job.statement,
636 create_type: job.create_type,
637 progress: job.progress,
638 });
639 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
640 .rows(rows)
641 .into());
642 }
643 ShowObject::ProcessList => {
644 let rows = show_process_list_impl(
645 session.env().frontend_client_pool(),
646 session.env().worker_node_manager_ref(),
647 )
648 .await;
649 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
650 .rows(rows)
651 .into());
652 }
653 ShowObject::Cursor => {
654 let sessions = session
655 .env()
656 .sessions_map()
657 .read()
658 .values()
659 .cloned()
660 .collect_vec();
661 let mut rows = vec![];
662 for s in sessions {
663 let session_id = format!("{}", s.id().0);
664 let user = s.user_name();
665 let host = format!("{}", s.peer_addr());
666 let database = s.database();
667
668 s.get_cursor_manager()
669 .iter_query_cursors(|cursor_name: &String, _| {
670 rows.push(ShowCursorRow {
671 session_id: session_id.clone(),
672 user: user.clone(),
673 host: host.clone(),
674 database: database.clone(),
675 cursor_name: cursor_name.to_owned(),
676 });
677 })
678 .await;
679 }
680 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
681 .rows(rows)
682 .into());
683 }
684 ShowObject::SubscriptionCursor => {
685 let sessions = session
686 .env()
687 .sessions_map()
688 .read()
689 .values()
690 .cloned()
691 .collect_vec();
692 let mut rows = vec![];
693 for s in sessions {
694 let session_id = format!("{}", s.id().0);
695 let user = s.user_name();
696 let host = format!("{}", s.peer_addr());
697 let database = s.database().to_owned();
698
699 s.get_cursor_manager()
700 .iter_subscription_cursors(
701 |cursor_name: &String, cursor: &SubscriptionCursor| {
702 rows.push(ShowSubscriptionCursorRow {
703 session_id: session_id.clone(),
704 user: user.clone(),
705 host: host.clone(),
706 database: database.clone(),
707 cursor_name: cursor_name.to_owned(),
708 subscription_name: cursor.subscription_name().to_owned(),
709 state: cursor.state_info_string(),
710 idle_duration_ms: cursor.idle_duration().as_millis() as i64,
711 });
712 },
713 )
714 .await;
715 }
716
717 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
718 .rows(rows)
719 .into());
720 }
721 };
722
723 let rows = names
724 .into_iter()
725 .filter(|arg| match &filter {
726 Some(ShowStatementFilter::Like(pattern)) => like_default(arg, pattern),
727 Some(ShowStatementFilter::ILike(pattern)) => i_like_default(arg, pattern),
728 Some(ShowStatementFilter::Where(..)) => unreachable!(),
729 None => true,
730 })
731 .map(|name| ShowObjectRow { name });
732
733 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
734 .rows(rows)
735 .into())
736}
737
/// Returns the result columns of `SHOW CREATE ...`, derived from `ShowCreateObjectRow`.
pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
    fields_to_descriptors(ShowCreateObjectRow::fields())
}
741
/// Executes `SHOW CREATE <type> <name>` and returns a single row with the schema-qualified
/// name and the object's `CREATE` statement.
///
/// The object is looked up in the explicitly given schema, or along the session's search path
/// otherwise. Objects the current user has no access to are reported as
/// `CatalogError::NotFound`, the same error used for objects that do not exist.
/// `SHOW CREATE FUNCTION` is rejected as not implemented.
///
/// # Panics
///
/// Panics if the session's user cannot be found in the user info reader.
pub fn handle_show_create_object(
    handle_args: HandlerArgs,
    show_create_type: ShowCreateType,
    name: ObjectName,
) -> Result<RwPgResponse> {
    let session = handle_args.session;
    let catalog_reader = session.env().catalog_reader().read_guard();
    let database = session.database();
    let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();
    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
    let user_reader = session.env().user_info_reader().read_guard();
    let current_user = user_reader
        .get_user_by_name(user_name)
        .expect("user not found");

    // Each arm yields the CREATE statement and the schema the object was found in.
    let (sql, schema_name) = match show_create_type {
        ShowCreateType::MaterializedView => {
            let (mv, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            // Must be a materialized view the user can access.
                            .filter(|t| {
                                t.is_mview()
                                    && has_access_to_object(current_user, t.id.table_id, t.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
            (mv.create_sql(), schema)
        }
        ShowCreateType::View => {
            let (view, schema) =
                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
            // System views skip the access check.
            if !view.is_system_view() && !has_access_to_object(current_user, view.id, view.owner) {
                return Err(CatalogError::NotFound("view", name.to_string()).into());
            }
            (view.create_sql(schema.to_owned()), schema)
        }
        ShowCreateType::Table => {
            let (table, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            // Must be a user table the user can access.
                            .filter(|t| {
                                t.is_user_table()
                                    && has_access_to_object(current_user, t.id.table_id, t.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;

            (table.create_sql_purified(), schema)
        }
        ShowCreateType::Sink => {
            let (sink, schema) =
                catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
            if !has_access_to_object(current_user, sink.id.sink_id, sink.owner.user_id) {
                return Err(CatalogError::NotFound("sink", name.to_string()).into());
            }
            (sink.create_sql(), schema)
        }
        ShowCreateType::Source => {
            let (source, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_source_by_name(&object_name)
                            // Sources that have an associated table are excluded here.
                            .filter(|s| {
                                s.associated_table_id.is_none()
                                    && has_access_to_object(current_user, s.id, s.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
            (source.create_sql_purified(), schema)
        }
        ShowCreateType::Index => {
            let (index, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            // Indexes are looked up as tables flagged as index.
                            .filter(|t| {
                                t.is_index()
                                    && has_access_to_object(current_user, t.id.table_id, t.owner)
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
            (index.create_sql(), schema)
        }
        ShowCreateType::Function => {
            bail_not_implemented!("show create on: {}", show_create_type);
        }
        ShowCreateType::Subscription => {
            let (subscription, schema) =
                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
            if !has_access_to_object(
                current_user,
                subscription.id.subscription_id,
                subscription.owner.user_id,
            ) {
                return Err(CatalogError::NotFound("subscription", name.to_string()).into());
            }
            (subscription.create_sql(), schema)
        }
    };
    // Report the name qualified with the schema the object was actually found in.
    let name = format!("{}.{}", schema_name, object_name);

    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
        .rows([ShowCreateObjectRow {
            name,
            create_sql: sql,
        }])
        .into())
}
866
867async fn show_process_list_impl(
868 frontend_client_pool: FrontendClientPoolRef,
869 worker_node_manager: WorkerNodeManagerRef,
870) -> Vec<ShowProcessListRow> {
871 fn on_error(worker_id: u32, err_msg: String) -> Vec<ShowProcessListRow> {
873 vec![ShowProcessListRow {
874 worker_id: format!("{}", worker_id),
875 id: "".to_owned(),
876 user: "".to_owned(),
877 host: "".to_owned(),
878 database: "".to_owned(),
879 time: None,
880 info: Some(format!(
881 "Failed to show process list from worker {worker_id} due to: {err_msg}"
882 )),
883 }]
884 }
885 let futures = worker_node_manager
886 .list_frontend_nodes()
887 .into_iter()
888 .map(|worker| {
889 let frontend_client_pool_ = frontend_client_pool.clone();
890 async move {
891 let client = match frontend_client_pool_.get(&worker).await {
892 Ok(client) => client,
893 Err(e) => {
894 return on_error(worker.id, format!("{}", e.as_report()));
895 }
896 };
897 let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
898 Ok(resp) => resp,
899 Err(e) => {
900 return on_error(worker.id, format!("{}", e.as_report()));
901 }
902 };
903 resp.into_inner()
904 .running_sqls
905 .into_iter()
906 .map(|sql| ShowProcessListRow {
907 worker_id: format!("{}", worker.id),
908 id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
909 user: sql.user_name,
910 host: sql.peer_addr,
911 database: sql.database,
912 time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
913 info: sql
914 .sql
915 .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
916 })
917 .collect_vec()
918 }
919 })
920 .collect_vec();
921 join_all(futures).await.into_iter().flatten().collect()
922}
923
#[cfg(test)]
mod tests {
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    // `SHOW SOURCES` lists a freshly created source by name.
    #[tokio::test]
    async fn test_show_source() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = r#"CREATE SOURCE t1 (column1 varchar)
        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
        FORMAT PLAIN ENCODE JSON"#;
        frontend.run_sql(sql).await.unwrap();

        let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
        rows.sort();
        assert_eq!(rows, vec!["Row([Some(b\"t1\")])".to_owned(),]);
    }

    // `SHOW COLUMNS` flattens nested struct fields of a protobuf source into dotted names
    // and also lists the additional `_rw_kafka_*` and `_row_id` columns.
    #[tokio::test]
    async fn test_show_column() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "show columns from t";
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        // Collect the first two output columns (name, type) of every row.
        let mut columns = Vec::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            let row_set = row_set.unwrap();
            for row in row_set {
                columns.push((
                    std::str::from_utf8(row.index(0).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                    std::str::from_utf8(row.index(1).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                ));
            }
        }

        expect_test::expect![[r#"
            [
                (
                    "id",
                    "integer",
                ),
                (
                    "country",
                    "struct",
                ),
                (
                    "country.address",
                    "character varying",
                ),
                (
                    "country.city",
                    "struct",
                ),
                (
                    "country.city.address",
                    "character varying",
                ),
                (
                    "country.city.zipcode",
                    "character varying",
                ),
                (
                    "country.zipcode",
                    "character varying",
                ),
                (
                    "zipcode",
                    "bigint",
                ),
                (
                    "rate",
                    "real",
                ),
                (
                    "_rw_kafka_timestamp",
                    "timestamp with time zone",
                ),
                (
                    "_rw_kafka_partition",
                    "character varying",
                ),
                (
                    "_rw_kafka_offset",
                    "character varying",
                ),
                (
                    "_row_id",
                    "serial",
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }
}
1035}