1use std::sync::Arc;
16
17use itertools::Itertools;
18use pgwire::pg_field_descriptor::PgFieldDescriptor;
19use pgwire::pg_protocol::truncated_fmt;
20use pgwire::pg_response::{PgResponse, StatementType};
21use pgwire::pg_server::Session;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
24use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
25use risingwave_common::types::{DataType, Fields, Timestamptz};
26use risingwave_common::util::addr::HostAddr;
27use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
28use risingwave_expr::scalar::like::{i_like_default, like_default};
29use risingwave_pb::catalog::connection;
30use risingwave_sqlparser::ast::{
31 Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, display_comma_separated,
32};
33
34use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
35use crate::binder::{Binder, Relation};
36use crate::catalog::catalog_service::CatalogReadGuard;
37use crate::catalog::root_catalog::SchemaPath;
38use crate::catalog::schema_catalog::SchemaCatalog;
39use crate::catalog::{CatalogError, IndexCatalog};
40use crate::error::{Result, RwError};
41use crate::handler::HandlerArgs;
42use crate::handler::create_connection::print_connection_params;
43use crate::session::SessionImpl;
44use crate::session::cursor_manager::SubscriptionCursor;
45use crate::user::has_access_to_object;
46
47pub fn get_columns_from_table(
48 session: &SessionImpl,
49 table_name: ObjectName,
50) -> Result<Vec<ColumnCatalog>> {
51 let mut binder = Binder::new_for_system(session);
52 let relation = binder.bind_relation_by_name(table_name.clone(), None, None, false)?;
53 let column_catalogs = match relation {
54 Relation::Source(s) => s.catalog.columns,
55 Relation::BaseTable(t) => t.table_catalog.columns.clone(),
56 Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(),
57 _ => {
58 return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
59 }
60 };
61
62 Ok(column_catalogs)
63}
64
65pub fn get_columns_from_sink(
66 session: &SessionImpl,
67 sink_name: ObjectName,
68) -> Result<Vec<ColumnCatalog>> {
69 let binder = Binder::new_for_system(session);
70 let sink = binder.bind_sink_by_name(sink_name.clone())?;
71 Ok(sink.sink_catalog.full_columns().to_vec())
72}
73
74pub fn get_columns_from_view(
75 session: &SessionImpl,
76 view_name: ObjectName,
77) -> Result<Vec<ColumnCatalog>> {
78 let binder = Binder::new_for_system(session);
79 let view = binder.bind_view_by_name(view_name.clone())?;
80
81 Ok(view
82 .view_catalog
83 .columns
84 .iter()
85 .enumerate()
86 .map(|(idx, field)| ColumnCatalog {
87 column_desc: ColumnDesc::from_field_with_column_id(field, idx as _),
88 is_hidden: false,
89 })
90 .collect())
91}
92
93pub fn get_indexes_from_table(
94 session: &SessionImpl,
95 table_name: ObjectName,
96) -> Result<Vec<Arc<IndexCatalog>>> {
97 let mut binder = Binder::new_for_system(session);
98 let relation = binder.bind_relation_by_name(table_name.clone(), None, None, false)?;
99 let indexes = match relation {
100 Relation::BaseTable(t) => t.table_indexes,
101 _ => {
102 return Err(CatalogError::NotFound("table or source", table_name.to_string()).into());
103 }
104 };
105
106 Ok(indexes)
107}
108
109fn schema_or_search_path(
110 session: &Arc<SessionImpl>,
111 schema: &Option<Ident>,
112 search_path: &SearchPath,
113) -> Vec<String> {
114 if let Some(s) = schema {
115 vec![s.real_value()]
116 } else {
117 search_path
118 .real_path()
119 .iter()
120 .map(|s| {
121 if s.eq(USER_NAME_WILD_CARD) {
122 session.user_name()
123 } else {
124 s.to_string()
125 }
126 })
127 .collect()
128 }
129}
130
131fn iter_schema_items<F, T>(
132 session: &Arc<SessionImpl>,
133 schema: &Option<Ident>,
134 reader: &CatalogReadGuard,
135 mut f: F,
136) -> Vec<T>
137where
138 F: FnMut(&SchemaCatalog) -> Vec<T>,
139{
140 let search_path = session.config().search_path();
141
142 schema_or_search_path(session, schema, &search_path)
143 .into_iter()
144 .filter_map(|schema| {
145 reader
146 .get_schema_by_name(&session.database(), schema.as_ref())
147 .ok()
148 })
149 .flat_map(|s| f(s).into_iter())
150 .collect()
151}
152
153#[derive(Fields)]
154#[fields(style = "Title Case")]
155struct ShowObjectRow {
156 name: String,
157}
158
159#[derive(Fields)]
160#[fields(style = "Title Case")]
161pub struct ShowColumnRow {
162 pub name: String,
163 pub r#type: String,
164 pub is_hidden: Option<String>, pub description: Option<String>,
166}
167
168impl ShowColumnRow {
169 fn flatten(
172 name: String,
173 data_type: DataType,
174 is_hidden: bool,
175 description: Option<String>,
176 ) -> Vec<Self> {
177 let r#type = match &data_type {
179 DataType::Struct(_) => "struct".to_owned(),
180 DataType::List(box DataType::Struct(_)) => "struct[]".to_owned(),
181 d => d.to_string(),
182 };
183
184 let mut rows = vec![ShowColumnRow {
185 name: name.clone(),
186 r#type,
187 is_hidden: Some(is_hidden.to_string()),
188 description,
189 }];
190
191 match data_type {
192 DataType::Struct(st) => {
193 rows.extend(st.iter().flat_map(|(field_name, field_data_type)| {
194 Self::flatten(
195 format!("{}.{}", name, field_name),
196 field_data_type.clone(),
197 is_hidden,
198 None,
199 )
200 }));
201 }
202
203 DataType::List(inner @ box DataType::Struct(_)) => {
204 rows.extend(Self::flatten(
205 format!("{}[1]", name),
206 *inner,
207 is_hidden,
208 None,
209 ));
210 }
211
212 _ => {}
213 }
214
215 rows
216 }
217
218 pub fn from_catalog(col: ColumnCatalog) -> Vec<Self> {
219 Self::flatten(
220 col.column_desc.name,
221 col.column_desc.data_type,
222 col.is_hidden,
223 col.column_desc.description,
224 )
225 }
226}
227
228#[derive(Fields)]
229#[fields(style = "Title Case")]
230struct ShowConnectionRow {
231 name: String,
232 r#type: String,
233 properties: String,
234}
235
236#[derive(Fields)]
237#[fields(style = "Title Case")]
238struct ShowFunctionRow {
239 name: String,
240 arguments: String,
241 return_type: String,
242 language: String,
243 link: Option<String>,
244}
245
246#[derive(Fields)]
247#[fields(style = "Title Case")]
248struct ShowIndexRow {
249 name: String,
250 on: String,
251 key: String,
252 include: String,
253 distributed_by: String,
254}
255
256impl From<Arc<IndexCatalog>> for ShowIndexRow {
257 fn from(index: Arc<IndexCatalog>) -> Self {
258 let index_display = index.display();
259 ShowIndexRow {
260 name: index.name.clone(),
261 on: index.primary_table.name.clone(),
262 key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(),
263 include: display_comma_separated(&index_display.include_columns).to_string(),
264 distributed_by: display_comma_separated(&index_display.distributed_by_columns)
265 .to_string(),
266 }
267 }
268}
269
270#[derive(Fields)]
271#[fields(style = "Title Case")]
272struct ShowClusterRow {
273 id: i32,
274 addr: String,
275 r#type: String,
276 state: String,
277 parallelism: Option<i32>,
278 is_streaming: Option<bool>,
279 is_serving: Option<bool>,
280 is_unschedulable: Option<bool>,
281 started_at: Option<Timestamptz>,
282}
283
284#[derive(Fields)]
285#[fields(style = "Title Case")]
286struct ShowJobRow {
287 id: i64,
288 statement: String,
289 progress: String,
290}
291
292#[derive(Fields)]
293#[fields(style = "Title Case")]
294struct ShowProcessListRow {
295 id: String,
296 user: String,
297 host: String,
298 database: String,
299 time: Option<String>,
300 info: Option<String>,
301}
302
303#[derive(Fields)]
304#[fields(style = "Title Case")]
305struct ShowCreateObjectRow {
306 name: String,
307 create_sql: String,
308}
309
310#[derive(Fields)]
311#[fields(style = "Title Case")]
312struct ShowSubscriptionRow {
313 name: String,
314 retention_seconds: i64,
315}
316
317#[derive(Fields)]
318#[fields(style = "Title Case")]
319struct ShowCursorRow {
320 session_id: String,
321 user: String,
322 host: String,
323 database: String,
324 cursor_name: String,
325}
326
327#[derive(Fields)]
328#[fields(style = "Title Case")]
329struct ShowSubscriptionCursorRow {
330 session_id: String,
331 user: String,
332 host: String,
333 database: String,
334 cursor_name: String,
335 subscription_name: String,
336 state: String,
337 idle_duration_ms: i64,
338}
339
340pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
342 fields_to_descriptors(match objects {
343 ShowObject::Columns { .. } => ShowColumnRow::fields(),
344 ShowObject::Connection { .. } => ShowConnectionRow::fields(),
345 ShowObject::Function { .. } => ShowFunctionRow::fields(),
346 ShowObject::Indexes { .. } => ShowIndexRow::fields(),
347 ShowObject::Cluster => ShowClusterRow::fields(),
348 ShowObject::Jobs => ShowJobRow::fields(),
349 ShowObject::ProcessList => ShowProcessListRow::fields(),
350 _ => ShowObjectRow::fields(),
351 })
352}
353
354pub async fn handle_show_object(
355 handler_args: HandlerArgs,
356 command: ShowObject,
357 filter: Option<ShowStatementFilter>,
358) -> Result<RwPgResponse> {
359 let session = handler_args.session;
360
361 if let Some(ShowStatementFilter::Where(..)) = filter {
362 bail_not_implemented!("WHERE clause in SHOW statement");
363 }
364
365 let catalog_reader = session.env().catalog_reader();
366 let user_reader = session.env().user_info_reader();
367
368 let names = match command {
369 ShowObject::Table { schema } => {
370 let reader = catalog_reader.read_guard();
371 let user_reader = user_reader.read_guard();
372 let current_user = user_reader
373 .get_user_by_name(&session.user_name())
374 .expect("user not found");
375 iter_schema_items(&session, &schema, &reader, |schema| {
376 schema
377 .iter_user_table_with_acl(current_user)
378 .map(|t| t.name.clone())
379 .collect()
380 })
381 }
382 ShowObject::InternalTable { schema } => {
383 let reader = catalog_reader.read_guard();
384 let user_reader = user_reader.read_guard();
385 let current_user = user_reader
386 .get_user_by_name(&session.user_name())
387 .expect("user not found");
388 iter_schema_items(&session, &schema, &reader, |schema| {
389 schema
390 .iter_internal_table_with_acl(current_user)
391 .map(|t| t.name.clone())
392 .collect()
393 })
394 }
395 ShowObject::Database => catalog_reader.read_guard().get_all_database_names(),
396 ShowObject::Schema => catalog_reader
397 .read_guard()
398 .get_all_schema_names(&session.database())?,
399 ShowObject::View { schema } => {
400 let reader = catalog_reader.read_guard();
401 let user_reader = user_reader.read_guard();
402 let current_user = user_reader
403 .get_user_by_name(&session.user_name())
404 .expect("user not found");
405 iter_schema_items(&session, &schema, &reader, |schema| {
406 schema
407 .iter_view_with_acl(current_user)
408 .map(|t| t.name.clone())
409 .collect()
410 })
411 }
412 ShowObject::MaterializedView { schema } => {
413 let reader = catalog_reader.read_guard();
414 let user_reader = user_reader.read_guard();
415 let current_user = user_reader
416 .get_user_by_name(&session.user_name())
417 .expect("user not found");
418 iter_schema_items(&session, &schema, &reader, |schema| {
419 schema
420 .iter_created_mvs_with_acl(current_user)
421 .map(|t| t.name.clone())
422 .collect()
423 })
424 }
425 ShowObject::Source { schema } => {
426 let reader = catalog_reader.read_guard();
427 let user_reader = user_reader.read_guard();
428 let current_user = user_reader
429 .get_user_by_name(&session.user_name())
430 .expect("user not found");
431 let mut sources = iter_schema_items(&session, &schema, &reader, |schema| {
432 schema
433 .iter_source_with_acl(current_user)
434 .map(|t| t.name.clone())
435 .collect()
436 });
437 sources.extend(session.temporary_source_manager().keys());
438 sources
439 }
440 ShowObject::Sink { schema } => {
441 let reader = catalog_reader.read_guard();
442 let user_reader = user_reader.read_guard();
443 let current_user = user_reader
444 .get_user_by_name(&session.user_name())
445 .expect("user not found");
446 iter_schema_items(&session, &schema, &reader, |schema| {
447 schema
448 .iter_sink_with_acl(current_user)
449 .map(|t| t.name.clone())
450 .collect()
451 })
452 }
453 ShowObject::Subscription { schema } => {
454 let reader = catalog_reader.read_guard();
455 let rows = iter_schema_items(&session, &schema, &reader, |schema| {
456 schema
457 .iter_subscription()
458 .map(|t| ShowSubscriptionRow {
459 name: t.name.clone(),
460 retention_seconds: t.retention_seconds as i64,
461 })
462 .collect()
463 });
464 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
465 .rows(rows)
466 .into());
467 }
468 ShowObject::Secret { schema } => {
469 let reader = catalog_reader.read_guard();
470 iter_schema_items(&session, &schema, &reader, |schema| {
471 schema.iter_secret().map(|t| t.name.clone()).collect()
472 })
473 }
474 ShowObject::Columns { table } => {
475 let Ok(columns) = get_columns_from_table(&session, table.clone())
476 .or(get_columns_from_sink(&session, table.clone()))
477 .or(get_columns_from_view(&session, table.clone()))
478 else {
479 return Err(CatalogError::NotFound(
480 "table, source, sink or view",
481 table.to_string(),
482 )
483 .into());
484 };
485
486 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
487 .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog))
488 .into());
489 }
490 ShowObject::Indexes { table } => {
491 let indexes = get_indexes_from_table(&session, table)?;
492
493 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
494 .rows(indexes.into_iter().map(ShowIndexRow::from))
495 .into());
496 }
497 ShowObject::Connection { schema } => {
498 let reader = catalog_reader.read_guard();
499 let rows = iter_schema_items(&session, &schema, &reader, |schema| {
500 schema.iter_connections()
501 .map(|c| {
502 let name = c.name.clone();
503 let r#type = match &c.info {
504 connection::Info::PrivateLinkService(_) => {
505 PRIVATELINK_CONNECTION.to_owned()
506 },
507 connection::Info::ConnectionParams(params) => {
508 params.get_connection_type().unwrap().as_str_name().to_owned()
509 }
510 };
511 let source_names = schema
512 .get_source_ids_by_connection(c.id)
513 .unwrap_or_default()
514 .into_iter()
515 .filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
516 .collect_vec();
517 let sink_names = schema
518 .get_sink_ids_by_connection(c.id)
519 .unwrap_or_default()
520 .into_iter()
521 .filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
522 .collect_vec();
523 let properties = match &c.info {
524 connection::Info::PrivateLinkService(i) => {
525 format!(
526 "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}\nsources: {}\nsinks: {}",
527 i.get_provider().unwrap().as_str_name(),
528 i.service_name,
529 i.endpoint_id,
530 serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap(),
531 serde_json::to_string(&source_names).unwrap(),
532 serde_json::to_string(&sink_names).unwrap(),
533 )
534 }
535 connection::Info::ConnectionParams(params) => {
536 print_connection_params(params, schema)
538 }
539 };
540 ShowConnectionRow {
541 name,
542 r#type,
543 properties,
544 }
545 }).collect_vec()
546 });
547 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
548 .rows(rows)
549 .into());
550 }
551 ShowObject::Function { schema } => {
552 let reader = catalog_reader.read_guard();
553 let rows = iter_schema_items(&session, &schema, &reader, |schema| {
554 schema
555 .iter_function()
556 .map(|t| ShowFunctionRow {
557 name: t.name.clone(),
558 arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
559 return_type: t.return_type.to_string(),
560 language: t.language.clone(),
561 link: t.link.clone(),
562 })
563 .collect()
564 });
565 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
566 .rows(rows)
567 .into());
568 }
569 ShowObject::Cluster => {
570 let workers = session.env().meta_client().list_all_nodes().await?;
571 let rows = workers.into_iter().sorted_by_key(|w| w.id).map(|worker| {
572 let addr: HostAddr = worker.host.as_ref().unwrap().into();
573 let property = worker.property.as_ref();
574 ShowClusterRow {
575 id: worker.id as _,
576 addr: addr.to_string(),
577 r#type: worker.get_type().unwrap().as_str_name().into(),
578 state: worker.get_state().unwrap().as_str_name().to_owned(),
579 parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
580 is_streaming: property.map(|p| p.is_streaming),
581 is_serving: property.map(|p| p.is_serving),
582 is_unschedulable: property.map(|p| p.is_unschedulable),
583 started_at: worker
584 .started_at
585 .map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
586 }
587 });
588 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
589 .rows(rows)
590 .into());
591 }
592 ShowObject::Jobs => {
593 let resp = session.env().meta_client().get_ddl_progress().await?;
594 let rows = resp.into_iter().map(|job| ShowJobRow {
595 id: job.id as i64,
596 statement: job.statement,
597 progress: job.progress,
598 });
599 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
600 .rows(rows)
601 .into());
602 }
603 ShowObject::ProcessList => {
604 let sessions_map = session.env().sessions_map().read();
605 let rows = sessions_map.values().map(|s| {
606 ShowProcessListRow {
607 id: format!("{}", s.id().0),
609 user: s.user_name(),
610 host: format!("{}", s.peer_addr()),
611 database: s.database(),
612 time: s
613 .elapse_since_running_sql()
614 .map(|mills| format!("{}ms", mills)),
615 info: s
616 .running_sql()
617 .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))),
618 }
619 });
620
621 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
622 .rows(rows)
623 .into());
624 }
625 ShowObject::Cursor => {
626 let sessions = session
627 .env()
628 .sessions_map()
629 .read()
630 .values()
631 .cloned()
632 .collect_vec();
633 let mut rows = vec![];
634 for s in sessions {
635 let session_id = format!("{}", s.id().0);
636 let user = s.user_name();
637 let host = format!("{}", s.peer_addr());
638 let database = s.database();
639
640 s.get_cursor_manager()
641 .iter_query_cursors(|cursor_name: &String, _| {
642 rows.push(ShowCursorRow {
643 session_id: session_id.clone(),
644 user: user.clone(),
645 host: host.clone(),
646 database: database.clone(),
647 cursor_name: cursor_name.to_owned(),
648 });
649 })
650 .await;
651 }
652 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
653 .rows(rows)
654 .into());
655 }
656 ShowObject::SubscriptionCursor => {
657 let sessions = session
658 .env()
659 .sessions_map()
660 .read()
661 .values()
662 .cloned()
663 .collect_vec();
664 let mut rows = vec![];
665 for s in sessions {
666 let session_id = format!("{}", s.id().0);
667 let user = s.user_name();
668 let host = format!("{}", s.peer_addr());
669 let database = s.database().to_owned();
670
671 s.get_cursor_manager()
672 .iter_subscription_cursors(
673 |cursor_name: &String, cursor: &SubscriptionCursor| {
674 rows.push(ShowSubscriptionCursorRow {
675 session_id: session_id.clone(),
676 user: user.clone(),
677 host: host.clone(),
678 database: database.clone(),
679 cursor_name: cursor_name.to_owned(),
680 subscription_name: cursor.subscription_name().to_owned(),
681 state: cursor.state_info_string(),
682 idle_duration_ms: cursor.idle_duration().as_millis() as i64,
683 });
684 },
685 )
686 .await;
687 }
688
689 return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
690 .rows(rows)
691 .into());
692 }
693 };
694
695 let rows = names
696 .into_iter()
697 .filter(|arg| match &filter {
698 Some(ShowStatementFilter::Like(pattern)) => like_default(arg, pattern),
699 Some(ShowStatementFilter::ILike(pattern)) => i_like_default(arg, pattern),
700 Some(ShowStatementFilter::Where(..)) => unreachable!(),
701 None => true,
702 })
703 .map(|name| ShowObjectRow { name });
704
705 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
706 .rows(rows)
707 .into())
708}
709
710pub fn infer_show_create_object() -> Vec<PgFieldDescriptor> {
711 fields_to_descriptors(ShowCreateObjectRow::fields())
712}
713
714pub fn handle_show_create_object(
715 handle_args: HandlerArgs,
716 show_create_type: ShowCreateType,
717 name: ObjectName,
718) -> Result<RwPgResponse> {
719 let session = handle_args.session;
720 let catalog_reader = session.env().catalog_reader().read_guard();
721 let database = session.database();
722 let (schema_name, object_name) =
723 Binder::resolve_schema_qualified_name(&database, name.clone())?;
724 let search_path = session.config().search_path();
725 let user_name = &session.user_name();
726 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
727 let user_reader = session.env().user_info_reader().read_guard();
728 let current_user = user_reader
729 .get_user_by_name(user_name)
730 .expect("user not found");
731
732 let (sql, schema_name) = match show_create_type {
733 ShowCreateType::MaterializedView => {
734 let (mv, schema) = schema_path
735 .try_find(|schema_name| {
736 Ok::<_, RwError>(
737 catalog_reader
738 .get_schema_by_name(&database, schema_name)?
739 .get_created_table_by_name(&object_name)
740 .filter(|t| {
741 t.is_mview()
742 && has_access_to_object(
743 current_user,
744 schema_name,
745 t.id.table_id,
746 t.owner,
747 )
748 }),
749 )
750 })?
751 .ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
752 (mv.create_sql(), schema)
753 }
754 ShowCreateType::View => {
755 let (view, schema) =
756 catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
757 if !view.is_system_view()
758 && !has_access_to_object(current_user, schema, view.id, view.owner)
759 {
760 return Err(CatalogError::NotFound("view", name.to_string()).into());
761 }
762 (view.create_sql(schema.to_owned()), schema)
763 }
764 ShowCreateType::Table => {
765 let (table, schema) = schema_path
766 .try_find(|schema_name| {
767 Ok::<_, RwError>(
768 catalog_reader
769 .get_schema_by_name(&database, schema_name)?
770 .get_created_table_by_name(&object_name)
771 .filter(|t| {
772 t.is_user_table()
773 && has_access_to_object(
774 current_user,
775 schema_name,
776 t.id.table_id,
777 t.owner,
778 )
779 }),
780 )
781 })?
782 .ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
783
784 (table.create_sql_purified(), schema)
785 }
786 ShowCreateType::Sink => {
787 let (sink, schema) =
788 catalog_reader.get_sink_by_name(&database, schema_path, &object_name)?;
789 if !has_access_to_object(current_user, schema, sink.id.sink_id, sink.owner.user_id) {
790 return Err(CatalogError::NotFound("sink", name.to_string()).into());
791 }
792 (sink.create_sql(), schema)
793 }
794 ShowCreateType::Source => {
795 let (source, schema) = schema_path
796 .try_find(|schema_name| {
797 Ok::<_, RwError>(
798 catalog_reader
799 .get_schema_by_name(&database, schema_name)?
800 .get_source_by_name(&object_name)
801 .filter(|s| {
802 s.associated_table_id.is_none()
803 && has_access_to_object(
804 current_user,
805 schema_name,
806 s.id,
807 s.owner,
808 )
809 }),
810 )
811 })?
812 .ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
813 (source.create_sql_purified(), schema)
814 }
815 ShowCreateType::Index => {
816 let (index, schema) = schema_path
817 .try_find(|schema_name| {
818 Ok::<_, RwError>(
819 catalog_reader
820 .get_schema_by_name(&database, schema_name)?
821 .get_created_table_by_name(&object_name)
822 .filter(|t| {
823 t.is_index()
824 && has_access_to_object(
825 current_user,
826 schema_name,
827 t.id.table_id,
828 t.owner,
829 )
830 }),
831 )
832 })?
833 .ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
834 (index.create_sql(), schema)
835 }
836 ShowCreateType::Function => {
837 bail_not_implemented!("show create on: {}", show_create_type);
838 }
839 ShowCreateType::Subscription => {
840 let (subscription, schema) =
841 catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
842 if !has_access_to_object(
843 current_user,
844 schema,
845 subscription.id.subscription_id,
846 subscription.owner.user_id,
847 ) {
848 return Err(CatalogError::NotFound("subscription", name.to_string()).into());
849 }
850 (subscription.create_sql(), schema)
851 }
852 };
853 let name = format!("{}.{}", schema_name, object_name);
854
855 Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
856 .rows([ShowCreateObjectRow {
857 name,
858 create_sql: sql,
859 }])
860 .into())
861}
862
863#[cfg(test)]
864mod tests {
865 use std::ops::Index;
866
867 use futures_async_stream::for_await;
868
869 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
870
871 #[tokio::test]
872 async fn test_show_source() {
873 let frontend = LocalFrontend::new(Default::default()).await;
874
875 let sql = r#"CREATE SOURCE t1 (column1 varchar)
876 WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
877 FORMAT PLAIN ENCODE JSON"#;
878 frontend.run_sql(sql).await.unwrap();
879
880 let mut rows = frontend.query_formatted_result("SHOW SOURCES").await;
881 rows.sort();
882 assert_eq!(rows, vec!["Row([Some(b\"t1\")])".to_owned(),]);
883 }
884
885 #[tokio::test]
886 async fn test_show_column() {
887 let proto_file = create_proto_file(PROTO_FILE_DATA);
888 let sql = format!(
889 r#"CREATE SOURCE t
890 WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
891 FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
892 proto_file.path().to_str().unwrap()
893 );
894 let frontend = LocalFrontend::new(Default::default()).await;
895 frontend.run_sql(sql).await.unwrap();
896
897 let sql = "show columns from t";
898 let mut pg_response = frontend.run_sql(sql).await.unwrap();
899
900 let mut columns = Vec::new();
901 #[for_await]
902 for row_set in pg_response.values_stream() {
903 let row_set = row_set.unwrap();
904 for row in row_set {
905 columns.push((
906 std::str::from_utf8(row.index(0).as_ref().unwrap())
907 .unwrap()
908 .to_owned(),
909 std::str::from_utf8(row.index(1).as_ref().unwrap())
910 .unwrap()
911 .to_owned(),
912 ));
913 }
914 }
915
916 expect_test::expect![[r#"
917 [
918 (
919 "id",
920 "integer",
921 ),
922 (
923 "country",
924 "struct",
925 ),
926 (
927 "country.address",
928 "character varying",
929 ),
930 (
931 "country.city",
932 "struct",
933 ),
934 (
935 "country.city.address",
936 "character varying",
937 ),
938 (
939 "country.city.zipcode",
940 "character varying",
941 ),
942 (
943 "country.zipcode",
944 "character varying",
945 ),
946 (
947 "zipcode",
948 "bigint",
949 ),
950 (
951 "rate",
952 "real",
953 ),
954 (
955 "_rw_kafka_timestamp",
956 "timestamp with time zone",
957 ),
958 (
959 "_rw_kafka_partition",
960 "character varying",
961 ),
962 (
963 "_rw_kafka_offset",
964 "character varying",
965 ),
966 (
967 "_row_id",
968 "serial",
969 ),
970 ]
971 "#]]
972 .assert_debug_eq(&columns);
973 }
974}