1use anyhow::{Context, anyhow};
16use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
17use risingwave_hummock_sdk::FrontendHummockVersion;
18use risingwave_meta::MetaResult;
19use risingwave_meta::controller::catalog::Catalog;
20use risingwave_meta::manager::MetadataManager;
21use risingwave_pb::backup_service::MetaBackupManifestId;
22use risingwave_pb::catalog::{Secret, Table};
23use risingwave_pb::common::worker_node::State::Running;
24use risingwave_pb::common::{ClusterResource, WorkerNode, WorkerType};
25use risingwave_pb::hummock::WriteLimits;
26use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
27use risingwave_pb::meta::notification_service_server::NotificationService;
28use risingwave_pb::meta::{
29 FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
30 SubscribeType,
31};
32use risingwave_pb::user::UserInfo;
33use tokio::sync::mpsc;
34use tokio_stream::wrappers::UnboundedReceiverStream;
35use tonic::{Request, Response, Status};
36
37use crate::backup_restore::BackupManagerRef;
38use crate::hummock::HummockManagerRef;
39use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
40use crate::serving::ServingVnodeMappingRef;
41
/// Meta-side implementation of the [`NotificationService`] gRPC service.
///
/// On `subscribe`, it registers a notification channel for the calling worker and sends
/// it an initial [`MetaSnapshot`] whose contents depend on the worker's subscribe type.
pub struct NotificationServiceImpl {
    /// Meta server environment: provides the notification manager, session parameters
    /// and meta options (including `secret_store_private_key`).
    env: MetaSrvEnv,

