1use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use pgwire::pg_server::Session;
23use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::GetRunningSqlsRequest;
33use risingwave_rpc_client::FrontendClientPoolRef;
34use risingwave_sqlparser::ast::{
35 Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
36};
37use thiserror_ext::AsReport;
38
39use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
40use crate::binder::{Binder, Relation};
41use crate::catalog::catalog_service::CatalogReadGuard;
42use crate::catalog::root_catalog::SchemaPath;
43use crate::catalog::schema_catalog::SchemaCatalog;
44use crate::catalog::{CatalogError, IndexCatalog};
45use crate::error::{Result, RwError};
46use crate::handler::HandlerArgs;
47use crate::handler::create_connection::print_connection_params;
48use crate::session::cursor_manager::SubscriptionCursor;
49use crate::session::{SessionImpl, WorkerProcessId};
50use crate::user::has_access_to_object;
51
52pub fn get_columns_from_table(
53 session: &SessionImpl,
54 table_name: ObjectName,
55) -> Result<Vec<ColumnCatalog>> {
56 let mut binder = Binder::new_for_system(session);
57 let relation = binder.bind_relation_by_name(table_name.clone(), None, None, false)?;
58 let column_catalogs = match relation {
59 Relation::Source(s) => s.catalog.columns,
60 Relation::BaseTable(t) => t.table_catalog.columns.clone(),
61 Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
62 _ => {
63 return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
64 }
65 };
66
67 Ok(column_catalogs)
68}
69
70pub fn get_columns_from_sink(
71 session: &SessionImpl,
72 sink_name: ObjectName,
73) -> Result<Vec<ColumnCatalog>> {
74 let binder = Binder::new_for_system(session);
75 let sink = binder.bind_sink_by_name(sink_name.clone())?;
76 Ok(sink.sink_catalog.full_columns().to_vec())
77}
78
79pub fn get_columns_from_view(
80 session: &SessionImpl,
81 view_name: ObjectName,
82) -> Result<Vec<ColumnCatalog>> {
83 let binder = Binder::new_for_system(session);
84 let view = binder.bind_view_by_name(view_name.clone())?;
85
86 Ok(view
87 .view_catalog
88 .columns
89 .iter()
90 .enumerate()
91 .map(|(idx, field)| ColumnCatalog {
92 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
93 is_hidden: false,
94 })
95 .collect())
96}
97
98pub fn get_indexes_from_table(
99 session: &SessionImpl,
100 table_name: ObjectName,
101) -> Result<Vec<Arc<IndexCatalog>>> {
102 let mut binder = Binder::new_for_system(session);
103 let relation = binder.bind_relation_by_name(table_name.clone(), None, None, false)?;
104 let indexes = match relation {
105 Relation::BaseTable(t) => t.table_indexes,
106 _ => {
107 return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
108 }
109 };
110
111 Ok(indexes)
112}
113
114fn schema_or_search_path(
115 session: &Arc<SessionImpl>,
116 schema: &Option<Ident>,
117 search_path: &SearchPath,
118) -> Vec<String> {
119 if let Some(s) = schema {
120 vec![s.real_value()]
121 } else {
122 search_path
123 .real_path()
124 .iter()
125 .map(|s| {
126 if s.eq(USER_NAME_WILD_CARD) {
127 session.user_name()
128 } else {
129 s.to_string()
130 }
131 })
132 .collect()
133 }
134}
135
136fn iter_schema_items<F, T>(
137 session: &Arc<SessionImpl>,
138 schema: &Option<Ident>,
139 reader: &CatalogReadGuard,
140 mut f: F,
141) -> Vec<T>
142where
143 F: FnMut(&SchemaCatalog) -> Vec<T>,
144{
145 let search_path = session.config().search_path();
146
147 schema_or_search_path(session, schema, &search_path)
148 .into_iter()
149 .filter_map(|schema| {
150 reader
151 .get_schema_by_name(&session.database(), schema.as_ref())
152 .ok()
153 })
154 .flat_map(|s| f(s).into_iter())
155 .collect()
156}
157
/// Single-column result row used by the `SHOW` variants that only list object names.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowObjectRow {
    /// Name of the listed object.
    name: String,
}
163
/// One result row describing a column (or a nested struct field of a column).
#[derive(Fields)]
#[fields(style = "Title Case")]
pub struct ShowColumnRow {
    /// Possibly nested column name; rendered as a varchar.
    pub name: ShowColumnName,
    /// Textual data type of the column.
    pub r#type: String,
    /// Hidden flag as text; `flatten` fills it with the stringified boolean.
    pub is_hidden: Option<String>,
    /// Optional column description.
    pub description: Option<String>,
}
172
/// One step on the path to a (possibly nested) column shown in a column listing.
#[derive(Clone, Debug)]
enum ShowColumnNameSegment {
    /// A named component: the top-level column or a struct field.
    Field(Ident),
    /// The element of a list; rendered as `[1]`.
    ListElement,
}
178
179impl ShowColumnNameSegment {
180 pub fn field(name: &str) -> Self {
181 ShowColumnNameSegment::Field(Ident::from_real_value(name))
182 }
183}
184
/// Path of segments naming a column; converted to a single varchar datum when emitted.
#[derive(Clone, Debug)]
pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
188
189impl ShowColumnName {
190 pub fn special(name: &str) -> Self {
193 ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
194 name,
195 ))])
196 }
197}
198
/// `ShowColumnName` is exposed to clients as a `varchar` column.
impl WithDataType for ShowColumnName {
    fn default_data_type() -> DataType {
        DataType::Varchar
    }
}
204
205impl ToOwnedDatum for ShowColumnName {
206 fn to_owned_datum(self) -> Datum {
207 use std::fmt::Write;
208
209 let mut s = String::new();
210 for segment in self.0 {
211 match segment {
212 ShowColumnNameSegment::Field(ident) => {
213 if !s.is_empty() {
214 s.push('.');
216 }
217 write!(s, "{ident}").unwrap();
218 }
219 ShowColumnNameSegment::ListElement => {
220 s.push_str("[1]");
221 }
222 }
223 }
224 s.to_owned_datum()
225 }
226}
227
228impl ShowColumnRow {
229 fn flatten(
232 name: ShowColumnName,
233 data_type: DataType,
234 is_hidden: bool,
235 description: Option<String>,
236 ) -> Vec<Self> {
237 let r#type = match &data_type {
239 DataType::Struct(_) => "struct".to_owned(),
240 DataType::List(box DataType::Struct(_)) => "struct[]".to_owned(),
241 d => d.to_string(),
242 };
243
244 let mut rows = vec![ShowColumnRow {
245 name: name.clone(),
246 r#type,
247 is_hidden: Some(is_hidden.to_string()),
248 description,
249 }];
250
251 match data_type {
252 DataType::Struct(st) => {
253 rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
254 let mut name = name.clone();
255 name.0.push(ShowColumnNameSegment::field(field_name));
256 Self::flatten(name, field_data_type.clone(), is_hidden, None)
257 }));
258 }
259
260 DataType::List(inner @ box DataType::Struct(_)) => {
261 let mut name = name.clone();
262 name.0.push(ShowColumnNameSegment::ListElement);
263 rows.extend(Self::flatten(name, *inner, is_hidden, None));
264 }
265
266 _ => {}
267 }
268
269 rows
270 }
271
272 pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
273 Self::flatten(
274 ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
275 col.column_desc.data_type,
276 col.is_hidden,
277 col.column_desc.description,
278 )
279 }
280}
281
/// Result row of `SHOW CONNECTIONS`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowConnectionRow {
    /// Connection name.
    name: String,
    /// Connection kind (private link, or the connection-params type name).
    r#type: String,
    /// Human-readable, multi-line description of the connection's properties.
    properties: String,
}
289
/// Result row of `SHOW FUNCTIONS`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowFunctionRow {
    /// Function name.
    name: String,
    /// Argument types joined with `", "`.
    arguments: String,
    /// Return type rendered as text.
    return_type: String,
    /// Implementation language as recorded in the catalog.
    language: String,
    /// Optional link recorded in the catalog.
    link: Option<String>,
}
299
/// Result row of `SHOW INDEXES`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowIndexRow {
    /// Index name.
    name: String,
    /// Name of the primary table the index is built on.
    on: String,
    /// Comma-separated index columns with their ordering.
    key: String,
    /// Comma-separated included columns.
    include: String,
    /// Comma-separated distribution columns.
    distributed_by: String,
}
309
310impl From<Arc<IndexCatalog>> for ShowIndexRow {
311 fn from(index: Arc<IndexCatalog>) -> Self {
312 let index_display = index.display();
313 ShowIndexRow {
314 name: index.name.clone(),
315 on: index.primary_table.name.clone(),
316 key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
317 include: display_comma_separated(&index_display.include_columns).to_string(),
318 distributed_by: display_comma_separated(&index_display.distributed_by_columns)
319 .to_string(),
320 }
321 }
322}
323
/// Result row of `SHOW CLUSTER`: one row per worker node.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowClusterRow {
    /// Worker id.
    id: i32,
    /// Host address of the worker.
    addr: String,
    /// Worker type name.
    r#type: String,
    /// Worker state name.
    state: String,
    /// Worker parallelism, if reported.
    parallelism: Option<i32>,
    /// Copied from the worker's property, when the property is present.
    is_streaming: Option<bool>,
    /// Copied from the worker's property, when the property is present.
    is_serving: Option<bool>,
    /// Copied from the worker's property, when the property is present.
    is_unschedulable: Option<bool>,
    /// Start time of the worker, converted from seconds, if reported.
    started_at: Option<Timestamptz>,
}
337
/// Result row of `SHOW JOBS`: one row per in-progress DDL job.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowJobRow {
    /// Job id.
    id: i64,
    /// Statement text of the job.
    statement: String,
    /// Progress text as reported by the meta service.
    progress: String,
}
345
/// Result row of `SHOW PROCESSLIST`.
///
/// When a frontend worker cannot be queried, a single row is emitted for it
/// with empty identity fields and the failure message in `info`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowProcessListRow {
    /// Id of the frontend worker the row came from.
    worker_id: String,
    /// Process id rendered from a `WorkerProcessId`; empty on error rows.
    id: String,
    /// User running the statement.
    user: String,
    /// Peer address of the client.
    host: String,
    /// Database the statement runs in.
    database: String,
    /// Elapsed time formatted as `<millis>ms`, if known.
    time: Option<String>,
    /// Running SQL (truncated), or the error message on error rows.
    info: Option<String>,
}
357
/// Result row of `SHOW CREATE ...`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCreateObjectRow {
    /// Object name qualified as `<schema>.<object>`.
    name: String,
    /// SQL statement that creates the object.
    create_sql: String,
}
364
/// Result row of `SHOW SUBSCRIPTIONS`.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionRow {
    /// Subscription name.
    name: String,
    /// Retention of the subscription, in seconds.
    retention_seconds: i64,
}
371
/// Result row of `SHOW CURSORS`: one row per query cursor of each session.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCursorRow {
    /// Id of the session owning the cursor.
    session_id: String,
    /// User of that session.
    user: String,
    /// Peer address of that session.
    host: String,
    /// Database of that session.
    database: String,
    /// Name of the cursor.
    cursor_name: String,
}
381
/// Result row of `SHOW SUBSCRIPTION CURSORS`: one row per subscription cursor
/// of each session.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionCursorRow {
    /// Id of the session owning the cursor.
    session_id: String,
    /// User of that session.
    user: String,
    /// Peer address of that session.
    host: String,
    /// Database of that session.
    database: String,
    /// Name of the cursor.
    cursor_name: String,
    /// Name of the subscription the cursor reads from.
    subscription_name: String,
    /// State description produced by the cursor.
    state: String,
    /// How long the cursor has been idle, in milliseconds.
    idle_duration_ms: i64,
}
394
395pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
397 fields_to_descriptors(match objects {
398 ShowObject::Columns { .. } => ShowColumnRow::fields(),
399 ShowObject::Connection { .. } => ShowConnectionRow::fields(),
400 ShowObject::Function { .. } => ShowFunctionRow::fields(),
401 ShowObject::Indexes { .. } => ShowIndexRow::fields(),
402 ShowObject::Cluster => ShowClusterRow::fields(),
403 ShowObject::Jobs => ShowJobRow::fields(),
404 ShowObject::ProcessList => ShowProcessListRow::fields(),
405 _ => ShowObjectRow::fields(),
406 })
407}
408
409pub async fn handle_show_object(
410 handler_args: HandlerArgs,
411 command: ShowObject,
412 filter: Option<ShowStatementFilter>,
413) -> Result<RwPgResponse> {
414 let session = handler_args.session;
415
416 if let Some(ShowStatementFilter::Where(..)) = filter {
417 bail_not_implemented!("WHERE clause in SHOW statement");
418 }
419
420 let catalog_reader = session.env().catalog_reader();
421 let user_reader = session.env().user_info_reader();
422
423 let names = match command {
424 ShowObject::Table { schema } => {
425 let reader = catalog_reader.read_guard();
426 let user_reader = user_reader.read_guard();
427 let current_user = user_reader
428 .get_user_by_name(&session.user_name())
429 .expect("user not found");
430 iter_schema_items(&session, &schema, &reader, |schema| {
431 schema
432 .iter_user_table_with_acl(current_user)
433 .map(|t| t.name.clone())
434 .collect()
435 })
436 }
437 ShowObject::InternalTable { schema } => {
438 let reader = catalog_reader.read_guard();
439 let user_reader = user_reader.read_guard();
440 let current_user = user_reader
441 .get_user_by_name(&session.user_name())
442 .expect("user not found");
443 iter_schema_items(&session, &schema, &reader, |schema| {
444 schema
445 .iter_internal_table_with_acl(current_user)
446 .map(|t| t.name.clone())
447 .collect()
448 })
449 }
450 ShowObject::Database => catalog_reader.read_guard().get_all_database_names(),
451 ShowObject::Schema => catalog_reader
452 .read_guard()
453 .get_all_schema_names(&session.database())?,
454 ShowObject::View { schema } => {
455 let reader = catalog_reader.read_guard();
456 let user_reader = user_reader.read_guard();
457 let current_user = user_reader
458 .get_user_by_name(&session.user_name())
459 .expect("user not found");
460 iter_schema_items(&session, &schema, &reader, |schema| {
461 schema
462 .iter_view_with_acl(current_user)
463 .map(|t| t.name.clone())
464 .collect()
465 })
466 }
467 ShowObject::MaterializedView { schema } => {
468 let reader = catalog_reader.read_guard();
469 let user_reader = user_reader.read_guard();
470 let current_user = user_reader
471 .get_user_by_name(&session.user_name())
472 .expect("user not found");
473 iter_schema_items(&session, &schema, &reader, |schema| {
474 schema
475 .iter_created_mvs_with_acl(current_user)
476 .map(|t| t.name.clone())
477 .collect()
478 })
479 }
480 ShowObject::Source { schema } => {
481 let reader = catalog_reader.read_guard();
482 let user_reader = user_reader.read_guard();
483 let current_user = user_reader
484 .get_user_by_name(&session.user_name())
485 .expect("user not found");
486 let mut sources = iter_schema_items(&session, &schema, &reader, |schema| {
487 schema
488 .iter_source_with_acl(current_user)
489 .map(|t| t.name.clone())
490 .collect()
491 });
492 sources.extend(session.temporary_source_manager().keys());
493 sources
494 }
495 ShowObject::Sink { schema } => {
496 let reader = catalog_reader.read_guard();
497 let user_reader = user_reader.read_guard();
498 let current_user = user_reader
499 .get_user_by_name(&session.user_name())
500 .expect("user not found");
501 iter_schema_items(&session, &schema, &reader, |schema| {
502 schema
503 .iter_sink_with_acl(current_user)
504 .map(|t| t.name.clone())
505 .collect()
506 })
507 }
508 ShowObject::Subscription { schema } => {
509 let reader = catalog_reader.read_guard();
510 let rows = iter_schema_items(&session, &schema, &reader, |schema| {
511 schema
512 .iter_subscription()
513 .map(|t| ShowSubscriptionRow {
514 name: t.name.clone(),
515 retention_seconds: t.retention_seconds as i64,
516 })
517 .collect()
518 });
519 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
520 .rows(rows)
521 .into());
522 }
523 ShowObject::Secret { schema } => {
524 let reader = catalog_reader.read_guard();
525 iter_schema_items(&session, &schema, &reader, |schema| {
526 schema.iter_secret().map(|t| t.name.clone()).collect()
527 })
528 }
529 ShowObject::Columns { table } => {
530 let Ok(columns) = get_columns_from_table(&session, table.clone())
531 .or(get_columns_from_sink(&session, table.clone()))
532 .or(get_columns_from_view(&session, table.clone()))
533 else {
534 return Err(CatalogError::NotFound(
535 "table, source, sink or view",
536 table.to_string(),
537 )
538 .into());
539 };
540
541 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
542 .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
543 .into());
544 }
545 ShowObject::Indexes { table } => {
546 let indexes = get_indexes_from_table(&session, table)?;
547
548 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
549 .rows(indexes.into_iter().map(ShowIndexRow::from))
550 .into());
551 }
552 ShowObject::Connection { schema } => {
553 let reader = catalog_reader.read_guard();
554 let rows = iter_schema_items(&session, &schema, &reader, |schema| {
555 schema.iter_connections()
556 .map(|c| {
557 let name = c.name.clone();
558 let r#type = match &c.info {
559 connection::Info::PrivateLinkService(_) => {
560 PRIVATELINK_CONNECTION.to_owned()
561 },
562 connection::Info::ConnectionParams(params) => {
563 params.get_connection_type().unwrap().as_str_name().to_owned()
564 }
565 };
566 let source_names = schema
567 .get_source_ids_by_connection(c.id)
568 .unwrap_or_default()
569 .into_iter()
570 .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
571 .collect_vec();
572 let sink_names = schema
573 .get_sink_ids_by_connection(c.id)
574 .unwrap_or_default()
575 .into_iter()
576 .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
577 .collect_vec();
578 let properties = match &c.info {
579 connection::Info::PrivateLinkService(i) => {
580 format!(
581 "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
582 i.get_provider().unwrap().as_str_name(),
583 i.service_name,
584 i.endpoint_id,
585 serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
586 serde_json::to_string(&source_names).unwrap(),
587 serde_json::to_string(&sink_names).unwrap(),
588 )
589 }
590 connection::Info::ConnectionParams(params) => {
591 print_connection_params(params, schema)
593 }
594 };
595 ShowConnectionRow {
596 name,
597 r#type,
598 properties,
599 }
600 }).collect_vec()
601 });
602 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
603 .rows(rows)
604 .into());
605 }
606 ShowObject::Function { schema } => {
607 let reader = catalog_reader.read_guard();
608 let rows = iter_schema_items(&session, &schema, &reader, |schema| {
609 schema
610 .iter_function()
611 .map(|t| ShowFunctionRow {
612 name: t.name.clone(),
613 arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
614 return_type: t.return_type.to_string(),
615 language: t.language.clone(),
616 link: t.link.clone(),
617 })
618 .collect()
619 });
620 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
621 .rows(rows)
622 .into());
623 }
624 ShowObject::Cluster => {
625 let workers = session.env().meta_client().list_all_nodes().await?;
626 let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
627 let addr: HostAddr = worker.host.as_ref().unwrap().into();
628 let property = worker.property.as_ref();
629 ShowClusterRow {
630 id: worker.id as _,
631 addr: addr.to_string(),
632 r#type: worker.get_type().unwrap().as_str_name().into(),
633 state: worker.get_state().unwrap().as_str_name().to_owned(),
634 parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
635 is_streaming: property.map(|p| p.is_streaming),
636 is_serving: property.map(|p| p.is_serving),
637 is_unschedulable: property.map(|p| p.is_unschedulable),
638 started_at: worker
639 .started_at
640 .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
641 }
642 });
643 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
644 .rows(rows)
645 .into());
646 }
647 ShowObject::Jobs => {
648 let resp = session.env().meta_client().get_ddl_progress().await?;
649 let rows = resp.into_iter().map(|job| ShowJobRow {
650 id: job.id as i64,
651 statement: job.statement,
652 progress: job.progress,
653 });
654 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
655 .rows(rows)
656 .into());
657 }
658 ShowObject::ProcessList => {
659 let rows = show_process_list_impl(
660 session.env().frontend_client_pool(),
661 session.env().worker_node_manager_ref(),
662 )
663 .await;
664 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
665 .rows(rows)
666 .into());
667 }
668 ShowObject::Cursor => {
669 let sessions = session
670 .env()
671 .sessions_map()
672 .read()
673 .values()
674 .cloned()
675 .collect_vec();
676 let mut rows = vec![];
677 for s in sessions {
678 let session_id = format!("{}", s.id().0);
679 let user = s.user_name();
680 let host = format!("{}", s.peer_addr());
681 let database = s.database();
682
683 s.get_cursor_manager()
684 .iter_query_cursors(|cursor_name: &String, _| {
685 rows.push(ShowCursorRow {
686 session_id: session_id.clone(),
687 user: user.clone(),
688 host: host.clone(),
689 database: database.clone(),
690 cursor_name: cursor_name.to_owned(),
691 });
692 })
693 .await;
694 }
695 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
696 .rows(rows)
697 .into());
698 }
699 ShowObject::SubscriptionCursor => {
700 let sessions = session
701 .env()
702 .sessions_map()
703 .read()
704 .values()
705 .cloned()
706 .collect_vec();
707 let mut rows = vec![];
708 for s in sessions {
709 let session_id = format!("{}", s.id().0);
710 let user = s.user_name();
711 let host = format!("{}", s.peer_addr());
712 let database = s.database().to_owned();
713
714 s.get_cursor_manager()
715 .iter_subscription_cursors(
716 |cursor_name: &String, cursor: &SubscriptionCursor| {
717 rows.push(ShowSubscriptionCursorRow {
718 session_id: session_id.clone(),
719 user: user.clone(),
720 host: host.clone(),
721 database: database.clone(),
722 cursor_name: cursor_name.to_owned(),
723 subscription_name: cursor.subscription_name().to_owned(),
724 state: cursor.state_info_string(),
725 idle_duration_ms: cursor.idle_duration().as_millis() as i64,
726 });
727 },
728 )
729 .await;
730 }
731
732 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
733 .rows(rows)
734 .into());
735 }
736 };
737
738 let rows = names
739 .into_iter()
740 .filter(|arg| match &filter {
741 Some(ShowStatementFilter::Like(pattern)) => like_default(arg, pattern),
742 Some(ShowStatementFilter::ILike(pattern)) => i_like_default(arg, pattern),
743 Some(ShowStatementFilter::Where(..)) => unreachable!(),
744 None => true,
745 })
746 .map(|name| ShowObjectRow { name });
747
748 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
749 .rows(rows)
750 .into())
751}
752
/// Returns the result-set column descriptors of `SHOW CREATE ...`, derived
/// from `ShowCreateObjectRow`.
pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
    fields_to_descriptors(ShowCreateObjectRow::fields())
}
756
/// Executes `SHOW CREATE <type> <name>` and returns a single row holding the
/// schema-qualified object name and its creation SQL.
///
/// The object is looked up in the explicitly given schema or along the
/// session's search path. Objects the current user has no access to are
/// reported as not found.
///
/// # Errors
/// * Not-found errors when no matching, accessible object exists.
/// * `SHOW CREATE FUNCTION` is rejected as not implemented.
/// * Name-resolution and catalog errors are propagated.
///
/// # Panics
/// Panics if the session's user is missing from the user info reader.
pub fn handle_show_create_object(
    handle_args: HandlerArgs,
    show_create_type: ShowCreateType,
    name: ObjectName,
) -> Result<RwPgResponse> {
    let session = handle_args.session;
    let catalog_reader = session.env().catalog_reader().read_guard();
    let database = session.database();
    // Split `name` into an optional schema part and the bare object name.
    let (schema_name, object_name) =
        Binder::resolve_schema_qualified_name(&database, name.clone())?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();
    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
    let user_reader = session.env().user_info_reader().read_guard();
    let current_user = user_reader
        .get_user_by_name(user_name)
        .expect("user not found");

    // Each arm yields the creation SQL and the schema the object was found in.
    let (sql, schema_name) = match show_create_type {
        ShowCreateType::MaterializedView => {
            let (mv, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            // Keep only materialized views the user may access.
                            .filter(|t| {
                                t.is_mview()
                                    && has_access_to_object(
                                        current_user,
                                        schema_name,
                                        t.id.table_id,
                                        t.owner,
                                    )
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
            (mv.create_sql(), schema)
        }
        ShowCreateType::View => {
            let (view, schema) =
                catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
            // System views skip the access check.
            if !view.is_system_view()
                && !has_access_to_object(current_user, schema, view.id, view.owner)
            {
                return Err(CatalogError::NotFound("view", name.to_string()).into());
            }
            (view.create_sql(schema.to_owned()), schema)
        }
        ShowCreateType::Table => {
            let (table, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            // Keep only user tables the user may access.
                            .filter(|t| {
                                t.is_user_table()
                                    && has_access_to_object(
                                        current_user,
                                        schema_name,
                                        t.id.table_id,
                                        t.owner,
                                    )
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;

            (table.create_sql_purified(), schema)
        }
        ShowCreateType::Sink => {
            let (sink, schema) =
                catalog_reader.get_sink_by_name(&database, schema_path, &object_name)?;
            if !has_access_to_object(current_user, schema, sink.id.sink_id, sink.owner.user_id) {
                return Err(CatalogError::NotFound("sink", name.to_string()).into());
            }
            (sink.create_sql(), schema)
        }
        ShowCreateType::Source => {
            let (source, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_source_by_name(&object_name)
                            // Sources with an associated table are excluded here.
                            .filter(|s| {
                                s.associated_table_id.is_none()
                                    && has_access_to_object(
                                        current_user,
                                        schema_name,
                                        s.id,
                                        s.owner,
                                    )
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
            (source.create_sql_purified(), schema)
        }
        ShowCreateType::Index => {
            let (index, schema) = schema_path
                .try_find(|schema_name| {
                    Ok::<_, RwError>(
                        catalog_reader
                            .get_schema_by_name(&database, schema_name)?
                            .get_created_table_by_name(&object_name)
                            // Indexes are looked up through their backing table entry.
                            .filter(|t| {
                                t.is_index()
                                    && has_access_to_object(
                                        current_user,
                                        schema_name,
                                        t.id.table_id,
                                        t.owner,
                                    )
                            }),
                    )
                })?
                .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
            (index.create_sql(), schema)
        }
        ShowCreateType::Function => {
            bail_not_implemented!("show create on: {}", show_create_type);
        }
        ShowCreateType::Subscription => {
            let (subscription, schema) =
                catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
            if !has_access_to_object(
                current_user,
                schema,
                subscription.id.subscription_id,
                subscription.owner.user_id,
            ) {
                return Err(CatalogError::NotFound("subscription", name.to_string()).into());
            }
            (subscription.create_sql(), schema)
        }
    };
    // Report the name qualified with the schema the object was actually found in.
    let name = format!("{}.{}", schema_name, object_name);

    Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
        .rows([ShowCreateObjectRow {
            name,
            create_sql: sql,
        }])
        .into())
}
905
906async fn show_process_list_impl(
907 frontend_client_pool: FrontendClientPoolRef,
908 worker_node_manager: WorkerNodeManagerRef,
909) -> Vec<ShowProcessListRow> {
910 fn on_error(worker_id: u32, err_msg: String) -> Vec<ShowProcessListRow> {
912 vec![ShowProcessListRow {
913 worker_id: format!("{}", worker_id),
914 id: "".to_owned(),
915 user: "".to_owned(),
916 host: "".to_owned(),
917 database: "".to_owned(),
918 time: None,
919 info: Some(format!(
920 "Failed to show process list from worker {worker_id} due to: {err_msg}"
921 )),
922 }]
923 }
924 let futures = worker_node_manager
925 .list_frontend_nodes()
926 .into_iter()
927 .map(|worker| {
928 let frontend_client_pool_ = frontend_client_pool.clone();
929 async move {
930 let client = match frontend_client_pool_.get(&worker).await {
931 Ok(client) => client,
932 Err(e) => {
933 return on_error(worker.id, format!("{}", e.as_report()));
934 }
935 };
936 let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
937 Ok(resp) => resp,
938 Err(e) => {
939 return on_error(worker.id, format!("{}", e.as_report()));
940 }
941 };
942 resp.into_inner()
943 .running_sqls
944 .into_iter()
945 .map(|sql| ShowProcessListRow {
946 worker_id: format!("{}", worker.id),
947 id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
948 user: sql.user_name,
949 host: sql.peer_addr,
950 database: sql.database,
951 time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
952 info: sql
953 .sql
954 .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
955 })
956 .collect_vec()
957 }
958 })
959 .collect_vec();
960 join_all(futures).await.into_iter().flatten().collect()
961}
962
/// Frontend-level tests for the `SHOW` handlers, run against a `LocalFrontend`.
#[cfg(test)]
mod tests {
    use std::ops::Index;

    use futures_async_stream::for_await;

    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// `SHOW SOURCES` lists a source created in the same frontend.
    #[tokio::test]
    async fn test_show_source() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = r#"CREATE SOURCE t1 (column1 varchar)
        WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
        FORMAT PLAIN ENCODE JSON"#;
        frontend.run_sql(sql).await.unwrap();

        let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
        rows.sort();
        assert_eq!(rows, vec!["Row([Some(b\"t1\")])".to_owned(),]);
    }

    /// `SHOW COLUMNS` flattens nested struct columns of a protobuf source and
    /// also reports the additional `_rw_kafka_*` and `_row_id` columns.
    #[tokio::test]
    async fn test_show_column() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "show columns from t";
        let mut pg_response = frontend.run_sql(sql).await.unwrap();

        // Collect (name, type) pairs: the first two columns of every returned row.
        let mut columns = Vec::new();
        #[for_await]
        for row_set in pg_response.values_stream() {
            let row_set = row_set.unwrap();
            for row in row_set {
                columns.push((
                    std::str::from_utf8(row.index(0).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                    std::str::from_utf8(row.index(1).as_ref().unwrap())
                        .unwrap()
                        .to_owned(),
                ));
            }
        }

        // Snapshot of the expected (name, type) listing, in output order.
        expect_test::expect![[r#"
            [
                (
                    "id",
                    "integer",
                ),
                (
                    "country",
                    "struct",
                ),
                (
                    "country.address",
                    "character varying",
                ),
                (
                    "country.city",
                    "struct",
                ),
                (
                    "country.city.address",
                    "character varying",
                ),
                (
                    "country.city.zipcode",
                    "character varying",
                ),
                (
                    "country.zipcode",
                    "character varying",
                ),
                (
                    "zipcode",
                    "bigint",
                ),
                (
                    "rate",
                    "real",
                ),
                (
                    "_rw_kafka_timestamp",
                    "timestamp with time zone",
                ),
                (
                    "_rw_kafka_partition",
                    "character varying",
                ),
                (
                    "_rw_kafka_offset",
                    "character varying",
                ),
                (
                    "_row_id",
                    "serial",
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }
}