1use anyhow::{Context, anyhow};
16use itertools::Itertools;
17use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
18use risingwave_hummock_sdk::FrontendHummockVersion;
19use risingwave_meta::MetaResult;
20use risingwave_meta::controller::catalog::Catalog;
21use risingwave_meta::manager::MetadataManager;
22use risingwave_pb::backup_service::MetaBackupManifestId;
23use risingwave_pb::catalog::{Secret, Table};
24use risingwave_pb::common::worker_node::State::Running;
25use risingwave_pb::common::{WorkerNode, WorkerType};
26use risingwave_pb::hummock::WriteLimits;
27use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
28use risingwave_pb::meta::notification_service_server::NotificationService;
29use risingwave_pb::meta::{
30 FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
31 SubscribeType,
32};
33use risingwave_pb::user::UserInfo;
34use tokio::sync::mpsc;
35use tokio_stream::wrappers::UnboundedReceiverStream;
36use tonic::{Request, Response, Status};
37
38use crate::backup_restore::BackupManagerRef;
39use crate::hummock::HummockManagerRef;
40use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
41use crate::serving::ServingVnodeMappingRef;
42
43pub struct NotificationServiceImpl {
44 env: MetaSrvEnv,
45
46 metadata_manager: MetadataManager,
47 hummock_manager: HummockManagerRef,
48 backup_manager: BackupManagerRef,
49 serving_vnode_mapping: ServingVnodeMappingRef,
50}
51
52impl NotificationServiceImpl {
53 pub async fn new(
54 env: MetaSrvEnv,
55 metadata_manager: MetadataManager,
56 hummock_manager: HummockManagerRef,
57 backup_manager: BackupManagerRef,
58 serving_vnode_mapping: ServingVnodeMappingRef,
59 ) -> MetaResult<Self> {
60 let service = Self {
61 env,
62 metadata_manager,
63 hummock_manager,
64 backup_manager,
65 serving_vnode_mapping,
66 };
67 let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
68 LocalSecretManager::global().init_secrets(secrets);
69 Ok(service)
70 }
71
72 async fn get_catalog_snapshot(
73 &self,
74 ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
75 let catalog_guard = self
76 .metadata_manager
77 .catalog_controller
78 .get_inner_read_guard()
79 .await;
80 let (
81 (
82 databases,
83 schemas,
84 tables,
85 sources,
86 sinks,
87 subscriptions,
88 indexes,
89 views,
90 functions,
91 connections,
92 secrets,
93 ),
94 users,
95 ) = catalog_guard.snapshot().await?;
96 let notification_version = self.env.notification_manager().current_version().await;
97 Ok((
98 (
99 databases,
100 schemas,
101 tables,
102 sources,
103 sinks,
104 subscriptions,
105 indexes,
106 views,
107 functions,
108 connections,
109 secrets,
110 ),
111 users,
112 notification_version,
113 ))
114 }
115
116 async fn get_decrypted_secret_snapshot(
118 &self,
119 ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
120 let catalog_guard = self
121 .metadata_manager
122 .catalog_controller
123 .get_inner_read_guard()
124 .await;
125 let secrets = catalog_guard.list_secrets().await?;
126 let notification_version = self.env.notification_manager().current_version().await;
127
128 let decrypted_secrets = self.decrypt_secrets(secrets)?;
129
130 Ok((decrypted_secrets, notification_version))
131 }
132
133 fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
134 if secrets.is_empty() {
136 return Ok(vec![]);
137 }
138 let secret_store_private_key = self
139 .env
140 .opts
141 .secret_store_private_key
142 .clone()
143 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
144 let mut decrypted_secrets = Vec::with_capacity(secrets.len());
145 for mut secret in secrets {
146 let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
147 .context(format!("failed to deserialize secret {}", secret.name))?;
148 let decrypted_secret = encrypted_secret
149 .decrypt(secret_store_private_key.as_slice())
150 .context(format!("failed to decrypt secret {}", secret.name))?;
151 secret.value = decrypted_secret;
152 decrypted_secrets.push(secret);
153 }
154 Ok(decrypted_secrets)
155 }
156
157 async fn get_worker_slot_mapping_snapshot(
158 &self,
159 ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
160 let fragment_guard = self
161 .metadata_manager
162 .catalog_controller
163 .get_inner_read_guard()
164 .await;
165 let worker_slot_mappings = fragment_guard
166 .all_running_fragment_mappings()
167 .await?
168 .collect_vec();
169 let notification_version = self.env.notification_manager().current_version().await;
170 Ok((worker_slot_mappings, notification_version))
171 }
172
173 fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
174 self.serving_vnode_mapping
175 .all()
176 .iter()
177 .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
178 fragment_id: *fragment_id,
179 mapping: Some(mapping.to_protobuf()),
180 })
181 .collect()
182 }
183
184 async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
185 let cluster_guard = self
186 .metadata_manager
187 .cluster_controller
188 .get_inner_read_guard()
189 .await;
190 let nodes = cluster_guard
191 .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
192 .await?;
193 let notification_version = self.env.notification_manager().current_version().await;
194 Ok((nodes, notification_version))
195 }
196
197 async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
198 let catalog_guard = self
199 .metadata_manager
200 .catalog_controller
201 .get_inner_read_guard()
202 .await;
203 let mut tables = catalog_guard.list_all_state_tables().await?;
204 tables.extend(catalog_guard.dropped_tables.values().cloned());
205 let notification_version = self.env.notification_manager().current_version().await;
206 Ok((tables, notification_version))
207 }
208
209 async fn get_compute_node_total_cpu_count(&self) -> usize {
210 self.metadata_manager
211 .cluster_controller
212 .compute_node_total_cpu_count()
213 .await
214 }
215
216 async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
217 let (tables, catalog_version) = self.get_tables_snapshot().await?;
218 let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
219
220 Ok(MetaSnapshot {
221 tables,
222 version: Some(SnapshotVersion {
223 catalog_version,
224 ..Default::default()
225 }),
226 compute_node_total_cpu_count: compute_node_total_cpu_count as _,
227 ..Default::default()
228 })
229 }
230
231 async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
232 let (
233 (
234 databases,
235 schemas,
236 tables,
237 sources,
238 sinks,
239 subscriptions,
240 indexes,
241 views,
242 functions,
243 connections,
244 secrets,
245 ),
246 users,
247 catalog_version,
248 ) = self.get_catalog_snapshot().await?;
249
250 let decrypted_secrets = self.decrypt_secrets(secrets)?;
252
253 let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
254 self.get_worker_slot_mapping_snapshot().await?;
255 let serving_worker_slot_mappings = self.get_serving_vnode_mappings();
256
257 let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;
258
259 let hummock_version = self
260 .hummock_manager
261 .on_current_version(|version| {
262 FrontendHummockVersion::from_version(version).to_protobuf()
263 })
264 .await;
265
266 let session_params = self
267 .env
268 .session_params_manager_impl_ref()
269 .get_params()
270 .await;
271
272 let session_params = Some(GetSessionParamsResponse {
273 params: serde_json::to_string(&session_params)
274 .context("failed to encode session params")?,
275 });
276
277 let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
278
279 Ok(MetaSnapshot {
280 databases,
281 schemas,
282 sources,
283 sinks,
284 tables,
285 indexes,
286 views,
287 subscriptions,
288 functions,
289 connections,
290 secrets: decrypted_secrets,
291 users,
292 nodes,
293 hummock_version: Some(hummock_version),
294 version: Some(SnapshotVersion {
295 catalog_version,
296 worker_node_version,
297 streaming_worker_slot_mapping_version,
298 }),
299 serving_worker_slot_mappings,
300 streaming_worker_slot_mappings,
301 session_params,
302 compute_node_total_cpu_count: compute_node_total_cpu_count as _,
303 ..Default::default()
304 })
305 }
306
307 async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
308 let (tables, catalog_version) = self.get_tables_snapshot().await?;
309 let hummock_version = self
310 .hummock_manager
311 .on_current_version(|version| version.into())
312 .await;
313 let hummock_write_limits = self.hummock_manager.write_limits().await;
314 let meta_backup_manifest_id = self.backup_manager.manifest().manifest_id;
315 let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
316
317 Ok(MetaSnapshot {
318 tables,
319 hummock_version: Some(hummock_version),
320 version: Some(SnapshotVersion {
321 catalog_version,
322 ..Default::default()
323 }),
324 meta_backup_manifest_id: Some(MetaBackupManifestId {
325 id: meta_backup_manifest_id,
326 }),
327 hummock_write_limits: Some(WriteLimits {
328 write_limits: hummock_write_limits,
329 }),
330 compute_node_total_cpu_count: compute_node_total_cpu_count as _,
331 ..Default::default()
332 })
333 }
334
335 async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
336 let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
337 let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
338
339 Ok(MetaSnapshot {
340 secrets,
341 version: Some(SnapshotVersion {
342 catalog_version,
343 ..Default::default()
344 }),
345 compute_node_total_cpu_count: compute_node_total_cpu_count as _,
346 ..Default::default()
347 })
348 }
349}
350
351#[async_trait::async_trait]
352impl NotificationService for NotificationServiceImpl {
353 type SubscribeStream = UnboundedReceiverStream<Notification>;
354
355 #[cfg_attr(coverage, coverage(off))]
356 async fn subscribe(
357 &self,
358 request: Request<SubscribeRequest>,
359 ) -> Result<Response<Self::SubscribeStream>, Status> {
360 let req = request.into_inner();
361 let host_address = req.get_host()?.clone();
362 let subscribe_type = req.get_subscribe_type()?;
363
364 let worker_key = WorkerKey(host_address);
365
366 let (tx, rx) = mpsc::unbounded_channel();
367 self.env
368 .notification_manager()
369 .insert_sender(subscribe_type, worker_key.clone(), tx)
370 .await;
371
372 let meta_snapshot = match subscribe_type {
373 SubscribeType::Compactor => self.compactor_subscribe().await?,
374 SubscribeType::Frontend => self.frontend_subscribe().await?,
375 SubscribeType::Hummock => {
376 self.hummock_manager
377 .pin_version(req.get_worker_id())
378 .await?;
379 self.hummock_subscribe().await?
380 }
381 SubscribeType::Compute => self.compute_subscribe().await?,
382 SubscribeType::Unspecified => unreachable!(),
383 };
384
385 self.env
386 .notification_manager()
387 .notify_snapshot(worker_key, subscribe_type, meta_snapshot);
388
389 Ok(Response::new(UnboundedReceiverStream::new(rx)))
390 }
391}