risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sources.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
16use risingwave_frontend_macro::system_catalog;
17use serde_json::{Map as JsonMap, json};
18
19use crate::WithOptionsSecResolved;
20use crate::catalog::catalog_service::CatalogReadGuard;
21use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
22use crate::error::Result;
23use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
24
25#[derive(Fields)]
26struct RwSource {
27    #[primary_key]
28    id: i32,
29    name: String,
30    schema_id: i32,
31    owner: i32,
32    connector: String,
33    columns: Vec<String>,
34    format: Option<String>,
35    row_encode: Option<String>,
36    append_only: bool,
37    associated_table_id: Option<i32>,
38    connection_id: Option<i32>,
39    definition: String,
40    acl: Vec<String>,
41    initialized_at: Option<Timestamptz>,
42    created_at: Option<Timestamptz>,
43    initialized_at_cluster_version: Option<String>,
44    created_at_cluster_version: Option<String>,
45    is_shared: bool,
46    // connector properties in json format
47    connector_props: JsonbVal,
48    // format-encode options in json format
49    format_encode_options: JsonbVal,
50}
51
52#[system_catalog(table, "rw_catalog.rw_sources")]
53fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>> {
54    let catalog_reader = reader.catalog_reader.read_guard();
55    let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
56    let user_reader = reader.user_info_reader.read_guard();
57    let users = user_reader.get_all_users();
58    let username_map = user_reader.get_user_name_map();
59
60    Ok(schemas
61        .flat_map(|schema| {
62            schema.iter_source().map(|source| {
63                let format_encode_props_with_secrets = WithOptionsSecResolved::new(
64                    source.info.format_encode_options.clone(),
65                    source.info.format_encode_secret_refs.clone(),
66                );
67                RwSource {
68                    id: source.id.as_i32_id(),
69                    name: source.name.clone(),
70                    schema_id: schema.id().as_i32_id(),
71                    owner: source.owner as i32,
72                    connector: source
73                        .with_properties
74                        .get(UPSTREAM_SOURCE_KEY)
75                        .cloned()
76                        .unwrap_or("".to_owned())
77                        .to_uppercase(),
78                    columns: source.columns.iter().map(|c| c.name().into()).collect(),
79                    format: source
80                        .info
81                        .get_format()
82                        .ok()
83                        .map(|format| format.as_str_name().into()),
84                    row_encode: source
85                        .info
86                        .get_row_encode()
87                        .ok()
88                        .map(|row_encode| row_encode.as_str_name().into()),
89                    append_only: source.append_only,
90                    associated_table_id: source.associated_table_id.map(|id| id.as_i32_id()),
91                    connection_id: source.connection_id.map(|id| id.as_i32_id()),
92                    definition: source.create_sql_purified(),
93                    acl: get_acl_items(source.id, false, &users, username_map),
94                    initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
95                    created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
96                    initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
97                    created_at_cluster_version: source.created_at_cluster_version.clone(),
98                    is_shared: source.info.is_shared(),
99
100                    connector_props: serialize_props_with_secret(
101                        &catalog_reader,
102                        &reader.auth_context.database,
103                        source.with_properties.clone(),
104                    )
105                    .into(),
106                    format_encode_options: serialize_props_with_secret(
107                        &catalog_reader,
108                        &reader.auth_context.database,
109                        format_encode_props_with_secrets,
110                    )
111                    .into(),
112                }
113            })
114        })
115        .collect())
116}
117
118pub fn serialize_props_with_secret(
119    catalog_reader: &CatalogReadGuard,
120    db_name: &str,
121    props_with_secret: WithOptionsSecResolved,
122) -> jsonbb::Value {
123    let (inner, secret_ref) = props_with_secret.into_parts();
124    // if not secret, {"some key": {"type": "plaintext", "value": "xxxx"}}
125    // if secret, {"some key": {"type": "secret", "value": {"value": "<secret name>"}}}
126    let mut result: JsonMap<String, serde_json::Value> = JsonMap::new();
127
128    for (k, v) in inner {
129        result.insert(k, json!({"type": "plaintext", "value": v}));
130    }
131    for (k, v) in secret_ref {
132        let secret = catalog_reader
133            .iter_schemas(db_name)
134            .unwrap()
135            .find_map(|schema| schema.get_secret_by_id(v.secret_id));
136        let secret_name = secret
137            .map(|s| s.name.clone())
138            .unwrap_or("not found".to_owned());
139        result.insert(
140            k,
141            json!({"type": "secret", "value": {"value": secret_name}}),
142        );
143    }
144
145    jsonbb::Value::from(serde_json::Value::Object(result))
146}