risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sources.rs1use risingwave_common::catalog::SecretId;
16use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
17use risingwave_frontend_macro::system_catalog;
18use risingwave_pb::user::grant_privilege::Object;
19use serde_json::{Map as JsonMap, json};
20
21use crate::WithOptionsSecResolved;
22use crate::catalog::catalog_service::CatalogReadGuard;
23use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
24use crate::error::Result;
25use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
26
27#[derive(Fields)]
28struct RwSource {
29 #[primary_key]
30 id: i32,
31 name: String,
32 schema_id: i32,
33 owner: i32,
34 connector: String,
35 columns: Vec<String>,
36 format: Option<String>,
37 row_encode: Option<String>,
38 append_only: bool,
39 associated_table_id: Option<i32>,
40 connection_id: Option<i32>,
41 definition: String,
42 acl: Vec<String>,
43 initialized_at: Option<Timestamptz>,
44 created_at: Option<Timestamptz>,
45 initialized_at_cluster_version: Option<String>,
46 created_at_cluster_version: Option<String>,
47 is_shared: bool,
48 connector_props: JsonbVal,
50 format_encode_options: JsonbVal,
52}
53
54#[system_catalog(table, "rw_catalog.rw_sources")]
55fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>> {
56 let catalog_reader = reader.catalog_reader.read_guard();
57 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
58 let user_reader = reader.user_info_reader.read_guard();
59 let users = user_reader.get_all_users();
60 let username_map = user_reader.get_user_name_map();
61
62 Ok(schemas
63 .flat_map(|schema| {
64 schema.iter_source().map(|source| {
65 let format_encode_props_with_secrets = WithOptionsSecResolved::new(
66 source.info.format_encode_options.clone(),
67 source.info.format_encode_secret_refs.clone(),
68 );
69 RwSource {
70 id: source.id as i32,
71 name: source.name.clone(),
72 schema_id: schema.id() as i32,
73 owner: source.owner as i32,
74 connector: source
75 .with_properties
76 .get(UPSTREAM_SOURCE_KEY)
77 .cloned()
78 .unwrap_or("".to_owned())
79 .to_uppercase(),
80 columns: source.columns.iter().map(|c| c.name().into()).collect(),
81 format: source
82 .info
83 .get_format()
84 .ok()
85 .map(|format| format.as_str_name().into()),
86 row_encode: source
87 .info
88 .get_row_encode()
89 .ok()
90 .map(|row_encode| row_encode.as_str_name().into()),
91 append_only: source.append_only,
92 associated_table_id: source.associated_table_id.map(|id| id.table_id as i32),
93 connection_id: source.connection_id.map(|id| id as i32),
94 definition: source.create_sql_purified(),
95 acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map),
96 initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
97 created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
98 initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
99 created_at_cluster_version: source.created_at_cluster_version.clone(),
100 is_shared: source.info.is_shared(),
101
102 connector_props: serialize_props_with_secret(
103 &catalog_reader,
104 &reader.auth_context.database,
105 source.with_properties.clone(),
106 )
107 .into(),
108 format_encode_options: serialize_props_with_secret(
109 &catalog_reader,
110 &reader.auth_context.database,
111 format_encode_props_with_secrets,
112 )
113 .into(),
114 }
115 })
116 })
117 .collect())
118}
119
120pub fn serialize_props_with_secret(
121 catalog_reader: &CatalogReadGuard,
122 db_name: &str,
123 props_with_secret: WithOptionsSecResolved,
124) -> jsonbb::Value {
125 let (inner, secret_ref) = props_with_secret.into_parts();
126 let mut result: JsonMap<String, serde_json::Value> = JsonMap::new();
129
130 for (k, v) in inner {
131 result.insert(k, json!({"type": "plaintext", "value": v}));
132 }
133 for (k, v) in secret_ref {
134 let secret = catalog_reader
135 .iter_schemas(db_name)
136 .unwrap()
137 .find_map(|schema| schema.get_secret_by_id(&SecretId(v.secret_id)));
138 let secret_name = secret
139 .map(|s| s.name.to_owned())
140 .unwrap_or("not found".to_owned());
141 result.insert(
142 k,
143 json!({"type": "secret", "value": {"value": secret_name}}),
144 );
145 }
146
147 jsonbb::Value::from(serde_json::Value::Object(result))
148}