risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_connections.rs1use risingwave_common::id::{ConnectionId, SchemaId, UserId};
16use risingwave_common::types::Fields;
17use risingwave_frontend_macro::system_catalog;
18
19use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
20use crate::error::Result;
21use crate::handler::create_connection::print_connection_params;
22
23#[derive(Fields)]
24struct RwConnection {
25 #[primary_key]
26 id: ConnectionId,
27 name: String,
28 schema_id: SchemaId,
29 owner: UserId,
30 type_: String,
31 provider: String,
32 acl: Vec<String>,
33 connection_params: String,
34}
35
36#[system_catalog(table, "rw_catalog.rw_connections")]
37fn read_rw_connections(reader: &SysCatalogReaderImpl) -> Result<Vec<RwConnection>> {
38 let catalog_reader = reader.catalog_reader.read_guard();
39 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
40 let user_reader = reader.user_info_reader.read_guard();
41 let users = user_reader.get_all_users();
42 let current_user = user_reader
43 .get_user_by_name(&reader.auth_context.user_name)
44 .expect("user not found");
45 let username_map = user_reader.get_user_name_map();
46
47 Ok(schemas
49 .flat_map(|schema| {
50 schema.iter_connections_with_acl(current_user).map(|conn| {
51 let mut rw_connection = RwConnection {
52 id: conn.id,
53 name: conn.name.clone(),
54 schema_id: schema.id(),
55 owner: conn.owner,
56 type_: conn.connection_type().into(),
57 provider: "".to_owned(),
58 acl: get_acl_items(conn.id, false, &users, username_map),
59 connection_params: "".to_owned(),
60 };
61 match &conn.info {
62 risingwave_pb::catalog::connection::Info::PrivateLinkService(_) => {
63 rw_connection.provider = conn.provider().into();
64 }
65 risingwave_pb::catalog::connection::Info::ConnectionParams(params) => {
66 rw_connection.connection_params = print_connection_params(
67 &reader.auth_context.database,
68 params,
69 &catalog_reader,
70 );
71 }
72 };
73
74 rw_connection
75 })
76 })
77 .collect())
78}