risingwave_meta_service/
notification_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Context, anyhow};
16use itertools::Itertools;
17use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
18use risingwave_hummock_sdk::FrontendHummockVersion;
19use risingwave_meta::MetaResult;
20use risingwave_meta::controller::catalog::Catalog;
21use risingwave_meta::manager::MetadataManager;
22use risingwave_pb::backup_service::MetaBackupManifestId;
23use risingwave_pb::catalog::{Secret, Table};
24use risingwave_pb::common::worker_node::State::Running;
25use risingwave_pb::common::{WorkerNode, WorkerType};
26use risingwave_pb::hummock::WriteLimits;
27use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
28use risingwave_pb::meta::notification_service_server::NotificationService;
29use risingwave_pb::meta::{
30    FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
31    SubscribeType,
32};
33use risingwave_pb::user::UserInfo;
34use tokio::sync::mpsc;
35use tokio_stream::wrappers::UnboundedReceiverStream;
36use tonic::{Request, Response, Status};
37
38use crate::backup_restore::BackupManagerRef;
39use crate::hummock::HummockManagerRef;
40use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
41use crate::serving::ServingVnodeMappingRef;
42
/// Implementation of the meta notification service.
///
/// Holds the handles from which the initial `MetaSnapshot` for each kind of
/// subscriber is assembled.
pub struct NotificationServiceImpl {
    /// Meta server environment; used in this file to reach the notification
    /// manager, the server options and the session parameter manager.
    env: MetaSrvEnv,

