1use std::sync::Arc;
16
17use futures::future::join_all;
18use itertools::Itertools;
19use pgwire::pg_field_descriptor::PgFieldDescriptor;
20use pgwire::pg_protocol::truncated_fmt;
21use pgwire::pg_response::{PgResponse, StatementType};
22use pgwire::pg_server::Session;
23use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
26use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
27use risingwave_common::types::{DataType, Datum, Fields, Timestamptz, ToOwnedDatum, WithDataType};
28use risingwave_common::util::addr::HostAddr;
29use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
30use risingwave_expr::scalar::like::{i_like_default, like_default};
31use risingwave_pb::catalog::connection;
32use risingwave_pb::frontend_service::GetRunningSqlsRequest;
33use risingwave_rpc_client::FrontendClientPoolRef;
34use risingwave_sqlparser::ast::{
35 Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
36};
37use thiserror_ext::AsReport;
38
39use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
40use crate::binder::{Binder, Relation};
41use crate::catalog::catalog_service::CatalogReadGuard;
42use crate::catalog::root_catalog::SchemaPath;
43use crate::catalog::schema_catalog::SchemaCatalog;
44use crate::catalog::{CatalogError, IndexCatalog};
45use crate::error::{Result, RwError};
46use crate::handler::HandlerArgs;
47use crate::handler::create_connection::print_connection_params;
48use crate::session::cursor_manager::SubscriptionCursor;
49use crate::session::{SessionImpl, WorkerProcessId};
50use crate::user::has_access_to_object;
51
52pub fn get_columns_from_table(
53 session: &SessionImpl,
54 table_name: ObjectName,
55) -> Result<Vec<ColumnCatalog>> {
56 let mut binder = Binder::new_for_system(session);
57 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
58 let column_catalogs = match relation {
59 Relation::Source(s) => s.catalog.columns,
60 Relation::BaseTable(t) => t.table_catalog.columns.clone(),
61 Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
62 _ => {
63 return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
64 }
65 };
66
67 Ok(column_catalogs)
68}
69
70pub fn get_columns_from_sink(
71 session: &SessionImpl,
72 sink_name: ObjectName,
73) -> Result<Vec<ColumnCatalog>> {
74 let binder = Binder::new_for_system(session);
75 let sink = binder.bind_sink_by_name(sink_name.clone())?;
76 Ok(sink.sink_catalog.full_columns().to_vec())
77}
78
79pub fn get_columns_from_view(
80 session: &SessionImpl,
81 view_name: ObjectName,
82) -> Result<Vec<ColumnCatalog>> {
83 let binder = Binder::new_for_system(session);
84 let view = binder.bind_view_by_name(view_name.clone())?;
85
86 Ok(view
87 .view_catalog
88 .columns
89 .iter()
90 .enumerate()
91 .map(|(idx, field)| ColumnCatalog {
92 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
93 is_hidden: false,
94 })
95 .collect())
96}
97
98pub fn get_indexes_from_table(
99 session: &SessionImpl,
100 table_name: ObjectName,
101) -> Result<Vec<Arc<IndexCatalog>>> {
102 let mut binder = Binder::new_for_system(session);
103 let relation = binder.bind_relation_by_name(&table_name, None, None, false)?;
104 let indexes = match relation {
105 Relation::BaseTable(t) => t.table_indexes,
106 _ => {
107 return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
108 }
109 };
110
111 Ok(indexes)
112}
113
114fn schema_or_search_path(
115 session: &Arc<SessionImpl>,
116 schema: &Option<Ident>,
117 search_path: &SearchPath,
118) -> Vec<String> {
119 if let Some(s) = schema {
120 vec![s.real_value()]
121 } else {
122 search_path
123 .real_path()
124 .iter()
125 .map(|s| {
126 if s.eq(USER_NAME_WILD_CARD) {
127 session.user_name()
128 } else {
129 s.clone()
130 }
131 })
132 .collect()
133 }
134}
135
136fn iter_schema_items<F, T>(
137 session: &Arc<SessionImpl>,
138 schema: &Option<Ident>,
139 reader: &CatalogReadGuard,
140 mut f: F,
141) -> Vec<T>
142where
143 F: FnMut(&SchemaCatalog) -> Vec<T>,
144{
145 let search_path = session.config().search_path();
146
147 schema_or_search_path(session, schema, &search_path)
148 .into_iter()
149 .filter_map(|schema| {
150 reader
151 .get_schema_by_name(&session.database(), schema.as_ref())
152 .ok()
153 })
154 .flat_map(|s| f(s).into_iter())
155 .collect()
156}
157
158#[derive(Fields)]
159#[fields(style = "Title Case")]
160struct ShowObjectRow {
161 name: String,
162}
163
164#[derive(Fields)]
165#[fields(style = "Title Case")]
166pub struct ShowColumnRow {
167 pub name: ShowColumnName,
168 pub r#type: String,
169 pub is_hidden: Option<String>, pub description: Option<String>,
171}
172
173#[derive(Clone, Debug)]
174enum ShowColumnNameSegment {
175 Field(Ident),
176 ListElement,
177}
178
179impl ShowColumnNameSegment {
180 pub fn field(name: &str) -> Self {
181 ShowColumnNameSegment::Field(Ident::from_real_value(name))
182 }
183}
184
185#[derive(Clone, Debug)]
187pub struct ShowColumnName(Vec<ShowColumnNameSegment>);
188
189impl ShowColumnName {
190 pub fn special(name: &str) -> Self {
193 ShowColumnName(vec![ShowColumnNameSegment::Field(Ident::new_unchecked(
194 name,
195 ))])
196 }
197}
198
199impl WithDataType for ShowColumnName {
200 fn default_data_type() -> DataType {
201 DataType::Varchar
202 }
203}
204
205impl ToOwnedDatum for ShowColumnName {
206 fn to_owned_datum(self) -> Datum {
207 use std::fmt::Write;
208
209 let mut s = String::new();
210 for segment in self.0 {
211 match segment {
212 ShowColumnNameSegment::Field(ident) => {
213 if !s.is_empty() {
214 s.push('.');
216 }
217 write!(s, "{ident}").unwrap();
218 }
219 ShowColumnNameSegment::ListElement => {
220 s.push_str("[1]");
221 }
222 }
223 }
224 s.to_owned_datum()
225 }
226}
227
228impl ShowColumnRow {
229 fn flatten(
232 name: ShowColumnName,
233 data_type: DataType,
234 is_hidden: bool,
235 description: Option<String>,
236 ) -> Vec<Self> {
237 let r#type = match &data_type {
239 DataType::Struct(_) => "struct".to_owned(),
240 DataType::List(box DataType::Struct(_)) => "struct[]".to_owned(),
241 d => d.to_string(),
242 };
243
244 let mut rows = vec![ShowColumnRow {
245 name: name.clone(),
246 r#type,
247 is_hidden: Some(is_hidden.to_string()),
248 description,
249 }];
250
251 match data_type {
252 DataType::Struct(st) => {
253 rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
254 let mut name = name.clone();
255 name.0.push(ShowColumnNameSegment::field(field_name));
256 Self::flatten(name, field_data_type.clone(), is_hidden, None)
257 }));
258 }
259
260 DataType::List(inner @ box DataType::Struct(_)) => {
261 let mut name = name.clone();
262 name.0.push(ShowColumnNameSegment::ListElement);
263 rows.extend(Self::flatten(name, *inner, is_hidden, None));
264 }
265
266 _ => {}
267 }
268
269 rows
270 }
271
272 pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
273 Self::flatten(
274 ShowColumnName(vec![ShowColumnNameSegment::field(&col.column_desc.name)]),
275 col.column_desc.data_type,
276 col.is_hidden,
277 col.column_desc.description,
278 )
279 }
280}
281
282#[derive(Fields)]
283#[fields(style = "Title Case")]
284struct ShowConnectionRow {
285 name: String,
286 r#type: String,
287 properties: String,
288}
289
290#[derive(Fields)]
291#[fields(style = "Title Case")]
292struct ShowFunctionRow {
293 name: String,
294 arguments: String,
295 return_type: String,
296 language: String,
297 link: Option<String>,
298}
299
300#[derive(Fields)]
301#[fields(style = "Title Case")]
302struct ShowIndexRow {
303 name: String,
304 on: String,
305 key: String,
306 include: String,
307 distributed_by: String,
308}
309
310impl From<Arc<IndexCatalog>> for ShowIndexRow {
311 fn from(index: Arc<IndexCatalog>) -> Self {
312 let index_display = index.display();
313 ShowIndexRow {
314 name: index.name.clone(),
315 on: index.primary_table.name.clone(),
316 key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
317 include: display_comma_separated(&index_display.include_columns).to_string(),
318 distributed_by: display_comma_separated(&index_display.distributed_by_columns)
319 .to_string(),
320 }
321 }
322}
323
324#[derive(Fields)]
325#[fields(style = "Title Case")]
326struct ShowClusterRow {
327 id: i32,
328 addr: String,
329 r#type: String,
330 state: String,
331 parallelism: Option<i32>,
332 is_streaming: Option<bool>,
333 is_serving: Option<bool>,
334 is_unschedulable: Option<bool>,
335 started_at: Option<Timestamptz>,
336}
337
338#[derive(Fields)]
339#[fields(style = "Title Case")]
340struct ShowJobRow {
341 id: i64,
342 statement: String,
343 create_type: String,
344 progress: String,
345}
346
347#[derive(Fields)]
348#[fields(style = "Title Case")]
349struct ShowProcessListRow {
350 worker_id: String,
351 id: String,
352 user: String,
353 host: String,
354 database: String,
355 time: Option<String>,
356 info: Option<String>,
357}
358
359#[derive(Fields)]
360#[fields(style = "Title Case")]
361struct ShowCreateObjectRow {
362 name: String,
363 create_sql: String,
364}
365
366#[derive(Fields)]
367#[fields(style = "Title Case")]
368struct ShowSubscriptionRow {
369 name: String,
370 retention_seconds: i64,
371}
372
373#[derive(Fields)]
374#[fields(style = "Title Case")]
375struct ShowCursorRow {
376 session_id: String,
377 user: String,
378 host: String,
379 database: String,
380 cursor_name: String,
381}
382
383#[derive(Fields)]
384#[fields(style = "Title Case")]
385struct ShowSubscriptionCursorRow {
386 session_id: String,
387 user: String,
388 host: String,
389 database: String,
390 cursor_name: String,
391 subscription_name: String,
392 state: String,
393 idle_duration_ms: i64,
394}
395
396pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
398 fields_to_descriptors(match objects {
399 ShowObject::Columns { .. } => ShowColumnRow::fields(),
400 ShowObject::Connection { .. } => ShowConnectionRow::fields(),
401 ShowObject::Function { .. } => ShowFunctionRow::fields(),
402 ShowObject::Indexes { .. } => ShowIndexRow::fields(),
403 ShowObject::Cluster => ShowClusterRow::fields(),
404 ShowObject::Jobs => ShowJobRow::fields(),
405 ShowObject::ProcessList => ShowProcessListRow::fields(),
406 _ => ShowObjectRow::fields(),
407 })
408}
409
410pub async fn handle_show_object(
411 handler_args: HandlerArgs,
412 command: ShowObject,
413 filter: Option<ShowStatementFilter>,
414) -> Result<RwPgResponse> {
415 let session = handler_args.session;
416
417 if let Some(ShowStatementFilter::Where(..)) = filter {
418 bail_not_implemented!("WHERE clause in SHOW statement");
419 }
420
421 let catalog_reader = session.env().catalog_reader();
422 let user_reader = session.env().user_info_reader();
423
424 let names = match command {
425 ShowObject::Table { schema } => {
426 let reader = catalog_reader.read_guard();
427 let user_reader = user_reader.read_guard();
428 let current_user = user_reader
429 .get_user_by_name(&session.user_name())
430 .expect("user not found");
431 iter_schema_items(&session, &schema, &reader, |schema| {
432 schema
433 .iter_user_table_with_acl(current_user)
434 .map(|t| t.name.clone())
435 .collect()
436 })
437 }
438 ShowObject::InternalTable { schema } => {
439 let reader = catalog_reader.read_guard();
440 let user_reader = user_reader.read_guard();
441 let current_user = user_reader
442 .get_user_by_name(&session.user_name())
443 .expect("user not found");
444 iter_schema_items(&session, &schema, &reader, |schema| {
445 schema
446 .iter_internal_table_with_acl(current_user)
447 .map(|t| t.name.clone())
448 .collect()
449 })
450 }
451 ShowObject::Database => catalog_reader.read_guard().get_all_database_names(),
452 ShowObject::Schema => catalog_reader
453 .read_guard()
454 .get_all_schema_names(&session.database())?,
455 ShowObject::View { schema } => {
456 let reader = catalog_reader.read_guard();
457 let user_reader = user_reader.read_guard();
458 let current_user = user_reader
459 .get_user_by_name(&session.user_name())
460 .expect("user not found");
461 iter_schema_items(&session, &schema, &reader, |schema| {
462 schema
463 .iter_view_with_acl(current_user)
464 .map(|t| t.name.clone())
465 .collect()
466 })
467 }
468 ShowObject::MaterializedView { schema } => {
469 let reader = catalog_reader.read_guard();
470 let user_reader = user_reader.read_guard();
471 let current_user = user_reader
472 .get_user_by_name(&session.user_name())
473 .expect("user not found");
474 iter_schema_items(&session, &schema, &reader, |schema| {
475 schema
476 .iter_created_mvs_with_acl(current_user)
477 .map(|t| t.name.clone())
478 .collect()
479 })
480 }
481 ShowObject::Source { schema } => {
482 let reader = catalog_reader.read_guard();
483 let user_reader = user_reader.read_guard();
484 let current_user = user_reader
485 .get_user_by_name(&session.user_name())
486 .expect("user not found");
487 let mut sources = iter_schema_items(&session, &schema, &reader, |schema| {
488 schema
489 .iter_source_with_acl(current_user)
490 .map(|t| t.name.clone())
491 .collect()
492 });
493 sources.extend(session.temporary_source_manager().keys());
494 sources
495 }
496 ShowObject::Sink { schema } => {
497 let reader = catalog_reader.read_guard();
498 let user_reader = user_reader.read_guard();
499 let current_user = user_reader
500 .get_user_by_name(&session.user_name())
501 .expect("user not found");
502 iter_schema_items(&session, &schema, &reader, |schema| {
503 schema
504 .iter_sink_with_acl(current_user)
505 .map(|t| t.name.clone())
506 .collect()
507 })
508 }
509 ShowObject::Subscription { schema } => {
510 let reader = catalog_reader.read_guard();
511 let rows = iter_schema_items(&session, &schema, &reader, |schema| {
512 schema
513 .iter_subscription()
514 .map(|t| ShowSubscriptionRow {
515 name: t.name.clone(),
516 retention_seconds: t.retention_seconds as i64,
517 })
518 .collect()
519 });
520 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
521 .rows(rows)
522 .into());
523 }
524 ShowObject::Secret { schema } => {
525 let reader = catalog_reader.read_guard();
526 iter_schema_items(&session, &schema, &reader, |schema| {
527 schema.iter_secret().map(|t| t.name.clone()).collect()
528 })
529 }
530 ShowObject::Columns { table } => {
531 let Ok(columns) = get_columns_from_table(&session, table.clone())
532 .or(get_columns_from_sink(&session, table.clone()))
533 .or(get_columns_from_view(&session, table.clone()))
534 else {
535 return Err(CatalogError::NotFound(
536 "table, source, sink or view",
537 table.to_string(),
538 )
539 .into());
540 };
541
542 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
543 .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
544 .into());
545 }
546 ShowObject::Indexes { table } => {
547 let indexes = get_indexes_from_table(&session, table)?;
548
549 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
550 .rows(indexes.into_iter().map(ShowIndexRow::from))
551 .into());
552 }
553 ShowObject::Connection { schema } => {
554 let reader = catalog_reader.read_guard();
555 let rows = iter_schema_items(&session, &schema, &reader, |schema| {
556 schema.iter_connections()
557 .map(|c| {
558 let name = c.name.clone();
559 let r#type = match &c.info {
560 connection::Info::PrivateLinkService(_) => {
561 PRIVATELINK_CONNECTION.to_owned()
562 },
563 connection::Info::ConnectionParams(params) => {
564 params.get_connection_type().unwrap().as_str_name().to_owned()
565 }
566 };
567 let source_names = schema
568 .get_source_ids_by_connection(c.id)
569 .unwrap_or_default()
570 .into_iter()
571 .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
572 .collect_vec();
573 let sink_names = schema
574 .get_sink_ids_by_connection(c.id)
575 .unwrap_or_default()
576 .into_iter()
577 .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
578 .collect_vec();
579 let properties = match &c.info {
580 connection::Info::PrivateLinkService(i) => {
581 format!(
582 "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
583 i.get_provider().unwrap().as_str_name(),
584 i.service_name,
585 i.endpoint_id,
586 serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
587 serde_json::to_string(&source_names).unwrap(),
588 serde_json::to_string(&sink_names).unwrap(),
589 )
590 }
591 connection::Info::ConnectionParams(params) => {
592 print_connection_params(&session.database(), params, &reader)
594 }
595 };
596 ShowConnectionRow {
597 name,
598 r#type,
599 properties,
600 }
601 }).collect_vec()
602 });
603 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
604 .rows(rows)
605 .into());
606 }
607 ShowObject::Function { schema } => {
608 let reader = catalog_reader.read_guard();
609 let rows = iter_schema_items(&session, &schema, &reader, |schema| {
610 schema
611 .iter_function()
612 .map(|t| ShowFunctionRow {
613 name: t.name.clone(),
614 arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
615 return_type: t.return_type.to_string(),
616 language: t.language.clone(),
617 link: t.link.clone(),
618 })
619 .collect()
620 });
621 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
622 .rows(rows)
623 .into());
624 }
625 ShowObject::Cluster => {
626 let workers = session.env().meta_client().list_all_nodes().await?;
627 let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
628 let addr: HostAddr = worker.host.as_ref().unwrap().into();
629 let property = worker.property.as_ref();
630 ShowClusterRow {
631 id: worker.id as _,
632 addr: addr.to_string(),
633 r#type: worker.get_type().unwrap().as_str_name().into(),
634 state: worker.get_state().unwrap().as_str_name().to_owned(),
635 parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
636 is_streaming: property.map(|p| p.is_streaming),
637 is_serving: property.map(|p| p.is_serving),
638 is_unschedulable: property.map(|p| p.is_unschedulable),
639 started_at: worker
640 .started_at
641 .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
642 }
643 });
644 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
645 .rows(rows)
646 .into());
647 }
648 ShowObject::Jobs => {
649 let resp = session.env().meta_client().get_ddl_progress().await?;
650 let rows = resp.into_iter().map(|job| ShowJobRow {
651 id: job.id as i64,
652 statement: job.statement,
653 create_type: job.create_type,
654 progress: job.progress,
655 });
656 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
657 .rows(rows)
658 .into());
659 }
660 ShowObject::ProcessList => {
661 let rows = show_process_list_impl(
662 session.env().frontend_client_pool(),
663 session.env().worker_node_manager_ref(),
664 )
665 .await;
666 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
667 .rows(rows)
668 .into());
669 }
670 ShowObject::Cursor => {
671 let sessions = session
672 .env()
673 .sessions_map()
674 .read()
675 .values()
676 .cloned()
677 .collect_vec();
678 let mut rows = vec![];
679 for s in sessions {
680 let session_id = format!("{}", s.id().0);
681 let user = s.user_name();
682 let host = format!("{}", s.peer_addr());
683 let database = s.database();
684
685 s.get_cursor_manager()
686 .iter_query_cursors(|cursor_name: &String, _| {
687 rows.push(ShowCursorRow {
688 session_id: session_id.clone(),
689 user: user.clone(),
690 host: host.clone(),
691 database: database.clone(),
692 cursor_name: cursor_name.to_owned(),
693 });
694 })
695 .await;
696 }
697 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
698 .rows(rows)
699 .into());
700 }
701 ShowObject::SubscriptionCursor => {
702 let sessions = session
703 .env()
704 .sessions_map()
705 .read()
706 .values()
707 .cloned()
708 .collect_vec();
709 let mut rows = vec![];
710 for s in sessions {
711 let session_id = format!("{}", s.id().0);
712 let user = s.user_name();
713 let host = format!("{}", s.peer_addr());
714 let database = s.database().to_owned();
715
716 s.get_cursor_manager()
717 .iter_subscription_cursors(
718 |cursor_name: &String, cursor: &SubscriptionCursor| {
719 rows.push(ShowSubscriptionCursorRow {
720 session_id: session_id.clone(),
721 user: user.clone(),
722 host: host.clone(),
723 database: database.clone(),
724 cursor_name: cursor_name.to_owned(),
725 subscription_name: cursor.subscription_name().to_owned(),
726 state: cursor.state_info_string(),
727 idle_duration_ms: cursor.idle_duration().as_millis() as i64,
728 });
729 },
730 )
731 .await;
732 }
733
734 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
735 .rows(rows)
736 .into());
737 }
738 };
739
740 let rows = names
741 .into_iter()
742 .filter(|arg| match &filter {
743 Some(ShowStatementFilter::Like(pattern)) => like_default(arg, pattern),
744 Some(ShowStatementFilter::ILike(pattern)) => i_like_default(arg, pattern),
745 Some(ShowStatementFilter::Where(..)) => unreachable!(),
746 None => true,
747 })
748 .map(|name| ShowObjectRow { name });
749
750 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
751 .rows(rows)
752 .into())
753}
754
755pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
756 fields_to_descriptors(ShowCreateObjectRow::fields())
757}
758
759pub fn handle_show_create_object(
760 handle_args: HandlerArgs,
761 show_create_type: ShowCreateType,
762 name: ObjectName,
763) -> Result<RwPgResponse> {
764 let session = handle_args.session;
765 let catalog_reader = session.env().catalog_reader().read_guard();
766 let database = session.database();
767 let (schema_name, object_name) = Binder::resolve_schema_qualified_name(&database, &name)?;
768 let search_path = session.config().search_path();
769 let user_name = &session.user_name();
770 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
771 let user_reader = session.env().user_info_reader().read_guard();
772 let current_user = user_reader
773 .get_user_by_name(user_name)
774 .expect("user not found");
775
776 let (sql, schema_name) = match show_create_type {
777 ShowCreateType::MaterializedView => {
778 let (mv, schema) = schema_path
779 .try_find(|schema_name| {
780 Ok::<_, RwError>(
781 catalog_reader
782 .get_schema_by_name(&database, schema_name)?
783 .get_created_table_by_name(&object_name)
784 .filter(|t| {
785 t.is_mview()
786 && has_access_to_object(
787 current_user,
788 schema_name,
789 t.id.table_id,
790 t.owner,
791 )
792 }),
793 )
794 })?
795 .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
796 (mv.create_sql(), schema)
797 }
798 ShowCreateType::View => {
799 let (view, schema) =
800 catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
801 if !view.is_system_view()
802 && !has_access_to_object(current_user, schema, view.id, view.owner)
803 {
804 return Err(CatalogError::NotFound("view", name.to_string()).into());
805 }
806 (view.create_sql(schema.to_owned()), schema)
807 }
808 ShowCreateType::Table => {
809 let (table, schema) = schema_path
810 .try_find(|schema_name| {
811 Ok::<_, RwError>(
812 catalog_reader
813 .get_schema_by_name(&database, schema_name)?
814 .get_created_table_by_name(&object_name)
815 .filter(|t| {
816 t.is_user_table()
817 && has_access_to_object(
818 current_user,
819 schema_name,
820 t.id.table_id,
821 t.owner,
822 )
823 }),
824 )
825 })?
826 .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
827
828 (table.create_sql_purified(), schema)
829 }
830 ShowCreateType::Sink => {
831 let (sink, schema) =
832 catalog_reader.get_any_sink_by_name(&database, schema_path, &object_name)?;
833 if !has_access_to_object(current_user, schema, sink.id.sink_id, sink.owner.user_id) {
834 return Err(CatalogError::NotFound("sink", name.to_string()).into());
835 }
836 (sink.create_sql(), schema)
837 }
838 ShowCreateType::Source => {
839 let (source, schema) = schema_path
840 .try_find(|schema_name| {
841 Ok::<_, RwError>(
842 catalog_reader
843 .get_schema_by_name(&database, schema_name)?
844 .get_source_by_name(&object_name)
845 .filter(|s| {
846 s.associated_table_id.is_none()
847 && has_access_to_object(
848 current_user,
849 schema_name,
850 s.id,
851 s.owner,
852 )
853 }),
854 )
855 })?
856 .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
857 (source.create_sql_purified(), schema)
858 }
859 ShowCreateType::Index => {
860 let (index, schema) = schema_path
861 .try_find(|schema_name| {
862 Ok::<_, RwError>(
863 catalog_reader
864 .get_schema_by_name(&database, schema_name)?
865 .get_created_table_by_name(&object_name)
866 .filter(|t| {
867 t.is_index()
868 && has_access_to_object(
869 current_user,
870 schema_name,
871 t.id.table_id,
872 t.owner,
873 )
874 }),
875 )
876 })?
877 .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
878 (index.create_sql(), schema)
879 }
880 ShowCreateType::Function => {
881 bail_not_implemented!("show create on: {}", show_create_type);
882 }
883 ShowCreateType::Subscription => {
884 let (subscription, schema) =
885 catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
886 if !has_access_to_object(
887 current_user,
888 schema,
889 subscription.id.subscription_id,
890 subscription.owner.user_id,
891 ) {
892 return Err(CatalogError::NotFound("subscription", name.to_string()).into());
893 }
894 (subscription.create_sql(), schema)
895 }
896 };
897 let name = format!("{}.{}", schema_name, object_name);
898
899 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
900 .rows([ShowCreateObjectRow {
901 name,
902 create_sql: sql,
903 }])
904 .into())
905}
906
907async fn show_process_list_impl(
908 frontend_client_pool: FrontendClientPoolRef,
909 worker_node_manager: WorkerNodeManagerRef,
910) -> Vec<ShowProcessListRow> {
911 fn on_error(worker_id: u32, err_msg: String) -> Vec<ShowProcessListRow> {
913 vec![ShowProcessListRow {
914 worker_id: format!("{}", worker_id),
915 id: "".to_owned(),
916 user: "".to_owned(),
917 host: "".to_owned(),
918 database: "".to_owned(),
919 time: None,
920 info: Some(format!(
921 "Failed to show process list from worker {worker_id} due to: {err_msg}"
922 )),
923 }]
924 }
925 let futures = worker_node_manager
926 .list_frontend_nodes()
927 .into_iter()
928 .map(|worker| {
929 let frontend_client_pool_ = frontend_client_pool.clone();
930 async move {
931 let client = match frontend_client_pool_.get(&worker).await {
932 Ok(client) => client,
933 Err(e) => {
934 return on_error(worker.id, format!("{}", e.as_report()));
935 }
936 };
937 let resp = match client.get_running_sqls(GetRunningSqlsRequest {}).await {
938 Ok(resp) => resp,
939 Err(e) => {
940 return on_error(worker.id, format!("{}", e.as_report()));
941 }
942 };
943 resp.into_inner()
944 .running_sqls
945 .into_iter()
946 .map(|sql| ShowProcessListRow {
947 worker_id: format!("{}", worker.id),
948 id: format!("{}", WorkerProcessId::new(worker.id, sql.process_id)),
949 user: sql.user_name,
950 host: sql.peer_addr,
951 database: sql.database,
952 time: sql.elapsed_millis.map(|mills| format!("{}ms", mills)),
953 info: sql
954 .sql
955 .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
956 })
957 .collect_vec()
958 }
959 })
960 .collect_vec();
961 join_all(futures).await.into_iter().flatten().collect()
962}
963
964#[cfg(test)]
965mod tests {
966 use std::ops::Index;
967
968 use futures_async_stream::for_await;
969
970 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
971
972 #[tokio::test]
973 async fn test_show_source() {
974 let frontend = LocalFrontend::new(Default::default()).await;
975
976 let sql = r#"CREATE SOURCE t1 (column1 varchar)
977 WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
978 FORMAT PLAIN ENCODE JSON"#;
979 frontend.run_sql(sql).await.unwrap();
980
981 let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
982 rows.sort();
983 assert_eq!(rows, vec!["Row([Some(b\"t1\")])".to_owned(),]);
984 }
985
986 #[tokio::test]
987 async fn test_show_column() {
988 let proto_file = create_proto_file(PROTO_FILE_DATA);
989 let sql = format!(
990 r#"CREATE SOURCE t
991 WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
992 FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
993 proto_file.path().to_str().unwrap()
994 );
995 let frontend = LocalFrontend::new(Default::default()).await;
996 frontend.run_sql(sql).await.unwrap();
997
998 let sql = "show columns from t";
999 let mut pg_response = frontend.run_sql(sql).await.unwrap();
1000
1001 let mut columns = Vec::new();
1002 #[for_await]
1003 for row_set in pg_response.values_stream() {
1004 let row_set = row_set.unwrap();
1005 for row in row_set {
1006 columns.push((
1007 std::str::from_utf8(row.index(0).as_ref().unwrap())
1008 .unwrap()
1009 .to_owned(),
1010 std::str::from_utf8(row.index(1).as_ref().unwrap())
1011 .unwrap()
1012 .to_owned(),
1013 ));
1014 }
1015 }
1016
1017 expect_test::expect![[r#"
1018 [
1019 (
1020 "id",
1021 "integer",
1022 ),
1023 (
1024 "country",
1025 "struct",
1026 ),
1027 (
1028 "country.address",
1029 "character varying",
1030 ),
1031 (
1032 "country.city",
1033 "struct",
1034 ),
1035 (
1036 "country.city.address",
1037 "character varying",
1038 ),
1039 (
1040 "country.city.zipcode",
1041 "character varying",
1042 ),
1043 (
1044 "country.zipcode",
1045 "character varying",
1046 ),
1047 (
1048 "zipcode",
1049 "bigint",
1050 ),
1051 (
1052 "rate",
1053 "real",
1054 ),
1055 (
1056 "_rw_kafka_timestamp",
1057 "timestamp with time zone",
1058 ),
1059 (
1060 "_rw_kafka_partition",
1061 "character varying",
1062 ),
1063 (
1064 "_rw_kafka_offset",
1065 "character varying",
1066 ),
1067 (
1068 "_row_id",
1069 "serial",
1070 ),
1071 ]
1072 "#]]
1073 .assert_debug_eq(&columns);
1074 }
1075}