risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sources.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::id::{ConnectionId, SchemaId, SourceId, TableId, UserId};
16use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
17use risingwave_frontend_macro::system_catalog;
18use serde_json::{Map as JsonMap, json};
19
20use crate::WithOptionsSecResolved;
21use crate::catalog::catalog_service::CatalogReadGuard;
22use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
23use crate::error::Result;
24use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
25
26#[derive(Fields)]
27struct RwSource {
28    #[primary_key]
29    id: SourceId,
30    name: String,
31    schema_id: SchemaId,
32    owner: UserId,
33    connector: String,
34    columns: Vec<String>,
35    format: Option<String>,
36    row_encode: Option<String>,
37    append_only: bool,
38    associated_table_id: Option<TableId>,
39    connection_id: Option<ConnectionId>,
40    definition: String,
41    acl: Vec<String>,
42    initialized_at: Option<Timestamptz>,
43    created_at: Option<Timestamptz>,
44    initialized_at_cluster_version: Option<String>,
45    created_at_cluster_version: Option<String>,
46    is_shared: bool,
47    // connector properties in json format
48    connector_props: JsonbVal,
49    // format-encode options in json format
50    format_encode_options: JsonbVal,
51}
52
53#[system_catalog(table, "rw_catalog.rw_sources")]
54fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>> {
55    let catalog_reader = reader.catalog_reader.read_guard();
56    let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
57    let user_reader = reader.user_info_reader.read_guard();
58    let users = user_reader.get_all_users();
59    let username_map = user_reader.get_user_name_map();
60
61    Ok(schemas
62        .flat_map(|schema| {
63            schema.iter_source().map(|source| {
64                let format_encode_props_with_secrets = WithOptionsSecResolved::new(
65                    source.info.format_encode_options.clone(),
66                    source.info.format_encode_secret_refs.clone(),
67                );
68                RwSource {
69                    id: source.id,
70                    name: source.name.clone(),
71                    schema_id: schema.id(),
72                    owner: source.owner,
73                    connector: source
74                        .with_properties
75                        .get(UPSTREAM_SOURCE_KEY)
76                        .cloned()
77                        .unwrap_or("".to_owned())
78                        .to_uppercase(),
79                    columns: source.columns.iter().map(|c| c.name().into()).collect(),
80                    format: source
81                        .info
82                        .get_format()
83                        .ok()
84                        .map(|format| format.as_str_name().into()),
85                    row_encode: source
86                        .info
87                        .get_row_encode()
88                        .ok()
89                        .map(|row_encode| row_encode.as_str_name().into()),
90                    append_only: source.append_only,
91                    associated_table_id: source.associated_table_id,
92                    connection_id: source.connection_id,
93                    definition: source.create_sql_purified(),
94                    acl: get_acl_items(source.id, false, &users, username_map),
95                    initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
96                    created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
97                    initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
98                    created_at_cluster_version: source.created_at_cluster_version.clone(),
99                    is_shared: source.info.is_shared(),
100
101                    connector_props: serialize_props_with_secret(
102                        &catalog_reader,
103                        &reader.auth_context.database,
104                        source.with_properties.clone(),
105                    )
106                    .into(),
107                    format_encode_options: serialize_props_with_secret(
108                        &catalog_reader,
109                        &reader.auth_context.database,
110                        format_encode_props_with_secrets,
111                    )
112                    .into(),
113                }
114            })
115        })
116        .collect())
117}
118
119pub fn serialize_props_with_secret(
120    catalog_reader: &CatalogReadGuard,
121    db_name: &str,
122    props_with_secret: WithOptionsSecResolved,
123) -> jsonbb::Value {
124    let (inner, secret_ref) = props_with_secret.into_parts();
125    // if not secret, {"some key": {"type": "plaintext", "value": "xxxx"}}
126    // if secret, {"some key": {"type": "secret", "value": {"value": "<secret name>"}}}
127    let mut result: JsonMap<String, serde_json::Value> = JsonMap::new();
128
129    for (k, v) in inner {
130        result.insert(k, json!({"type": "plaintext", "value": v}));
131    }
132    for (k, v) in secret_ref {
133        let secret = catalog_reader
134            .iter_schemas(db_name)
135            .unwrap()
136            .find_map(|schema| schema.get_secret_by_id(v.secret_id));
137        let secret_name = secret
138            .map(|s| s.name.clone())
139            .unwrap_or("not found".to_owned());
140        result.insert(
141            k,
142            json!({"type": "secret", "value": {"value": secret_name}}),
143        );
144    }
145
146    jsonbb::Value::from(serde_json::Value::Object(result))
147}