risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sources.rs1use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
16use risingwave_frontend_macro::system_catalog;
17use serde_json::{Map as JsonMap, json};
18
19use crate::WithOptionsSecResolved;
20use crate::catalog::catalog_service::CatalogReadGuard;
21use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
22use crate::error::Result;
23use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
24
25#[derive(Fields)]
26struct RwSource {
27 #[primary_key]
28 id: i32,
29 name: String,
30 schema_id: i32,
31 owner: i32,
32 connector: String,
33 columns: Vec<String>,
34 format: Option<String>,
35 row_encode: Option<String>,
36 append_only: bool,
37 associated_table_id: Option<i32>,
38 connection_id: Option<i32>,
39 definition: String,
40 acl: Vec<String>,
41 initialized_at: Option<Timestamptz>,
42 created_at: Option<Timestamptz>,
43 initialized_at_cluster_version: Option<String>,
44 created_at_cluster_version: Option<String>,
45 is_shared: bool,
46 connector_props: JsonbVal,
48 format_encode_options: JsonbVal,
50}
51
52#[system_catalog(table, "rw_catalog.rw_sources")]
53fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>> {
54 let catalog_reader = reader.catalog_reader.read_guard();
55 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
56 let user_reader = reader.user_info_reader.read_guard();
57 let users = user_reader.get_all_users();
58 let username_map = user_reader.get_user_name_map();
59
60 Ok(schemas
61 .flat_map(|schema| {
62 schema.iter_source().map(|source| {
63 let format_encode_props_with_secrets = WithOptionsSecResolved::new(
64 source.info.format_encode_options.clone(),
65 source.info.format_encode_secret_refs.clone(),
66 );
67 RwSource {
68 id: source.id.as_i32_id(),
69 name: source.name.clone(),
70 schema_id: schema.id().as_i32_id(),
71 owner: source.owner as i32,
72 connector: source
73 .with_properties
74 .get(UPSTREAM_SOURCE_KEY)
75 .cloned()
76 .unwrap_or("".to_owned())
77 .to_uppercase(),
78 columns: source.columns.iter().map(|c| c.name().into()).collect(),
79 format: source
80 .info
81 .get_format()
82 .ok()
83 .map(|format| format.as_str_name().into()),
84 row_encode: source
85 .info
86 .get_row_encode()
87 .ok()
88 .map(|row_encode| row_encode.as_str_name().into()),
89 append_only: source.append_only,
90 associated_table_id: source.associated_table_id.map(|id| id.as_i32_id()),
91 connection_id: source.connection_id.map(|id| id.as_i32_id()),
92 definition: source.create_sql_purified(),
93 acl: get_acl_items(source.id, false, &users, username_map),
94 initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
95 created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
96 initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
97 created_at_cluster_version: source.created_at_cluster_version.clone(),
98 is_shared: source.info.is_shared(),
99
100 connector_props: serialize_props_with_secret(
101 &catalog_reader,
102 &reader.auth_context.database,
103 source.with_properties.clone(),
104 )
105 .into(),
106 format_encode_options: serialize_props_with_secret(
107 &catalog_reader,
108 &reader.auth_context.database,
109 format_encode_props_with_secrets,
110 )
111 .into(),
112 }
113 })
114 })
115 .collect())
116}
117
118pub fn serialize_props_with_secret(
119 catalog_reader: &CatalogReadGuard,
120 db_name: &str,
121 props_with_secret: WithOptionsSecResolved,
122) -> jsonbb::Value {
123 let (inner, secret_ref) = props_with_secret.into_parts();
124 let mut result: JsonMap<String, serde_json::Value> = JsonMap::new();
127
128 for (k, v) in inner {
129 result.insert(k, json!({"type": "plaintext", "value": v}));
130 }
131 for (k, v) in secret_ref {
132 let secret = catalog_reader
133 .iter_schemas(db_name)
134 .unwrap()
135 .find_map(|schema| schema.get_secret_by_id(v.secret_id));
136 let secret_name = secret
137 .map(|s| s.name.clone())
138 .unwrap_or("not found".to_owned());
139 result.insert(
140 k,
141 json!({"type": "secret", "value": {"value": secret_name}}),
142 );
143 }
144
145 jsonbb::Value::from(serde_json::Value::Object(result))
146}