    /// Catalog and cluster controllers used to build catalog, secret, table and worker
    /// snapshots.
    metadata_manager: MetadataManager,
    /// Source of the current hummock version and write limits; also pins versions for
    /// hummock subscribers.
    hummock_manager: HummockManagerRef,
    /// Provides the current meta backup manifest id sent to hummock subscribers.
    backup_manager: BackupManagerRef,
    /// Serving vnode mappings included in frontend snapshots.
    serving_vnode_mapping: ServingVnodeMappingRef,
}
50
51impl NotificationServiceImpl {
52 pub async fn new(
53 env: MetaSrvEnv,
54 metadata_manager: MetadataManager,
55 hummock_manager: HummockManagerRef,
56 backup_manager: BackupManagerRef,
57 serving_vnode_mapping: ServingVnodeMappingRef,
58 ) -> MetaResult<Self> {
59 let service = Self {
60 env,
61 metadata_manager,
62 hummock_manager,
63 backup_manager,
64 serving_vnode_mapping,
65 };
66 let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
67 LocalSecretManager::global().init_secrets(secrets);
68 Ok(service)
69 }
70
71 async fn get_catalog_snapshot(
72 &self,
73 ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
74 let catalog_guard = self
75 .metadata_manager
76 .catalog_controller
77 .get_inner_read_guard()
78 .await;
79 let (
80 (
81 databases,
82 schemas,
83 tables,
84 sources,
85 sinks,
86 subscriptions,
87 indexes,
88 views,
89 functions,
90 connections,
91 secrets,
92 ),
93 users,
94 ) = catalog_guard.snapshot().await?;
95 let notification_version = self.env.notification_manager().current_version().await;
96 Ok((
97 (
98 databases,
99 schemas,
100 tables,
101 sources,
102 sinks,
103 subscriptions,
104 indexes,
105 views,
106 functions,
107 connections,
108 secrets,
109 ),
110 users,
111 notification_version,
112 ))
113 }
114
115 async fn get_decrypted_secret_snapshot(
117 &self,
118 ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
119 let catalog_guard = self
120 .metadata_manager
121 .catalog_controller
122 .get_inner_read_guard()
123 .await;
124 let secrets = catalog_guard.list_secrets().await?;
125 let notification_version = self.env.notification_manager().current_version().await;
126
127 let decrypted_secrets = self.decrypt_secrets(secrets)?;
128
129 Ok((decrypted_secrets, notification_version))
130 }
131
132 fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
133 if secrets.is_empty() {
135 return Ok(vec![]);
136 }
137 let secret_store_private_key = self
138 .env
139 .opts
140 .secret_store_private_key
141 .clone()
142 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
143 let mut decrypted_secrets = Vec::with_capacity(secrets.len());
144 for mut secret in secrets {
145 let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
146 .context(format!("failed to deserialize secret {}", secret.name))?;
147 let decrypted_secret = encrypted_secret
148 .decrypt(secret_store_private_key.as_slice())
149 .context(format!("failed to decrypt secret {}", secret.name))?;
150 secret.value = decrypted_secret;
151 decrypted_secrets.push(secret);
152 }
153 Ok(decrypted_secrets)
154 }
155
156 async fn get_worker_slot_mapping_snapshot(
157 &self,
158 ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
159 let mappings = self
160 .metadata_manager
161 .catalog_controller
162 .get_worker_slot_mappings();
163
164 let notification_version = self.env.notification_manager().current_version().await;
165 Ok((mappings, notification_version))
166 }
167
168 fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
169 self.serving_vnode_mapping
170 .all()
171 .iter()
172 .map(|(&fragment_id, mapping)| FragmentWorkerSlotMapping {
173 fragment_id,
174 mapping: Some(mapping.to_protobuf()),
175 })
176 .collect()
177 }
178
179 async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
180 let cluster_guard = self
181 .metadata_manager
182 .cluster_controller
183 .get_inner_read_guard()
184 .await;
185 let compute_nodes = cluster_guard
186 .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
187 .await?;
188 let frontends = cluster_guard
189 .list_workers(Some(WorkerType::Frontend.into()), Some(Running.into()))
190 .await?;
191 let worker_nodes = compute_nodes.into_iter().chain(frontends).collect();
192 let notification_version = self.env.notification_manager().current_version().await;
193 Ok((worker_nodes, notification_version))
194 }
195
196 async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
197 let catalog_guard = self
198 .metadata_manager
199 .catalog_controller
200 .get_inner_read_guard()
201 .await;
202 let mut tables = catalog_guard.list_all_state_tables().await?;
203 tables.extend(catalog_guard.dropped_tables.values().cloned());
204 let notification_version = self.env.notification_manager().current_version().await;
205 Ok((tables, notification_version))
206 }
207
208 async fn get_cluster_resource(&self) -> ClusterResource {
210 self.metadata_manager
211 .cluster_controller
212 .cluster_resource()
213 .await
214 }
215
216 async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
217 let (tables, catalog_version) = self.get_tables_snapshot().await?;
218 let cluster_resource = self.get_cluster_resource().await;
219
220 Ok(MetaSnapshot {
221 tables,
222 version: Some(SnapshotVersion {
223 catalog_version,
224 ..Default::default()
225 }),
226 cluster_resource: Some(cluster_resource),
227 ..Default::default()
228 })
229 }
230
231 async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
232 let (
233 (
234 databases,
235 schemas,
236 tables,
237 sources,
238 sinks,
239 subscriptions,
240 indexes,
241 views,
242 functions,
243 connections,
244 secrets,
245 ),
246 users,
247 catalog_version,
248 ) = self.get_catalog_snapshot().await?;
249 let object_dependencies = self
250 .metadata_manager
251 .catalog_controller
252 .list_created_object_dependencies()
253 .await?;
254
255 let decrypted_secrets = self.decrypt_secrets(secrets)?;
257
258 let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
259 self.get_worker_slot_mapping_snapshot().await?;
260
261 let streaming_job_count = self.metadata_manager.count_streaming_job().await?;
262 if streaming_job_count > 0 && streaming_worker_slot_mappings.is_empty() {
263 tracing::warn!(
264 streaming_job_count,
265 "frontend subscribe returns empty streaming_worker_slot_mappings while streaming jobs exist; meta may still be recovering"
266 );
267 }
268
269 let serving_worker_slot_mappings = self.get_serving_vnode_mappings();
270
271 let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;
272
273 let hummock_version = self
274 .hummock_manager
275 .on_current_version(|version| {
276 FrontendHummockVersion::from_version(version).to_protobuf()
277 })
278 .await;
279
280 let session_params = self
281 .env
282 .session_params_manager_impl_ref()
283 .get_params()
284 .await;
285
286 let session_params = Some(GetSessionParamsResponse {
287 params: serde_json::to_string(&session_params)
288 .context("failed to encode session params")?,
289 });
290
291 let cluster_resource = self.get_cluster_resource().await;
292
293 Ok(MetaSnapshot {
294 databases,
295 schemas,
296 sources,
297 sinks,
298 tables,
299 indexes,
300 views,
301 subscriptions,
302 functions,
303 connections,
304 secrets: decrypted_secrets,
305 users,
306 nodes,
307 hummock_version: Some(hummock_version),
308 version: Some(SnapshotVersion {
309 catalog_version,
310 worker_node_version,
311 streaming_worker_slot_mapping_version,
312 }),
313 serving_worker_slot_mappings,
314 streaming_worker_slot_mappings,
315 session_params,
316 object_dependencies,
317 cluster_resource: Some(cluster_resource),
318 ..Default::default()
319 })
320 }
321
322 async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
323 let (tables, catalog_version) = self.get_tables_snapshot().await?;
324 let hummock_version = self
325 .hummock_manager
326 .on_current_version(|version| version.into())
327 .await;
328 let hummock_write_limits = self.hummock_manager.write_limits().await;
329 let meta_backup_manifest_id = self.backup_manager.manifest().await.manifest_id;
330 let cluster_resource = self.get_cluster_resource().await;
331
332 Ok(MetaSnapshot {
333 tables,
334 hummock_version: Some(hummock_version),
335 version: Some(SnapshotVersion {
336 catalog_version,
337 ..Default::default()
338 }),
339 meta_backup_manifest_id: Some(MetaBackupManifestId {
340 id: meta_backup_manifest_id,
341 }),
342 hummock_write_limits: Some(WriteLimits {
343 write_limits: hummock_write_limits,
344 }),
345 cluster_resource: Some(cluster_resource),
346 ..Default::default()
347 })
348 }
349
350 async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
351 let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
352 let cluster_resource = self.get_cluster_resource().await;
353
354 Ok(MetaSnapshot {
355 secrets,
356 version: Some(SnapshotVersion {
357 catalog_version,
358 ..Default::default()
359 }),
360 cluster_resource: Some(cluster_resource),
361 ..Default::default()
362 })
363 }
364}
365
366#[async_trait::async_trait]
367impl NotificationService for NotificationServiceImpl {
368 type SubscribeStream = UnboundedReceiverStream<Notification>;
369
370 async fn subscribe(
371 &self,
372 request: Request<SubscribeRequest>,
373 ) -> Result<Response<Self::SubscribeStream>, Status> {
374 let req = request.into_inner();
375 let host_address = req.get_host()?.clone();
376 let subscribe_type = req.get_subscribe_type()?;
377
378 let worker_key = WorkerKey(host_address);
379
380 let (tx, rx) = mpsc::unbounded_channel();
381 self.env
382 .notification_manager()
383 .insert_sender(subscribe_type, worker_key.clone(), tx);
384
385 let meta_snapshot = match subscribe_type {
386 SubscribeType::Compactor => self.compactor_subscribe().await?,
387 SubscribeType::Frontend => self.frontend_subscribe().await?,
388 SubscribeType::Hummock => {
389 self.hummock_manager
390 .pin_version(req.get_worker_id())
391 .await?;
392 self.hummock_subscribe().await?
393 }
394 SubscribeType::Compute => self.compute_subscribe().await?,
395 SubscribeType::Unspecified => unreachable!(),
396 };
397
398 self.env
399 .notification_manager()
400 .notify_snapshot(worker_key, subscribe_type, meta_snapshot);
401
402 Ok(Response::new(UnboundedReceiverStream::new(rx)))
403 }
404}