use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_hummock_sdk::FrontendHummockVersion;
use risingwave_meta::MetaResult;
use risingwave_meta::controller::catalog::Catalog;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::backup_service::MetaBackupManifestId;
use risingwave_pb::catalog::{Secret, Table};
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::hummock::WriteLimits;
use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
use risingwave_pb::meta::notification_service_server::NotificationService;
use risingwave_pb::meta::{
    FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
    SubscribeType,
};
use risingwave_pb::user::UserInfo;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status};

use crate::backup_restore::BackupManagerRef;
use crate::hummock::HummockManagerRef;
use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
use crate::serving::ServingVnodeMappingRef;

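/// Meta-side implementation of the `NotificationService` gRPC service. It answers `subscribe`
/// requests from other cluster components (frontend, compute, compactor, hummock observers),
/// sends each subscriber an initial `MetaSnapshot`, and registers a channel through which
/// follow-up notifications are delivered.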
pub struct NotificationServiceImpl {
    env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    backup_manager: BackupManagerRef,
    serving_vnode_mapping: ServingVnodeMappingRef,
}

impl NotificationServiceImpl {
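    /// Creates the service and initializes the process-global `LocalSecretManager` with the
    /// currently stored secrets, decrypted with the configured private key.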
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        backup_manager: BackupManagerRef,
        serving_vnode_mapping: ServingVnodeMappingRef,
    ) -> MetaResult<Self> {
        let service = Self {
            env,
            metadata_manager,
            hummock_manager,
            backup_manager,
            serving_vnode_mapping,
        };
        let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
        LocalSecretManager::global().init_secrets(secrets);
        Ok(service)
    }

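    /// Takes a snapshot of the whole catalog (databases, schemas, tables, sources, sinks,
    /// subscriptions, indexes, views, functions, connections, secrets) plus all users, together
    /// with the notification version observed while holding the catalog read guard.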
    async fn get_catalog_snapshot(
        &self,
    ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
        let catalog_guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let (
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
        ) = catalog_guard.snapshot().await?;
        let notification_version = self.env.notification_manager().current_version().await;
        Ok((
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
            notification_version,
        ))
    }

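    /// Snapshots all secrets from the catalog, decrypts them, and returns them along with the
    /// current notification version.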
    async fn get_decrypted_secret_snapshot(
        &self,
    ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
        let catalog_guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let secrets = catalog_guard.list_secrets().await?;
        let notification_version = self.env.notification_manager().current_version().await;

        let decrypted_secrets = self.decrypt_secrets(secrets)?;

        Ok((decrypted_secrets, notification_version))
    }

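    /// Decrypts secret values using the configured `secret_store_private_key`. Fails if the key
    /// is not configured, or if any secret cannot be deserialized or decrypted.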
    fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
        if secrets.is_empty() {
            return Ok(vec![]);
        }
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
        let mut decrypted_secrets = Vec::with_capacity(secrets.len());
        for mut secret in secrets {
            let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
                .context(format!("failed to deserialize secret {}", secret.name))?;
            let decrypted_secret = encrypted_secret
                .decrypt(secret_store_private_key.as_slice())
                .context(format!("failed to decrypt secret {}", secret.name))?;
            secret.value = decrypted_secret;
            decrypted_secrets.push(secret);
        }
        Ok(decrypted_secrets)
    }

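    /// Collects the worker-slot mappings of all running fragments, plus the current
    /// notification version.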
    async fn get_worker_slot_mapping_snapshot(
        &self,
    ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
        let fragment_guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let worker_slot_mappings = fragment_guard
            .all_running_fragment_mappings()
            .await?
            .collect_vec();
        let notification_version = self.env.notification_manager().current_version().await;
        Ok((worker_slot_mappings, notification_version))
    }

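    /// Converts the in-memory serving vnode mappings into protobuf
    /// `FragmentWorkerSlotMapping`s.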
    fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
        self.serving_vnode_mapping
            .all()
            .iter()
            .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
                fragment_id: *fragment_id,
                mapping: Some(mapping.to_protobuf()),
            })
            .collect()
    }

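    /// Lists all running compute and frontend workers, plus the current notification version.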
    async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
        let cluster_guard = self
            .metadata_manager
            .cluster_controller
            .get_inner_read_guard()
            .await;
        let compute_nodes = cluster_guard
            .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
            .await?;
        let frontends = cluster_guard
            .list_workers(Some(WorkerType::Frontend.into()), Some(Running.into()))
            .await?;
        let worker_nodes = compute_nodes
            .into_iter()
            .chain(frontends.into_iter())
            .collect();
        let notification_version = self.env.notification_manager().current_version().await;
        Ok((worker_nodes, notification_version))
    }

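    /// Lists all state tables, including dropped tables still tracked by the catalog
    /// controller, plus the current notification version.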
    async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
        let catalog_guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let mut tables = catalog_guard.list_all_state_tables().await?;
        tables.extend(catalog_guard.dropped_tables.values().cloned());
        let notification_version = self.env.notification_manager().current_version().await;
        Ok((tables, notification_version))
    }

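    /// Returns the total number of CPU cores across all compute nodes.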
    async fn get_compute_node_total_cpu_count(&self) -> usize {
        self.metadata_manager
            .cluster_controller
            .compute_node_total_cpu_count()
            .await
    }

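    /// Builds the initial snapshot for a compactor subscriber: state tables, the catalog
    /// version, and the total compute-node CPU count.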
    async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
        let (tables, catalog_version) = self.get_tables_snapshot().await?;
        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;

        Ok(MetaSnapshot {
            tables,
            version: Some(SnapshotVersion {
                catalog_version,
                ..Default::default()
            }),
            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
            ..Default::default()
        })
    }

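    /// Builds the initial snapshot for a frontend subscriber: the full catalog (with secrets
    /// decrypted), users, worker nodes, streaming and serving worker-slot mappings, the
    /// frontend view of the hummock version, and the current session parameters.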
    async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
        let (
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
            catalog_version,
        ) = self.get_catalog_snapshot().await?;

        let decrypted_secrets = self.decrypt_secrets(secrets)?;

        let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
            self.get_worker_slot_mapping_snapshot().await?;
        let serving_worker_slot_mappings = self.get_serving_vnode_mappings();

        let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;

        let hummock_version = self
            .hummock_manager
            .on_current_version(|version| {
                FrontendHummockVersion::from_version(version).to_protobuf()
            })
            .await;

        let session_params = self
            .env
            .session_params_manager_impl_ref()
            .get_params()
            .await;

        let session_params = Some(GetSessionParamsResponse {
            params: serde_json::to_string(&session_params)
                .context("failed to encode session params")?,
        });

        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;

        Ok(MetaSnapshot {
            databases,
            schemas,
            sources,
            sinks,
            tables,
            indexes,
            views,
            subscriptions,
            functions,
            connections,
            secrets: decrypted_secrets,
            users,
            nodes,
            hummock_version: Some(hummock_version),
            version: Some(SnapshotVersion {
                catalog_version,
                worker_node_version,
                streaming_worker_slot_mapping_version,
            }),
            serving_worker_slot_mappings,
            streaming_worker_slot_mappings,
            session_params,
            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
            ..Default::default()
        })
    }

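    /// Builds the initial snapshot for a hummock observer: state tables, the current hummock
    /// version, write limits, and the latest meta backup manifest id.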
    async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
        let (tables, catalog_version) = self.get_tables_snapshot().await?;
        let hummock_version = self
            .hummock_manager
            .on_current_version(|version| version.into())
            .await;
        let hummock_write_limits = self.hummock_manager.write_limits().await;
        let meta_backup_manifest_id = self.backup_manager.manifest().manifest_id;
        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;

        Ok(MetaSnapshot {
            tables,
            hummock_version: Some(hummock_version),
            version: Some(SnapshotVersion {
                catalog_version,
                ..Default::default()
            }),
            meta_backup_manifest_id: Some(MetaBackupManifestId {
                id: meta_backup_manifest_id,
            }),
            hummock_write_limits: Some(WriteLimits {
                write_limits: hummock_write_limits,
            }),
            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
            ..Default::default()
        })
    }

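    /// Builds the initial snapshot for a compute node subscriber: decrypted secrets and the
    /// catalog version.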
    async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
        let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;

        Ok(MetaSnapshot {
            secrets,
            version: Some(SnapshotVersion {
                catalog_version,
                ..Default::default()
            }),
            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
            ..Default::default()
        })
    }
}

#[async_trait::async_trait]
impl NotificationService for NotificationServiceImpl {
    type SubscribeStream = UnboundedReceiverStream<Notification>;

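    /// Handles a `subscribe` RPC: registers the subscriber's channel first, then builds the
    /// type-specific initial snapshot (pinning the hummock version for hummock subscribers),
    /// pushes that snapshot, and returns the notification stream.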
    async fn subscribe(
        &self,
        request: Request<SubscribeRequest>,
    ) -> Result<Response<Self::SubscribeStream>, Status> {
        let req = request.into_inner();
        let host_address = req.get_host()?.clone();
        let subscribe_type = req.get_subscribe_type()?;

        let worker_key = WorkerKey(host_address);

        let (tx, rx) = mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_sender(subscribe_type, worker_key.clone(), tx)
            .await;

        let meta_snapshot = match subscribe_type {
            SubscribeType::Compactor => self.compactor_subscribe().await?,
            SubscribeType::Frontend => self.frontend_subscribe().await?,
            SubscribeType::Hummock => {
                self.hummock_manager
                    .pin_version(req.get_worker_id())
                    .await?;
                self.hummock_subscribe().await?
            }
            SubscribeType::Compute => self.compute_subscribe().await?,
            SubscribeType::Unspecified => unreachable!(),
        };

        self.env
            .notification_manager()
            .notify_snapshot(worker_key, subscribe_type, meta_snapshot);

        Ok(Response::new(UnboundedReceiverStream::new(rx)))
    }
}