risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_connections.rs1use risingwave_common::types::Fields;
16use risingwave_frontend_macro::system_catalog;
17
18use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
19use crate::error::Result;
20use crate::handler::create_connection::print_connection_params;
21
/// One row of the `rw_catalog.rw_connections` system table, as produced by
/// `read_rw_connections`.
#[derive(Fields)]
struct RwConnection {
    /// Connection id, converted to `i32` from the catalog's connection id.
    #[primary_key]
    id: i32,
    /// Name of the connection, cloned from the catalog entry.
    name: String,
    /// Id of the schema the connection was found in.
    schema_id: i32,
    /// Owner of the connection (presumably a user id — confirm against the catalog type).
    owner: i32,
    /// Textual form of the connection type, as reported by `connection_type()`.
    type_: String,
    /// Provider string; filled only for `PrivateLinkService` connections,
    /// otherwise left empty.
    provider: String,
    /// ACL entries for the connection, as returned by `get_acl_items`.
    acl: Vec<String>,
    /// Output of `print_connection_params`; filled only for `ConnectionParams`
    /// connections, otherwise left empty.
    connection_params: String,
}
34
35#[system_catalog(table, "rw_catalog.rw_connections")]
36fn read_rw_connections(reader: &SysCatalogReaderImpl) -> Result<Vec<RwConnection>> {
37 let catalog_reader = reader.catalog_reader.read_guard();
38 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
39 let user_reader = reader.user_info_reader.read_guard();
40 let users = user_reader.get_all_users();
41 let current_user = user_reader
42 .get_user_by_name(&reader.auth_context.user_name)
43 .expect("user not found");
44 let username_map = user_reader.get_user_name_map();
45
46 Ok(schemas
48 .flat_map(|schema| {
49 schema.iter_connections_with_acl(current_user).map(|conn| {
50 let mut rw_connection = RwConnection {
51 id: conn.id.as_i32_id(),
52 name: conn.name.clone(),
53 schema_id: schema.id().as_i32_id(),
54 owner: conn.owner as i32,
55 type_: conn.connection_type().into(),
56 provider: "".to_owned(),
57 acl: get_acl_items(conn.id, false, &users, username_map),
58 connection_params: "".to_owned(),
59 };
60 match &conn.info {
61 risingwave_pb::catalog::connection::Info::PrivateLinkService(_) => {
62 rw_connection.provider = conn.provider().into();
63 }
64 risingwave_pb::catalog::connection::Info::ConnectionParams(params) => {
65 rw_connection.connection_params = print_connection_params(
66 &reader.auth_context.database,
67 params,
68 &catalog_reader,
69 );
70 }
71 };
72
73 rw_connection
74 })
75 })
76 .collect())
77}