    /// Provides the catalog and cluster controllers that snapshots are read from.
    metadata_manager: MetadataManager,
    /// Provides the current Hummock version and write limits, and pins versions
    /// for Hummock subscribers.
    hummock_manager: HummockManagerRef,
    /// Provides the manifest whose id is reported to Hummock subscribers.
    backup_manager: BackupManagerRef,
    /// Provides the serving vnode mappings sent to frontends.
    serving_vnode_mapping: ServingVnodeMappingRef,
}
51
52impl NotificationServiceImpl {
53    pub async fn new(
54        env: MetaSrvEnv,
55        metadata_manager: MetadataManager,
56        hummock_manager: HummockManagerRef,
57        backup_manager: BackupManagerRef,
58        serving_vnode_mapping: ServingVnodeMappingRef,
59    ) -> MetaResult<Self> {
60        let service = Self {
61            env,
62            metadata_manager,
63            hummock_manager,
64            backup_manager,
65            serving_vnode_mapping,
66        };
67        let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
68        LocalSecretManager::global().init_secrets(secrets);
69        Ok(service)
70    }
71
72    async fn get_catalog_snapshot(
73        &self,
74    ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
75        let catalog_guard = self
76            .metadata_manager
77            .catalog_controller
78            .get_inner_read_guard()
79            .await;
80        let (
81            (
82                databases,
83                schemas,
84                tables,
85                sources,
86                sinks,
87                subscriptions,
88                indexes,
89                views,
90                functions,
91                connections,
92                secrets,
93            ),
94            users,
95        ) = catalog_guard.snapshot().await?;
96        let notification_version = self.env.notification_manager().current_version().await;
97        Ok((
98            (
99                databases,
100                schemas,
101                tables,
102                sources,
103                sinks,
104                subscriptions,
105                indexes,
106                views,
107                functions,
108                connections,
109                secrets,
110            ),
111            users,
112            notification_version,
113        ))
114    }
115
116    /// Get decrypted secret snapshot
117    async fn get_decrypted_secret_snapshot(
118        &self,
119    ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
120        let catalog_guard = self
121            .metadata_manager
122            .catalog_controller
123            .get_inner_read_guard()
124            .await;
125        let secrets = catalog_guard.list_secrets().await?;
126        let notification_version = self.env.notification_manager().current_version().await;
127
128        let decrypted_secrets = self.decrypt_secrets(secrets)?;
129
130        Ok((decrypted_secrets, notification_version))
131    }
132
133    fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
134        // Skip getting `secret_store_private_key` if there is no secret
135        if secrets.is_empty() {
136            return Ok(vec![]);
137        }
138        let secret_store_private_key = self
139            .env
140            .opts
141            .secret_store_private_key
142            .clone()
143            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
144        let mut decrypted_secrets = Vec::with_capacity(secrets.len());
145        for mut secret in secrets {
146            let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
147                .context(format!("failed to deserialize secret {}", secret.name))?;
148            let decrypted_secret = encrypted_secret
149                .decrypt(secret_store_private_key.as_slice())
150                .context(format!("failed to decrypt secret {}", secret.name))?;
151            secret.value = decrypted_secret;
152            decrypted_secrets.push(secret);
153        }
154        Ok(decrypted_secrets)
155    }
156
157    async fn get_worker_slot_mapping_snapshot(
158        &self,
159    ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
160        let fragment_guard = self
161            .metadata_manager
162            .catalog_controller
163            .get_inner_read_guard()
164            .await;
165        let worker_slot_mappings = fragment_guard
166            .all_running_fragment_mappings()
167            .await?
168            .collect_vec();
169        let notification_version = self.env.notification_manager().current_version().await;
170        Ok((worker_slot_mappings, notification_version))
171    }
172
173    fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
174        self.serving_vnode_mapping
175            .all()
176            .iter()
177            .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
178                fragment_id: *fragment_id,
179                mapping: Some(mapping.to_protobuf()),
180            })
181            .collect()
182    }
183
184    async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
185        let cluster_guard = self
186            .metadata_manager
187            .cluster_controller
188            .get_inner_read_guard()
189            .await;
190        let compute_nodes = cluster_guard
191            .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
192            .await?;
193        let frontends = cluster_guard
194            .list_workers(Some(WorkerType::Frontend.into()), Some(Running.into()))
195            .await?;
196        let worker_nodes = compute_nodes
197            .into_iter()
198            .chain(frontends.into_iter())
199            .collect();
200        let notification_version = self.env.notification_manager().current_version().await;
201        Ok((worker_nodes, notification_version))
202    }
203
204    async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
205        let catalog_guard = self
206            .metadata_manager
207            .catalog_controller
208            .get_inner_read_guard()
209            .await;
210        let mut tables = catalog_guard.list_all_state_tables().await?;
211        tables.extend(catalog_guard.dropped_tables.values().cloned());
212        let notification_version = self.env.notification_manager().current_version().await;
213        Ok((tables, notification_version))
214    }
215
216    async fn get_compute_node_total_cpu_count(&self) -> usize {
217        self.metadata_manager
218            .cluster_controller
219            .compute_node_total_cpu_count()
220            .await
221    }
222
223    async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
224        let (tables, catalog_version) = self.get_tables_snapshot().await?;
225        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
226
227        Ok(MetaSnapshot {
228            tables,
229            version: Some(SnapshotVersion {
230                catalog_version,
231                ..Default::default()
232            }),
233            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
234            ..Default::default()
235        })
236    }
237
238    async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
239        let (
240            (
241                databases,
242                schemas,
243                tables,
244                sources,
245                sinks,
246                subscriptions,
247                indexes,
248                views,
249                functions,
250                connections,
251                secrets,
252            ),
253            users,
254            catalog_version,
255        ) = self.get_catalog_snapshot().await?;
256
257        // Use the plain text secret value for frontend. The secret value will be masked in frontend handle.
258        let decrypted_secrets = self.decrypt_secrets(secrets)?;
259
260        let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
261            self.get_worker_slot_mapping_snapshot().await?;
262        let serving_worker_slot_mappings = self.get_serving_vnode_mappings();
263
264        let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;
265
266        let hummock_version = self
267            .hummock_manager
268            .on_current_version(|version| {
269                FrontendHummockVersion::from_version(version).to_protobuf()
270            })
271            .await;
272
273        let session_params = self
274            .env
275            .session_params_manager_impl_ref()
276            .get_params()
277            .await;
278
279        let session_params = Some(GetSessionParamsResponse {
280            params: serde_json::to_string(&session_params)
281                .context("failed to encode session params")?,
282        });
283
284        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
285
286        Ok(MetaSnapshot {
287            databases,
288            schemas,
289            sources,
290            sinks,
291            tables,
292            indexes,
293            views,
294            subscriptions,
295            functions,
296            connections,
297            secrets: decrypted_secrets,
298            users,
299            nodes,
300            hummock_version: Some(hummock_version),
301            version: Some(SnapshotVersion {
302                catalog_version,
303                worker_node_version,
304                streaming_worker_slot_mapping_version,
305            }),
306            serving_worker_slot_mappings,
307            streaming_worker_slot_mappings,
308            session_params,
309            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
310            ..Default::default()
311        })
312    }
313
314    async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
315        let (tables, catalog_version) = self.get_tables_snapshot().await?;
316        let hummock_version = self
317            .hummock_manager
318            .on_current_version(|version| version.into())
319            .await;
320        let hummock_write_limits = self.hummock_manager.write_limits().await;
321        let meta_backup_manifest_id = self.backup_manager.manifest().manifest_id;
322        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
323
324        Ok(MetaSnapshot {
325            tables,
326            hummock_version: Some(hummock_version),
327            version: Some(SnapshotVersion {
328                catalog_version,
329                ..Default::default()
330            }),
331            meta_backup_manifest_id: Some(MetaBackupManifestId {
332                id: meta_backup_manifest_id,
333            }),
334            hummock_write_limits: Some(WriteLimits {
335                write_limits: hummock_write_limits,
336            }),
337            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
338            ..Default::default()
339        })
340    }
341
342    async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
343        let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
344        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
345
346        Ok(MetaSnapshot {
347            secrets,
348            version: Some(SnapshotVersion {
349                catalog_version,
350                ..Default::default()
351            }),
352            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
353            ..Default::default()
354        })
355    }
356}
357
358#[async_trait::async_trait]
359impl NotificationService for NotificationServiceImpl {
360    type SubscribeStream = UnboundedReceiverStream<Notification>;
361
362    async fn subscribe(
363        &self,
364        request: Request<SubscribeRequest>,
365    ) -> Result<Response<Self::SubscribeStream>, Status> {
366        let req = request.into_inner();
367        let host_address = req.get_host()?.clone();
368        let subscribe_type = req.get_subscribe_type()?;
369
370        let worker_key = WorkerKey(host_address);
371
372        let (tx, rx) = mpsc::unbounded_channel();
373        self.env
374            .notification_manager()
375            .insert_sender(subscribe_type, worker_key.clone(), tx)
376            .await;
377
378        let meta_snapshot = match subscribe_type {
379            SubscribeType::Compactor => self.compactor_subscribe().await?,
380            SubscribeType::Frontend => self.frontend_subscribe().await?,
381            SubscribeType::Hummock => {
382                self.hummock_manager
383                    .pin_version(req.get_worker_id())
384                    .await?;
385                self.hummock_subscribe().await?
386            }
387            SubscribeType::Compute => self.compute_subscribe().await?,
388            SubscribeType::Unspecified => unreachable!(),
389        };
390
391        self.env
392            .notification_manager()
393            .notify_snapshot(worker_key, subscribe_type, meta_snapshot);
394
395        Ok(Response::new(UnboundedReceiverStream::new(rx)))
396    }
397}