use anyhow::{Context, anyhow};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_hummock_sdk::FrontendHummockVersion;
use risingwave_meta::MetaResult;
use risingwave_meta::controller::catalog::Catalog;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::backup_service::MetaBackupManifestId;
use risingwave_pb::catalog::{Secret, Table};
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::{ClusterResource, WorkerNode, WorkerType};
use risingwave_pb::hummock::WriteLimits;
use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
use risingwave_pb::meta::notification_service_server::NotificationService;
use risingwave_pb::meta::{
    FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
    SubscribeType,
};
use risingwave_pb::user::UserInfo;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status};

use crate::backup_restore::BackupManagerRef;
use crate::hummock::HummockManagerRef;
use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
use crate::serving::ServingVnodeMappingRef;

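/// gRPC implementation of the meta node's `NotificationService`.
///
/// Each worker that calls `subscribe` gets a notification channel registered for it and an
/// initial [`MetaSnapshot`] tailored to its role (frontend, compute node, compactor, or
/// Hummock subscriber).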
pub struct NotificationServiceImpl {
    env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    backup_manager: BackupManagerRef,
    serving_vnode_mapping: ServingVnodeMappingRef,
}

impl NotificationServiceImpl {
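    /// Creates the service and initializes the global [`LocalSecretManager`] with all stored
    /// secrets, decrypted with the configured private key.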
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        backup_manager: BackupManagerRef,
        serving_vnode_mapping: ServingVnodeMappingRef,
    ) -> MetaResult<Self> {
        let service = Self {
            env,
            metadata_manager,
            hummock_manager,
            backup_manager,
            serving_vnode_mapping,
        };
        let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
        LocalSecretManager::global().init_secrets(secrets);
        Ok(service)
    }

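    /// Takes a snapshot of the full catalog and all users, together with the notification
    /// version it corresponds to.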
    async fn get_catalog_snapshot(
        &self,
    ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
        let catalog_guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let (
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
        ) = catalog_guard.snapshot().await?;
        let notification_version = self.env.notification_manager().current_version().await;
        Ok((
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
            notification_version,
        ))
    }

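    /// Lists all secrets from the catalog and returns them decrypted, together with the
    /// current notification version.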
    async fn get_decrypted_secret_snapshot(
        &self,
    ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
        let catalog_guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let secrets = catalog_guard.list_secrets().await?;
        let notification_version = self.env.notification_manager().current_version().await;

        let decrypted_secrets = self.decrypt_secrets(secrets)?;

        Ok((decrypted_secrets, notification_version))
    }

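    /// Decrypts the given secrets with the meta node's `secret_store_private_key`. Fails if
    /// the key is missing (unless there is nothing to decrypt) or if any secret cannot be
    /// deserialized or decrypted.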
    fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
        if secrets.is_empty() {
            return Ok(vec![]);
        }
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
        let mut decrypted_secrets = Vec::with_capacity(secrets.len());
        for mut secret in secrets {
            let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
                .context(format!("failed to deserialize secret {}", secret.name))?;
            let decrypted_secret = encrypted_secret
                .decrypt(secret_store_private_key.as_slice())
                .context(format!("failed to decrypt secret {}", secret.name))?;
            secret.value = decrypted_secret;
            decrypted_secrets.push(secret);
        }
        Ok(decrypted_secrets)
    }

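    /// Fetches the streaming fragment to worker-slot mappings together with the current
    /// notification version.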
    async fn get_worker_slot_mapping_snapshot(
        &self,
    ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
        let mappings = self
            .metadata_manager
            .catalog_controller
            .get_worker_slot_mappings();

        let notification_version = self.env.notification_manager().current_version().await;
        Ok((mappings, notification_version))
    }

    fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
        self.serving_vnode_mapping
            .all()
            .iter()
            .map(|(&fragment_id, mapping)| FragmentWorkerSlotMapping {
                fragment_id,
                mapping: Some(mapping.to_protobuf()),
            })
            .collect()
    }

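    /// Lists all running compute and frontend nodes together with the current notification
    /// version.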
    async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
        let cluster_guard = self
            .metadata_manager
            .cluster_controller
            .get_inner_read_guard()
            .await;
        let compute_nodes = cluster_guard
            .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
            .await?;
        let frontends = cluster_guard
            .list_workers(Some(WorkerType::Frontend.into()), Some(Running.into()))
            .await?;
        let worker_nodes = compute_nodes
            .into_iter()
            .chain(frontends.into_iter())
            .collect();
        let notification_version = self.env.notification_manager().current_version().await;
        Ok((worker_nodes, notification_version))
    }

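    /// Lists all state tables, including dropped tables still tracked by the catalog,
    /// together with the current notification version.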
    async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
        let catalog_guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let mut tables = catalog_guard.list_all_state_tables().await?;
        tables.extend(catalog_guard.dropped_tables.values().cloned());
        let notification_version = self.env.notification_manager().current_version().await;
        Ok((tables, notification_version))
    }

    async fn get_cluster_resource(&self) -> ClusterResource {
        self.metadata_manager
            .cluster_controller
            .cluster_resource()
            .await
    }

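    /// Builds the initial snapshot for a subscribing compactor: state tables, the catalog
    /// version, and the current cluster resource.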
    async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
        let (tables, catalog_version) = self.get_tables_snapshot().await?;
        let cluster_resource = self.get_cluster_resource().await;

        Ok(MetaSnapshot {
            tables,
            version: Some(SnapshotVersion {
                catalog_version,
                ..Default::default()
            }),
            cluster_resource: Some(cluster_resource),
            ..Default::default()
        })
    }

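    /// Builds the initial snapshot for a subscribing frontend: the full catalog with
    /// decrypted secrets, users, worker nodes, worker-slot mappings, the frontend view of
    /// the Hummock version, session parameters, and the current cluster resource.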
    async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
        let (
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
            catalog_version,
        ) = self.get_catalog_snapshot().await?;

        let decrypted_secrets = self.decrypt_secrets(secrets)?;

        let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
            self.get_worker_slot_mapping_snapshot().await?;

        let streaming_job_count = self.metadata_manager.count_streaming_job().await?;
        if streaming_job_count > 0 && streaming_worker_slot_mappings.is_empty() {
            tracing::warn!(
                streaming_job_count,
                "frontend subscribe returns empty streaming_worker_slot_mappings while streaming jobs exist; meta may still be recovering"
            );
        }

        let serving_worker_slot_mappings = self.get_serving_vnode_mappings();

        let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;

        let hummock_version = self
            .hummock_manager
            .on_current_version(|version| {
                FrontendHummockVersion::from_version(version).to_protobuf()
            })
            .await;

        let session_params = self
            .env
            .session_params_manager_impl_ref()
            .get_params()
            .await;

        let session_params = Some(GetSessionParamsResponse {
            params: serde_json::to_string(&session_params)
                .context("failed to encode session params")?,
        });

        let cluster_resource = self.get_cluster_resource().await;

        Ok(MetaSnapshot {
            databases,
            schemas,
            sources,
            sinks,
            tables,
            indexes,
            views,
            subscriptions,
            functions,
            connections,
            secrets: decrypted_secrets,
            users,
            nodes,
            hummock_version: Some(hummock_version),
            version: Some(SnapshotVersion {
                catalog_version,
                worker_node_version,
                streaming_worker_slot_mapping_version,
            }),
            serving_worker_slot_mappings,
            streaming_worker_slot_mappings,
            session_params,
            cluster_resource: Some(cluster_resource),
            ..Default::default()
        })
    }

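    /// Builds the initial snapshot for a Hummock subscriber: state tables, the full Hummock
    /// version, write limits, the latest backup manifest id, and the current cluster
    /// resource.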
    async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
        let (tables, catalog_version) = self.get_tables_snapshot().await?;
        let hummock_version = self
            .hummock_manager
            .on_current_version(|version| version.into())
            .await;
        let hummock_write_limits = self.hummock_manager.write_limits().await;
        let meta_backup_manifest_id = self.backup_manager.manifest().await.manifest_id;
        let cluster_resource = self.get_cluster_resource().await;

        Ok(MetaSnapshot {
            tables,
            hummock_version: Some(hummock_version),
            version: Some(SnapshotVersion {
                catalog_version,
                ..Default::default()
            }),
            meta_backup_manifest_id: Some(MetaBackupManifestId {
                id: meta_backup_manifest_id,
            }),
            hummock_write_limits: Some(WriteLimits {
                write_limits: hummock_write_limits,
            }),
            cluster_resource: Some(cluster_resource),
            ..Default::default()
        })
    }

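    /// Builds the initial snapshot for a subscribing compute node: decrypted secrets, the
    /// catalog version, and the current cluster resource.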
    async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
        let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
        let cluster_resource = self.get_cluster_resource().await;

        Ok(MetaSnapshot {
            secrets,
            version: Some(SnapshotVersion {
                catalog_version,
                ..Default::default()
            }),
            cluster_resource: Some(cluster_resource),
            ..Default::default()
        })
    }
}

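/// Serves `subscribe` requests from worker nodes: registers a notification sender for the
/// requested subscribe type, pushes the matching initial snapshot, and returns the receiving
/// end of the channel as the gRPC response stream.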
#[async_trait::async_trait]
impl NotificationService for NotificationServiceImpl {
    type SubscribeStream = UnboundedReceiverStream<Notification>;

    async fn subscribe(
        &self,
        request: Request<SubscribeRequest>,
    ) -> Result<Response<Self::SubscribeStream>, Status> {
        let req = request.into_inner();
        let host_address = req.get_host()?.clone();
        let subscribe_type = req.get_subscribe_type()?;

        let worker_key = WorkerKey(host_address);

        let (tx, rx) = mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_sender(subscribe_type, worker_key.clone(), tx);

        let meta_snapshot = match subscribe_type {
            SubscribeType::Compactor => self.compactor_subscribe().await?,
            SubscribeType::Frontend => self.frontend_subscribe().await?,
            SubscribeType::Hummock => {
                self.hummock_manager
                    .pin_version(req.get_worker_id())
                    .await?;
                self.hummock_subscribe().await?
            }
            SubscribeType::Compute => self.compute_subscribe().await?,
            SubscribeType::Unspecified => unreachable!(),
        };

        self.env
            .notification_manager()
            .notify_snapshot(worker_key, subscribe_type, meta_snapshot);

        Ok(Response::new(UnboundedReceiverStream::new(rx)))
    }
}