risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sources.rs1use risingwave_common::id::{ConnectionId, SchemaId, SourceId, TableId, UserId};
16use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
17use risingwave_frontend_macro::system_catalog;
18use serde_json::{Map as JsonMap, json};
19
20use crate::WithOptionsSecResolved;
21use crate::catalog::catalog_service::CatalogReadGuard;
22use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
23use crate::error::Result;
24use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
25
26#[derive(Fields)]
27struct RwSource {
28 #[primary_key]
29 id: SourceId,
30 name: String,
31 schema_id: SchemaId,
32 owner: UserId,
33 connector: String,
34 columns: Vec<String>,
35 format: Option<String>,
36 row_encode: Option<String>,
37 append_only: bool,
38 associated_table_id: Option<TableId>,
39 connection_id: Option<ConnectionId>,
40 definition: String,
41 acl: Vec<String>,
42 initialized_at: Option<Timestamptz>,
43 created_at: Option<Timestamptz>,
44 initialized_at_cluster_version: Option<String>,
45 created_at_cluster_version: Option<String>,
46 is_shared: bool,
47 connector_props: JsonbVal,
49 format_encode_options: JsonbVal,
51}
52
53#[system_catalog(table, "rw_catalog.rw_sources")]
54fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>> {
55 let catalog_reader = reader.catalog_reader.read_guard();
56 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
57 let user_reader = reader.user_info_reader.read_guard();
58 let users = user_reader.get_all_users();
59 let username_map = user_reader.get_user_name_map();
60
61 Ok(schemas
62 .flat_map(|schema| {
63 schema.iter_source().map(|source| {
64 let format_encode_props_with_secrets = WithOptionsSecResolved::new(
65 source.info.format_encode_options.clone(),
66 source.info.format_encode_secret_refs.clone(),
67 );
68 RwSource {
69 id: source.id,
70 name: source.name.clone(),
71 schema_id: schema.id(),
72 owner: source.owner,
73 connector: source
74 .with_properties
75 .get(UPSTREAM_SOURCE_KEY)
76 .cloned()
77 .unwrap_or("".to_owned())
78 .to_uppercase(),
79 columns: source.columns.iter().map(|c| c.name().into()).collect(),
80 format: source
81 .info
82 .get_format()
83 .ok()
84 .map(|format| format.as_str_name().into()),
85 row_encode: source
86 .info
87 .get_row_encode()
88 .ok()
89 .map(|row_encode| row_encode.as_str_name().into()),
90 append_only: source.append_only,
91 associated_table_id: source.associated_table_id,
92 connection_id: source.connection_id,
93 definition: source.create_sql_purified(),
94 acl: get_acl_items(source.id, false, &users, username_map),
95 initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
96 created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
97 initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
98 created_at_cluster_version: source.created_at_cluster_version.clone(),
99 is_shared: source.info.is_shared(),
100
101 connector_props: serialize_props_with_secret(
102 &catalog_reader,
103 &reader.auth_context.database,
104 source.with_properties.clone(),
105 )
106 .into(),
107 format_encode_options: serialize_props_with_secret(
108 &catalog_reader,
109 &reader.auth_context.database,
110 format_encode_props_with_secrets,
111 )
112 .into(),
113 }
114 })
115 })
116 .collect())
117}
118
119pub fn serialize_props_with_secret(
120 catalog_reader: &CatalogReadGuard,
121 db_name: &str,
122 props_with_secret: WithOptionsSecResolved,
123) -> jsonbb::Value {
124 let (inner, secret_ref) = props_with_secret.into_parts();
125 let mut result: JsonMap<String, serde_json::Value> = JsonMap::new();
128
129 for (k, v) in inner {
130 result.insert(k, json!({"type": "plaintext", "value": v}));
131 }
132 for (k, v) in secret_ref {
133 let secret = catalog_reader
134 .iter_schemas(db_name)
135 .unwrap()
136 .find_map(|schema| schema.get_secret_by_id(v.secret_id));
137 let secret_name = secret
138 .map(|s| s.name.clone())
139 .unwrap_or("not found".to_owned());
140 result.insert(
141 k,
142 json!({"type": "secret", "value": {"value": secret_name}}),
143 );
144 }
145
146 jsonbb::Value::from(serde_json::Value::Object(result))
147}