1use anyhow::{Context, anyhow};
16use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
17use risingwave_hummock_sdk::FrontendHummockVersion;
18use risingwave_meta::MetaResult;
19use risingwave_meta::controller::catalog::Catalog;
20use risingwave_meta::manager::MetadataManager;
21use risingwave_pb::backup_service::MetaBackupManifestId;
22use risingwave_pb::catalog::{Secret, Table};
23use risingwave_pb::common::worker_node::State::Running;
24use risingwave_pb::common::{ClusterResource, WorkerNode, WorkerType};
25use risingwave_pb::hummock::WriteLimits;
26use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
27use risingwave_pb::meta::notification_service_server::NotificationService;
28use risingwave_pb::meta::{
29    FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
30    SubscribeType,
31};
32use risingwave_pb::user::UserInfo;
33use tokio::sync::mpsc;
34use tokio_stream::wrappers::UnboundedReceiverStream;
35use tonic::{Request, Response, Status};
36
37use crate::backup_restore::BackupManagerRef;
38use crate::hummock::HummockManagerRef;
39use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
40use crate::serving::ServingVnodeMappingRef;
41
/// Implementation of the [`NotificationService`] RPC.
///
/// Holds the handles needed to assemble the initial [`MetaSnapshot`] that is
/// sent to a subscriber.
pub struct NotificationServiceImpl {
    /// Meta server environment; used in this file for the notification
    /// manager, the session-params manager and `opts.secret_store_private_key`.
    env: MetaSrvEnv,

