risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_connections.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::id::{ConnectionId, SchemaId, UserId};
16use risingwave_common::types::Fields;
17use risingwave_frontend_macro::system_catalog;
18
19use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
20use crate::error::Result;
21use crate::handler::create_connection::print_connection_params;
22
23#[derive(Fields)]
24struct RwConnection {
25    #[primary_key]
26    id: ConnectionId,
27    name: String,
28    schema_id: SchemaId,
29    owner: UserId,
30    type_: String,
31    provider: String,
32    acl: Vec<String>,
33    connection_params: String,
34}
35
36#[system_catalog(table, "rw_catalog.rw_connections")]
37fn read_rw_connections(reader: &SysCatalogReaderImpl) -> Result<Vec<RwConnection>> {
38    let catalog_reader = reader.catalog_reader.read_guard();
39    let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
40    let user_reader = reader.user_info_reader.read_guard();
41    let users = user_reader.get_all_users();
42    let current_user = user_reader
43        .get_user_by_name(&reader.auth_context.user_name)
44        .expect("user not found");
45    let username_map = user_reader.get_user_name_map();
46
47    // todo: redesign the internal table for connection params
48    Ok(schemas
49        .flat_map(|schema| {
50            schema.iter_connections_with_acl(current_user).map(|conn| {
51                let mut rw_connection = RwConnection {
52                    id: conn.id,
53                    name: conn.name.clone(),
54                    schema_id: schema.id(),
55                    owner: conn.owner,
56                    type_: conn.connection_type().into(),
57                    provider: "".to_owned(),
58                    acl: get_acl_items(conn.id, false, &users, username_map),
59                    connection_params: "".to_owned(),
60                };
61                match &conn.info {
62                    risingwave_pb::catalog::connection::Info::PrivateLinkService(_) => {
63                        rw_connection.provider = conn.provider().into();
64                    }
65                    risingwave_pb::catalog::connection::Info::ConnectionParams(params) => {
66                        rw_connection.connection_params = print_connection_params(
67                            &reader.auth_context.database,
68                            params,
69                            &catalog_reader,
70                        );
71                    }
72                };
73
74                rw_connection
75            })
76        })
77        .collect())
78}