1use anyhow::{Context, anyhow};
16use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
17use risingwave_hummock_sdk::FrontendHummockVersion;
18use risingwave_meta::MetaResult;
19use risingwave_meta::controller::catalog::Catalog;
20use risingwave_meta::manager::MetadataManager;
21use risingwave_pb::backup_service::MetaBackupManifestId;
22use risingwave_pb::catalog::{Secret, Table};
23use risingwave_pb::common::worker_node::State::Running;
24use risingwave_pb::common::{ClusterResource, WorkerNode, WorkerType};
25use risingwave_pb::hummock::WriteLimits;
26use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
27use risingwave_pb::meta::notification_service_server::NotificationService;
28use risingwave_pb::meta::{
29 FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
30 SubscribeType,
31};
32use risingwave_pb::user::UserInfo;
33use tokio::sync::mpsc;
34use tokio_stream::wrappers::UnboundedReceiverStream;
35use tonic::{Request, Response, Status};
36
37use crate::backup_restore::BackupManagerRef;
38use crate::hummock::HummockManagerRef;
39use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
40use crate::serving::ServingVnodeMappingRef;
41
42pub struct NotificationServiceImpl {
43 env: MetaSrvEnv,
44
45 metadata_manager: MetadataManager,
46 hummock_manager: HummockManagerRef,
47 backup_manager: BackupManagerRef,
48 serving_vnode_mapping: ServingVnodeMappingRef,
49}
50
51impl NotificationServiceImpl {
52 pub async fn new(
53 env: MetaSrvEnv,
54 metadata_manager: MetadataManager,
55 hummock_manager: HummockManagerRef,
56 backup_manager: BackupManagerRef,
57 serving_vnode_mapping: ServingVnodeMappingRef,
58 ) -> MetaResult<Self> {
59 let service = Self {
60 env,
61 metadata_manager,
62 hummock_manager,
63 backup_manager,
64 serving_vnode_mapping,
65 };
66 let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
67 LocalSecretManager::global().init_secrets(secrets);
68 Ok(service)
69 }
70
71 async fn get_catalog_snapshot(
72 &self,
73 ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
74 let catalog_guard = self
75 .metadata_manager
76 .catalog_controller
77 .get_inner_read_guard()
78 .await;
79 let (
80 (
81 databases,
82 schemas,
83 tables,
84 sources,
85 sinks,
86 subscriptions,
87 indexes,
88 views,
89 functions,
90 connections,
91 secrets,
92 ),
93 users,
94 ) = catalog_guard.snapshot().await?;
95 let notification_version = self.env.notification_manager().current_version().await;
96 Ok((
97 (
98 databases,
99 schemas,
100 tables,
101 sources,
102 sinks,
103 subscriptions,
104 indexes,
105 views,
106 functions,
107 connections,
108 secrets,
109 ),
110 users,
111 notification_version,
112 ))
113 }
114
115 async fn get_decrypted_secret_snapshot(
117 &self,
118 ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
119 let catalog_guard = self
120 .metadata_manager
121 .catalog_controller
122 .get_inner_read_guard()
123 .await;
124 let secrets = catalog_guard.list_secrets().await?;
125 let notification_version = self.env.notification_manager().current_version().await;
126
127 let decrypted_secrets = self.decrypt_secrets(secrets)?;
128
129 Ok((decrypted_secrets, notification_version))
130 }
131
132 fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
133 if secrets.is_empty() {
135 return Ok(vec![]);
136 }
137 let secret_store_private_key = self
138 .env
139 .opts
140 .secret_store_private_key
141 .clone()
142 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
143 let mut decrypted_secrets = Vec::with_capacity(secrets.len());
144 for mut secret in secrets {
145 let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
146 .context(format!("failed to deserialize secret {}", secret.name))?;
147 let decrypted_secret = encrypted_secret
148 .decrypt(secret_store_private_key.as_slice())
149 .context(format!("failed to decrypt secret {}", secret.name))?;
150 secret.value = decrypted_secret;
151 decrypted_secrets.push(secret);
152 }
153 Ok(decrypted_secrets)
154 }
155
156 async fn get_worker_slot_mapping_snapshot(
157 &self,
158 ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
159 let mappings = self
160 .metadata_manager
161 .catalog_controller
162 .get_worker_slot_mappings();
163
164 let notification_version = self.env.notification_manager().current_version().await;
165 Ok((mappings, notification_version))
166 }
167
168 fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
169 self.serving_vnode_mapping
170 .all()
171 .iter()
172 .map(|(&fragment_id, mapping)| FragmentWorkerSlotMapping {
173 fragment_id,
174 mapping: Some(mapping.to_protobuf()),
175 })
176 .collect()
177 }
178
179 async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
180 let cluster_guard = self
181 .metadata_manager
182 .cluster_controller
183 .get_inner_read_guard()
184 .await;
185 let compute_nodes = cluster_guard
186 .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
187 .await?;
188 let frontends = cluster_guard
189 .list_workers(Some(WorkerType::Frontend.into()), Some(Running.into()))
190 .await?;
191 let worker_nodes = compute_nodes
192 .into_iter()
193 .chain(frontends.into_iter())
194 .collect();
195 let notification_version = self.env.notification_manager().current_version().await;
196 Ok((worker_nodes, notification_version))
197 }
198
199 async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
200 let catalog_guard = self
201 .metadata_manager
202 .catalog_controller
203 .get_inner_read_guard()
204 .await;
205 let mut tables = catalog_guard.list_all_state_tables().await?;
206 tables.extend(catalog_guard.dropped_tables.values().cloned());
207 let notification_version = self.env.notification_manager().current_version().await;
208 Ok((tables, notification_version))
209 }
210
211 async fn get_cluster_resource(&self) -> ClusterResource {
213 self.metadata_manager
214 .cluster_controller
215 .cluster_resource()
216 .await
217 }
218
219 async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
220 let (tables, catalog_version) = self.get_tables_snapshot().await?;
221 let cluster_resource = self.get_cluster_resource().await;
222
223 Ok(MetaSnapshot {
224 tables,
225 version: Some(SnapshotVersion {
226 catalog_version,
227 ..Default::default()
228 }),
229 cluster_resource: Some(cluster_resource),
230 ..Default::default()
231 })
232 }
233
234 async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
235 let (
236 (
237 databases,
238 schemas,
239 tables,
240 sources,
241 sinks,
242 subscriptions,
243 indexes,
244 views,
245 functions,
246 connections,
247 secrets,
248 ),
249 users,
250 catalog_version,
251 ) = self.get_catalog_snapshot().await?;
252 let object_dependencies = self
253 .metadata_manager
254 .catalog_controller
255 .list_created_object_dependencies()
256 .await?;
257
258 let decrypted_secrets = self.decrypt_secrets(secrets)?;
260
261 let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
262 self.get_worker_slot_mapping_snapshot().await?;
263
264 let streaming_job_count = self.metadata_manager.count_streaming_job().await?;
265 if streaming_job_count > 0 && streaming_worker_slot_mappings.is_empty() {
266 tracing::warn!(
267 streaming_job_count,
268 "frontend subscribe returns empty streaming_worker_slot_mappings while streaming jobs exist; meta may still be recovering"
269 );
270 }
271
272 let serving_worker_slot_mappings = self.get_serving_vnode_mappings();
273
274 let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;
275
276 let hummock_version = self
277 .hummock_manager
278 .on_current_version(|version| {
279 FrontendHummockVersion::from_version(version).to_protobuf()
280 })
281 .await;
282
283 let session_params = self
284 .env
285 .session_params_manager_impl_ref()
286 .get_params()
287 .await;
288
289 let session_params = Some(GetSessionParamsResponse {
290 params: serde_json::to_string(&session_params)
291 .context("failed to encode session params")?,
292 });
293
294 let cluster_resource = self.get_cluster_resource().await;
295
296 Ok(MetaSnapshot {
297 databases,
298 schemas,
299 sources,
300 sinks,
301 tables,
302 indexes,
303 views,
304 subscriptions,
305 functions,
306 connections,
307 secrets: decrypted_secrets,
308 users,
309 nodes,
310 hummock_version: Some(hummock_version),
311 version: Some(SnapshotVersion {
312 catalog_version,
313 worker_node_version,
314 streaming_worker_slot_mapping_version,
315 }),
316 serving_worker_slot_mappings,
317 streaming_worker_slot_mappings,
318 session_params,
319 object_dependencies,
320 cluster_resource: Some(cluster_resource),
321 ..Default::default()
322 })
323 }
324
325 async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
326 let (tables, catalog_version) = self.get_tables_snapshot().await?;
327 let hummock_version = self
328 .hummock_manager
329 .on_current_version(|version| version.into())
330 .await;
331 let hummock_write_limits = self.hummock_manager.write_limits().await;
332 let meta_backup_manifest_id = self.backup_manager.manifest().await.manifest_id;
333 let cluster_resource = self.get_cluster_resource().await;
334
335 Ok(MetaSnapshot {
336 tables,
337 hummock_version: Some(hummock_version),
338 version: Some(SnapshotVersion {
339 catalog_version,
340 ..Default::default()
341 }),
342 meta_backup_manifest_id: Some(MetaBackupManifestId {
343 id: meta_backup_manifest_id,
344 }),
345 hummock_write_limits: Some(WriteLimits {
346 write_limits: hummock_write_limits,
347 }),
348 cluster_resource: Some(cluster_resource),
349 ..Default::default()
350 })
351 }
352
353 async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
354 let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
355 let cluster_resource = self.get_cluster_resource().await;
356
357 Ok(MetaSnapshot {
358 secrets,
359 version: Some(SnapshotVersion {
360 catalog_version,
361 ..Default::default()
362 }),
363 cluster_resource: Some(cluster_resource),
364 ..Default::default()
365 })
366 }
367}
368
369#[async_trait::async_trait]
370impl NotificationService for NotificationServiceImpl {
371 type SubscribeStream = UnboundedReceiverStream<Notification>;
372
373 async fn subscribe(
374 &self,
375 request: Request<SubscribeRequest>,
376 ) -> Result<Response<Self::SubscribeStream>, Status> {
377 let req = request.into_inner();
378 let host_address = req.get_host()?.clone();
379 let subscribe_type = req.get_subscribe_type()?;
380
381 let worker_key = WorkerKey(host_address);
382
383 let (tx, rx) = mpsc::unbounded_channel();
384 self.env
385 .notification_manager()
386 .insert_sender(subscribe_type, worker_key.clone(), tx);
387
388 let meta_snapshot = match subscribe_type {
389 SubscribeType::Compactor => self.compactor_subscribe().await?,
390 SubscribeType::Frontend => self.frontend_subscribe().await?,
391 SubscribeType::Hummock => {
392 self.hummock_manager
393 .pin_version(req.get_worker_id())
394 .await?;
395 self.hummock_subscribe().await?
396 }
397 SubscribeType::Compute => self.compute_subscribe().await?,
398 SubscribeType::Unspecified => unreachable!(),
399 };
400
401 self.env
402 .notification_manager()
403 .notify_snapshot(worker_key, subscribe_type, meta_snapshot);
404
405 Ok(Response::new(UnboundedReceiverStream::new(rx)))
406 }
407}