    /// Gives access to the catalog controller and the cluster controller.
    metadata_manager: MetadataManager,
    /// Source of the hummock version and the hummock write limits.
    hummock_manager: HummockManagerRef,
    /// Source of the meta backup manifest id.
    backup_manager: BackupManagerRef,
    /// Source of the serving vnode mappings.
    serving_vnode_mapping: ServingVnodeMappingRef,
}
50
51impl NotificationServiceImpl {
52    pub async fn new(
53        env: MetaSrvEnv,
54        metadata_manager: MetadataManager,
55        hummock_manager: HummockManagerRef,
56        backup_manager: BackupManagerRef,
57        serving_vnode_mapping: ServingVnodeMappingRef,
58    ) -> MetaResult<Self> {
59        let service = Self {
60            env,
61            metadata_manager,
62            hummock_manager,
63            backup_manager,
64            serving_vnode_mapping,
65        };
66        let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
67        LocalSecretManager::global().init_secrets(secrets);
68        Ok(service)
69    }
70
71    async fn get_catalog_snapshot(
72        &self,
73    ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
74        let catalog_guard = self
75            .metadata_manager
76            .catalog_controller
77            .get_inner_read_guard()
78            .await;
79        let (
80            (
81                databases,
82                schemas,
83                tables,
84                sources,
85                sinks,
86                subscriptions,
87                indexes,
88                views,
89                functions,
90                connections,
91                secrets,
92            ),
93            users,
94        ) = catalog_guard.snapshot().await?;
95        let notification_version = self.env.notification_manager().current_version().await;
96        Ok((
97            (
98                databases,
99                schemas,
100                tables,
101                sources,
102                sinks,
103                subscriptions,
104                indexes,
105                views,
106                functions,
107                connections,
108                secrets,
109            ),
110            users,
111            notification_version,
112        ))
113    }
114
115    async fn get_decrypted_secret_snapshot(
117        &self,
118    ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
119        let catalog_guard = self
120            .metadata_manager
121            .catalog_controller
122            .get_inner_read_guard()
123            .await;
124        let secrets = catalog_guard.list_secrets().await?;
125        let notification_version = self.env.notification_manager().current_version().await;
126
127        let decrypted_secrets = self.decrypt_secrets(secrets)?;
128
129        Ok((decrypted_secrets, notification_version))
130    }
131
132    fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
133        if secrets.is_empty() {
135            return Ok(vec![]);
136        }
137        let secret_store_private_key = self
138            .env
139            .opts
140            .secret_store_private_key
141            .clone()
142            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
143        let mut decrypted_secrets = Vec::with_capacity(secrets.len());
144        for mut secret in secrets {
145            let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
146                .context(format!("failed to deserialize secret {}", secret.name))?;
147            let decrypted_secret = encrypted_secret
148                .decrypt(secret_store_private_key.as_slice())
149                .context(format!("failed to decrypt secret {}", secret.name))?;
150            secret.value = decrypted_secret;
151            decrypted_secrets.push(secret);
152        }
153        Ok(decrypted_secrets)
154    }
155
156    async fn get_worker_slot_mapping_snapshot(
157        &self,
158    ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
159        let mappings = self
160            .metadata_manager
161            .catalog_controller
162            .get_worker_slot_mappings();
163
164        let notification_version = self.env.notification_manager().current_version().await;
165        Ok((mappings, notification_version))
166    }
167
168    fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
169        self.serving_vnode_mapping
170            .all()
171            .iter()
172            .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
173                fragment_id: *fragment_id,
174                mapping: Some(mapping.to_protobuf()),
175            })
176            .collect()
177    }
178
179    async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
180        let cluster_guard = self
181            .metadata_manager
182            .cluster_controller
183            .get_inner_read_guard()
184            .await;
185        let compute_nodes = cluster_guard
186            .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
187            .await?;
188        let frontends = cluster_guard
189            .list_workers(Some(WorkerType::Frontend.into()), Some(Running.into()))
190            .await?;
191        let worker_nodes = compute_nodes
192            .into_iter()
193            .chain(frontends.into_iter())
194            .collect();
195        let notification_version = self.env.notification_manager().current_version().await;
196        Ok((worker_nodes, notification_version))
197    }
198
199    async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
200        let catalog_guard = self
201            .metadata_manager
202            .catalog_controller
203            .get_inner_read_guard()
204            .await;
205        let mut tables = catalog_guard.list_all_state_tables().await?;
206        tables.extend(catalog_guard.dropped_tables.values().cloned());
207        let notification_version = self.env.notification_manager().current_version().await;
208        Ok((tables, notification_version))
209    }
210
211    async fn get_cluster_resource(&self) -> ClusterResource {
213        self.metadata_manager
214            .cluster_controller
215            .cluster_resource()
216            .await
217    }
218
219    async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
220        let (tables, catalog_version) = self.get_tables_snapshot().await?;
221        let cluster_resource = self.get_cluster_resource().await;
222
223        Ok(MetaSnapshot {
224            tables,
225            version: Some(SnapshotVersion {
226                catalog_version,
227                ..Default::default()
228            }),
229            cluster_resource: Some(cluster_resource),
230            ..Default::default()
231        })
232    }
233
234    async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
235        let (
236            (
237                databases,
238                schemas,
239                tables,
240                sources,
241                sinks,
242                subscriptions,
243                indexes,
244                views,
245                functions,
246                connections,
247                secrets,
248            ),
249            users,
250            catalog_version,
251        ) = self.get_catalog_snapshot().await?;
252
253        let decrypted_secrets = self.decrypt_secrets(secrets)?;
255
256        let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
257            self.get_worker_slot_mapping_snapshot().await?;
258        let serving_worker_slot_mappings = self.get_serving_vnode_mappings();
259
260        let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;
261
262        let hummock_version = self
263            .hummock_manager
264            .on_current_version(|version| {
265                FrontendHummockVersion::from_version(version).to_protobuf()
266            })
267            .await;
268
269        let session_params = self
270            .env
271            .session_params_manager_impl_ref()
272            .get_params()
273            .await;
274
275        let session_params = Some(GetSessionParamsResponse {
276            params: serde_json::to_string(&session_params)
277                .context("failed to encode session params")?,
278        });
279
280        let cluster_resource = self.get_cluster_resource().await;
281
282        Ok(MetaSnapshot {
283            databases,
284            schemas,
285            sources,
286            sinks,
287            tables,
288            indexes,
289            views,
290            subscriptions,
291            functions,
292            connections,
293            secrets: decrypted_secrets,
294            users,
295            nodes,
296            hummock_version: Some(hummock_version),
297            version: Some(SnapshotVersion {
298                catalog_version,
299                worker_node_version,
300                streaming_worker_slot_mapping_version,
301            }),
302            serving_worker_slot_mappings,
303            streaming_worker_slot_mappings,
304            session_params,
305            cluster_resource: Some(cluster_resource),
306            ..Default::default()
307        })
308    }
309
310    async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
311        let (tables, catalog_version) = self.get_tables_snapshot().await?;
312        let hummock_version = self
313            .hummock_manager
314            .on_current_version(|version| version.into())
315            .await;
316        let hummock_write_limits = self.hummock_manager.write_limits().await;
317        let meta_backup_manifest_id = self.backup_manager.manifest().manifest_id;
318        let cluster_resource = self.get_cluster_resource().await;
319
320        Ok(MetaSnapshot {
321            tables,
322            hummock_version: Some(hummock_version),
323            version: Some(SnapshotVersion {
324                catalog_version,
325                ..Default::default()
326            }),
327            meta_backup_manifest_id: Some(MetaBackupManifestId {
328                id: meta_backup_manifest_id,
329            }),
330            hummock_write_limits: Some(WriteLimits {
331                write_limits: hummock_write_limits,
332            }),
333            cluster_resource: Some(cluster_resource),
334            ..Default::default()
335        })
336    }
337
338    async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
339        let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
340        let cluster_resource = self.get_cluster_resource().await;
341
342        Ok(MetaSnapshot {
343            secrets,
344            version: Some(SnapshotVersion {
345                catalog_version,
346                ..Default::default()
347            }),
348            cluster_resource: Some(cluster_resource),
349            ..Default::default()
350        })
351    }
352}
353
354#[async_trait::async_trait]
355impl NotificationService for NotificationServiceImpl {
356    type SubscribeStream = UnboundedReceiverStream<Notification>;
357
358    async fn subscribe(
359        &self,
360        request: Request<SubscribeRequest>,
361    ) -> Result<Response<Self::SubscribeStream>, Status> {
362        let req = request.into_inner();
363        let host_address = req.get_host()?.clone();
364        let subscribe_type = req.get_subscribe_type()?;
365
366        let worker_key = WorkerKey(host_address);
367
368        let (tx, rx) = mpsc::unbounded_channel();
369        self.env
370            .notification_manager()
371            .insert_sender(subscribe_type, worker_key.clone(), tx);
372
373        let meta_snapshot = match subscribe_type {
374            SubscribeType::Compactor => self.compactor_subscribe().await?,
375            SubscribeType::Frontend => self.frontend_subscribe().await?,
376            SubscribeType::Hummock => {
377                self.hummock_manager
378                    .pin_version(req.get_worker_id())
379                    .await?;
380                self.hummock_subscribe().await?
381            }
382            SubscribeType::Compute => self.compute_subscribe().await?,
383            SubscribeType::Unspecified => unreachable!(),
384        };
385
386        self.env
387            .notification_manager()
388            .notify_snapshot(worker_key, subscribe_type, meta_snapshot);
389
390        Ok(Response::new(UnboundedReceiverStream::new(rx)))
391    }
392}