risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sources.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::catalog::SecretId;
16use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
17use risingwave_frontend_macro::system_catalog;
18use risingwave_pb::user::grant_privilege::Object;
19use serde_json::{Map as JsonMap, json};
20
21use crate::WithOptionsSecResolved;
22use crate::catalog::catalog_service::CatalogReadGuard;
23use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
24use crate::error::Result;
25use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
26
27#[derive(Fields)]
28struct RwSource {
29    #[primary_key]
30    id: i32,
31    name: String,
32    schema_id: i32,
33    owner: i32,
34    connector: String,
35    columns: Vec<String>,
36    format: Option<String>,
37    row_encode: Option<String>,
38    append_only: bool,
39    associated_table_id: Option<i32>,
40    connection_id: Option<i32>,
41    definition: String,
42    acl: Vec<String>,
43    initialized_at: Option<Timestamptz>,
44    created_at: Option<Timestamptz>,
45    initialized_at_cluster_version: Option<String>,
46    created_at_cluster_version: Option<String>,
47    is_shared: bool,
48    // connector properties in json format
49    connector_props: JsonbVal,
50    // format-encode options in json format
51    format_encode_options: JsonbVal,
52}
53
54#[system_catalog(table, "rw_catalog.rw_sources")]
55fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>> {
56    let catalog_reader = reader.catalog_reader.read_guard();
57    let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
58    let user_reader = reader.user_info_reader.read_guard();
59    let users = user_reader.get_all_users();
60    let username_map = user_reader.get_user_name_map();
61
62    Ok(schemas
63        .flat_map(|schema| {
64            schema.iter_source().map(|source| {
65                let format_encode_props_with_secrets = WithOptionsSecResolved::new(
66                    source.info.format_encode_options.clone(),
67                    source.info.format_encode_secret_refs.clone(),
68                );
69                RwSource {
70                    id: source.id as i32,
71                    name: source.name.clone(),
72                    schema_id: schema.id() as i32,
73                    owner: source.owner as i32,
74                    connector: source
75                        .with_properties
76                        .get(UPSTREAM_SOURCE_KEY)
77                        .cloned()
78                        .unwrap_or("".to_owned())
79                        .to_uppercase(),
80                    columns: source.columns.iter().map(|c| c.name().into()).collect(),
81                    format: source
82                        .info
83                        .get_format()
84                        .ok()
85                        .map(|format| format.as_str_name().into()),
86                    row_encode: source
87                        .info
88                        .get_row_encode()
89                        .ok()
90                        .map(|row_encode| row_encode.as_str_name().into()),
91                    append_only: source.append_only,
92                    associated_table_id: source.associated_table_id.map(|id| id.table_id as i32),
93                    connection_id: source.connection_id.map(|id| id as i32),
94                    definition: source.create_sql_purified(),
95                    acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map),
96                    initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
97                    created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
98                    initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
99                    created_at_cluster_version: source.created_at_cluster_version.clone(),
100                    is_shared: source.info.is_shared(),
101
102                    connector_props: serialize_props_with_secret(
103                        &catalog_reader,
104                        &reader.auth_context.database,
105                        source.with_properties.clone(),
106                    )
107                    .into(),
108                    format_encode_options: serialize_props_with_secret(
109                        &catalog_reader,
110                        &reader.auth_context.database,
111                        format_encode_props_with_secrets,
112                    )
113                    .into(),
114                }
115            })
116        })
117        .collect())
118}
119
120pub fn serialize_props_with_secret(
121    catalog_reader: &CatalogReadGuard,
122    db_name: &str,
123    props_with_secret: WithOptionsSecResolved,
124) -> jsonbb::Value {
125    let (inner, secret_ref) = props_with_secret.into_parts();
126    // if not secret, {"some key": {"type": "plaintext", "value": "xxxx"}}
127    // if secret, {"some key": {"type": "secret", "value": {"value": "<secret name>"}}}
128    let mut result: JsonMap<String, serde_json::Value> = JsonMap::new();
129
130    for (k, v) in inner {
131        result.insert(k, json!({"type": "plaintext", "value": v}));
132    }
133    for (k, v) in secret_ref {
134        let secret = catalog_reader
135            .iter_schemas(db_name)
136            .unwrap()
137            .find_map(|schema| schema.get_secret_by_id(&SecretId(v.secret_id)));
138        let secret_name = secret
139            .map(|s| s.name.to_owned())
140            .unwrap_or("not found".to_owned());
141        result.insert(
142            k,
143            json!({"type": "secret", "value": {"value": secret_name}}),
144        );
145    }
146
147    jsonbb::Value::from(serde_json::Value::Object(result))
148}