risingwave_meta_service/
notification_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Context, anyhow};
16use itertools::Itertools;
17use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
18use risingwave_hummock_sdk::FrontendHummockVersion;
19use risingwave_meta::MetaResult;
20use risingwave_meta::controller::catalog::Catalog;
21use risingwave_meta::manager::MetadataManager;
22use risingwave_pb::backup_service::MetaBackupManifestId;
23use risingwave_pb::catalog::{Secret, Table};
24use risingwave_pb::common::worker_node::State::Running;
25use risingwave_pb::common::{WorkerNode, WorkerType};
26use risingwave_pb::hummock::WriteLimits;
27use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
28use risingwave_pb::meta::notification_service_server::NotificationService;
29use risingwave_pb::meta::{
30    FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
31    SubscribeType,
32};
33use risingwave_pb::user::UserInfo;
34use tokio::sync::mpsc;
35use tokio_stream::wrappers::UnboundedReceiverStream;
36use tonic::{Request, Response, Status};
37
38use crate::backup_restore::BackupManagerRef;
39use crate::hummock::HummockManagerRef;
40use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
41use crate::serving::ServingVnodeMappingRef;
42
43pub struct NotificationServiceImpl {
44    env: MetaSrvEnv,
45
46    metadata_manager: MetadataManager,
47    hummock_manager: HummockManagerRef,
48    backup_manager: BackupManagerRef,
49    serving_vnode_mapping: ServingVnodeMappingRef,
50}
51
52impl NotificationServiceImpl {
53    pub async fn new(
54        env: MetaSrvEnv,
55        metadata_manager: MetadataManager,
56        hummock_manager: HummockManagerRef,
57        backup_manager: BackupManagerRef,
58        serving_vnode_mapping: ServingVnodeMappingRef,
59    ) -> MetaResult<Self> {
60        let service = Self {
61            env,
62            metadata_manager,
63            hummock_manager,
64            backup_manager,
65            serving_vnode_mapping,
66        };
67        let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
68        LocalSecretManager::global().init_secrets(secrets);
69        Ok(service)
70    }
71
72    async fn get_catalog_snapshot(
73        &self,
74    ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
75        let catalog_guard = self
76            .metadata_manager
77            .catalog_controller
78            .get_inner_read_guard()
79            .await;
80        let (
81            (
82                databases,
83                schemas,
84                tables,
85                sources,
86                sinks,
87                subscriptions,
88                indexes,
89                views,
90                functions,
91                connections,
92                secrets,
93            ),
94            users,
95        ) = catalog_guard.snapshot().await?;
96        let notification_version = self.env.notification_manager().current_version().await;
97        Ok((
98            (
99                databases,
100                schemas,
101                tables,
102                sources,
103                sinks,
104                subscriptions,
105                indexes,
106                views,
107                functions,
108                connections,
109                secrets,
110            ),
111            users,
112            notification_version,
113        ))
114    }
115
116    /// Get decrypted secret snapshot
117    async fn get_decrypted_secret_snapshot(
118        &self,
119    ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
120        let catalog_guard = self
121            .metadata_manager
122            .catalog_controller
123            .get_inner_read_guard()
124            .await;
125        let secrets = catalog_guard.list_secrets().await?;
126        let notification_version = self.env.notification_manager().current_version().await;
127
128        let decrypted_secrets = self.decrypt_secrets(secrets)?;
129
130        Ok((decrypted_secrets, notification_version))
131    }
132
133    fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
134        // Skip getting `secret_store_private_key` if there is no secret
135        if secrets.is_empty() {
136            return Ok(vec![]);
137        }
138        let secret_store_private_key = self
139            .env
140            .opts
141            .secret_store_private_key
142            .clone()
143            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
144        let mut decrypted_secrets = Vec::with_capacity(secrets.len());
145        for mut secret in secrets {
146            let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
147                .context(format!("failed to deserialize secret {}", secret.name))?;
148            let decrypted_secret = encrypted_secret
149                .decrypt(secret_store_private_key.as_slice())
150                .context(format!("failed to decrypt secret {}", secret.name))?;
151            secret.value = decrypted_secret;
152            decrypted_secrets.push(secret);
153        }
154        Ok(decrypted_secrets)
155    }
156
157    async fn get_worker_slot_mapping_snapshot(
158        &self,
159    ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
160        let fragment_guard = self
161            .metadata_manager
162            .catalog_controller
163            .get_inner_read_guard()
164            .await;
165        let worker_slot_mappings = fragment_guard
166            .all_running_fragment_mappings()
167            .await?
168            .collect_vec();
169        let notification_version = self.env.notification_manager().current_version().await;
170        Ok((worker_slot_mappings, notification_version))
171    }
172
173    fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
174        self.serving_vnode_mapping
175            .all()
176            .iter()
177            .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
178                fragment_id: *fragment_id,
179                mapping: Some(mapping.to_protobuf()),
180            })
181            .collect()
182    }
183
184    async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
185        let cluster_guard = self
186            .metadata_manager
187            .cluster_controller
188            .get_inner_read_guard()
189            .await;
190        let nodes = cluster_guard
191            .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
192            .await?;
193        let notification_version = self.env.notification_manager().current_version().await;
194        Ok((nodes, notification_version))
195    }
196
197    async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
198        let catalog_guard = self
199            .metadata_manager
200            .catalog_controller
201            .get_inner_read_guard()
202            .await;
203        let mut tables = catalog_guard.list_all_state_tables().await?;
204        tables.extend(catalog_guard.dropped_tables.values().cloned());
205        let notification_version = self.env.notification_manager().current_version().await;
206        Ok((tables, notification_version))
207    }
208
209    async fn get_compute_node_total_cpu_count(&self) -> usize {
210        self.metadata_manager
211            .cluster_controller
212            .compute_node_total_cpu_count()
213            .await
214    }
215
216    async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
217        let (tables, catalog_version) = self.get_tables_snapshot().await?;
218        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
219
220        Ok(MetaSnapshot {
221            tables,
222            version: Some(SnapshotVersion {
223                catalog_version,
224                ..Default::default()
225            }),
226            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
227            ..Default::default()
228        })
229    }
230
231    async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
232        let (
233            (
234                databases,
235                schemas,
236                tables,
237                sources,
238                sinks,
239                subscriptions,
240                indexes,
241                views,
242                functions,
243                connections,
244                secrets,
245            ),
246            users,
247            catalog_version,
248        ) = self.get_catalog_snapshot().await?;
249
250        // Use the plain text secret value for frontend. The secret value will be masked in frontend handle.
251        let decrypted_secrets = self.decrypt_secrets(secrets)?;
252
253        let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
254            self.get_worker_slot_mapping_snapshot().await?;
255        let serving_worker_slot_mappings = self.get_serving_vnode_mappings();
256
257        let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;
258
259        let hummock_version = self
260            .hummock_manager
261            .on_current_version(|version| {
262                FrontendHummockVersion::from_version(version).to_protobuf()
263            })
264            .await;
265
266        let session_params = self
267            .env
268            .session_params_manager_impl_ref()
269            .get_params()
270            .await;
271
272        let session_params = Some(GetSessionParamsResponse {
273            params: serde_json::to_string(&session_params)
274                .context("failed to encode session params")?,
275        });
276
277        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
278
279        Ok(MetaSnapshot {
280            databases,
281            schemas,
282            sources,
283            sinks,
284            tables,
285            indexes,
286            views,
287            subscriptions,
288            functions,
289            connections,
290            secrets: decrypted_secrets,
291            users,
292            nodes,
293            hummock_version: Some(hummock_version),
294            version: Some(SnapshotVersion {
295                catalog_version,
296                worker_node_version,
297                streaming_worker_slot_mapping_version,
298            }),
299            serving_worker_slot_mappings,
300            streaming_worker_slot_mappings,
301            session_params,
302            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
303            ..Default::default()
304        })
305    }
306
307    async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
308        let (tables, catalog_version) = self.get_tables_snapshot().await?;
309        let hummock_version = self
310            .hummock_manager
311            .on_current_version(|version| version.into())
312            .await;
313        let hummock_write_limits = self.hummock_manager.write_limits().await;
314        let meta_backup_manifest_id = self.backup_manager.manifest().manifest_id;
315        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
316
317        Ok(MetaSnapshot {
318            tables,
319            hummock_version: Some(hummock_version),
320            version: Some(SnapshotVersion {
321                catalog_version,
322                ..Default::default()
323            }),
324            meta_backup_manifest_id: Some(MetaBackupManifestId {
325                id: meta_backup_manifest_id,
326            }),
327            hummock_write_limits: Some(WriteLimits {
328                write_limits: hummock_write_limits,
329            }),
330            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
331            ..Default::default()
332        })
333    }
334
335    async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
336        let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
337        let compute_node_total_cpu_count = self.get_compute_node_total_cpu_count().await;
338
339        Ok(MetaSnapshot {
340            secrets,
341            version: Some(SnapshotVersion {
342                catalog_version,
343                ..Default::default()
344            }),
345            compute_node_total_cpu_count: compute_node_total_cpu_count as _,
346            ..Default::default()
347        })
348    }
349}
350
351#[async_trait::async_trait]
352impl NotificationService for NotificationServiceImpl {
353    type SubscribeStream = UnboundedReceiverStream<Notification>;
354
355    #[cfg_attr(coverage, coverage(off))]
356    async fn subscribe(
357        &self,
358        request: Request<SubscribeRequest>,
359    ) -> Result<Response<Self::SubscribeStream>, Status> {
360        let req = request.into_inner();
361        let host_address = req.get_host()?.clone();
362        let subscribe_type = req.get_subscribe_type()?;
363
364        let worker_key = WorkerKey(host_address);
365
366        let (tx, rx) = mpsc::unbounded_channel();
367        self.env
368            .notification_manager()
369            .insert_sender(subscribe_type, worker_key.clone(), tx)
370            .await;
371
372        let meta_snapshot = match subscribe_type {
373            SubscribeType::Compactor => self.compactor_subscribe().await?,
374            SubscribeType::Frontend => self.frontend_subscribe().await?,
375            SubscribeType::Hummock => {
376                self.hummock_manager
377                    .pin_version(req.get_worker_id())
378                    .await?;
379                self.hummock_subscribe().await?
380            }
381            SubscribeType::Compute => self.compute_subscribe().await?,
382            SubscribeType::Unspecified => unreachable!(),
383        };
384
385        self.env
386            .notification_manager()
387            .notify_snapshot(worker_key, subscribe_type, meta_snapshot);
388
389        Ok(Response::new(UnboundedReceiverStream::new(rx)))
390    }
391}