risingwave_meta_service/
notification_service.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use anyhow::{Context, anyhow};
16use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
17use risingwave_hummock_sdk::FrontendHummockVersion;
18use risingwave_meta::MetaResult;
19use risingwave_meta::controller::catalog::Catalog;
20use risingwave_meta::manager::MetadataManager;
21use risingwave_pb::backup_service::MetaBackupManifestId;
22use risingwave_pb::catalog::{Secret, Table};
23use risingwave_pb::common::worker_node::State::Running;
24use risingwave_pb::common::{ClusterResource, WorkerNode, WorkerType};
25use risingwave_pb::hummock::WriteLimits;
26use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
27use risingwave_pb::meta::notification_service_server::NotificationService;
28use risingwave_pb::meta::{
29    FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
30    SubscribeType,
31};
32use risingwave_pb::user::UserInfo;
33use tokio::sync::mpsc;
34use tokio_stream::wrappers::UnboundedReceiverStream;
35use tonic::{Request, Response, Status};
36
37use crate::backup_restore::BackupManagerRef;
38use crate::hummock::HummockManagerRef;
39use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
40use crate::serving::ServingVnodeMappingRef;
41
42pub struct NotificationServiceImpl {
43    env: MetaSrvEnv,
44
45    metadata_manager: MetadataManager,
46    hummock_manager: HummockManagerRef,
47    backup_manager: BackupManagerRef,
48    serving_vnode_mapping: ServingVnodeMappingRef,
49}
50
51impl NotificationServiceImpl {
52    pub async fn new(
53        env: MetaSrvEnv,
54        metadata_manager: MetadataManager,
55        hummock_manager: HummockManagerRef,
56        backup_manager: BackupManagerRef,
57        serving_vnode_mapping: ServingVnodeMappingRef,
58    ) -> MetaResult<Self> {
59        let service = Self {
60            env,
61            metadata_manager,
62            hummock_manager,
63            backup_manager,
64            serving_vnode_mapping,
65        };
66        let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
67        LocalSecretManager::global().init_secrets(secrets);
68        Ok(service)
69    }
70
71    async fn get_catalog_snapshot(
72        &self,
73    ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
74        let catalog_guard = self
75            .metadata_manager
76            .catalog_controller
77            .get_inner_read_guard()
78            .await;
79        let (
80            (
81                databases,
82                schemas,
83                tables,
84                sources,
85                sinks,
86                subscriptions,
87                indexes,
88                views,
89                functions,
90                connections,
91                secrets,
92            ),
93            users,
94        ) = catalog_guard.snapshot().await?;
95        let notification_version = self.env.notification_manager().current_version().await;
96        Ok((
97            (
98                databases,
99                schemas,
100                tables,
101                sources,
102                sinks,
103                subscriptions,
104                indexes,
105                views,
106                functions,
107                connections,
108                secrets,
109            ),
110            users,
111            notification_version,
112        ))
113    }
114
115    /// Get decrypted secret snapshot
116    async fn get_decrypted_secret_snapshot(
117        &self,
118    ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
119        let catalog_guard = self
120            .metadata_manager
121            .catalog_controller
122            .get_inner_read_guard()
123            .await;
124        let secrets = catalog_guard.list_secrets().await?;
125        let notification_version = self.env.notification_manager().current_version().await;
126
127        let decrypted_secrets = self.decrypt_secrets(secrets)?;
128
129        Ok((decrypted_secrets, notification_version))
130    }
131
132    fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
133        // Skip getting `secret_store_private_key` if there is no secret
134        if secrets.is_empty() {
135            return Ok(vec![]);
136        }
137        let secret_store_private_key = self
138            .env
139            .opts
140            .secret_store_private_key
141            .clone()
142            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
143        let mut decrypted_secrets = Vec::with_capacity(secrets.len());
144        for mut secret in secrets {
145            let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
146                .context(format!("failed to deserialize secret {}", secret.name))?;
147            let decrypted_secret = encrypted_secret
148                .decrypt(secret_store_private_key.as_slice())
149                .context(format!("failed to decrypt secret {}", secret.name))?;
150            secret.value = decrypted_secret;
151            decrypted_secrets.push(secret);
152        }
153        Ok(decrypted_secrets)
154    }
155
156    async fn get_worker_slot_mapping_snapshot(
157        &self,
158    ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
159        let mappings = self
160            .metadata_manager
161            .catalog_controller
162            .get_worker_slot_mappings();
163
164        let notification_version = self.env.notification_manager().current_version().await;
165        Ok((mappings, notification_version))
166    }
167
168    fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
169        self.serving_vnode_mapping
170            .all()
171            .iter()
172            .map(|(&fragment_id, mapping)| FragmentWorkerSlotMapping {
173                fragment_id,
174                mapping: Some(mapping.to_protobuf()),
175            })
176            .collect()
177    }
178
179    async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
180        let cluster_guard = self
181            .metadata_manager
182            .cluster_controller
183            .get_inner_read_guard()
184            .await;
185        let compute_nodes = cluster_guard
186            .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
187            .await?;
188        let frontends = cluster_guard
189            .list_workers(Some(WorkerType::Frontend.into()), Some(Running.into()))
190            .await?;
191        let worker_nodes = compute_nodes
192            .into_iter()
193            .chain(frontends.into_iter())
194            .collect();
195        let notification_version = self.env.notification_manager().current_version().await;
196        Ok((worker_nodes, notification_version))
197    }
198
199    async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
200        let catalog_guard = self
201            .metadata_manager
202            .catalog_controller
203            .get_inner_read_guard()
204            .await;
205        let mut tables = catalog_guard.list_all_state_tables().await?;
206        tables.extend(catalog_guard.dropped_tables.values().cloned());
207        let notification_version = self.env.notification_manager().current_version().await;
208        Ok((tables, notification_version))
209    }
210
211    /// Get the total resource of the cluster.
212    async fn get_cluster_resource(&self) -> ClusterResource {
213        self.metadata_manager
214            .cluster_controller
215            .cluster_resource()
216            .await
217    }
218
219    async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
220        let (tables, catalog_version) = self.get_tables_snapshot().await?;
221        let cluster_resource = self.get_cluster_resource().await;
222
223        Ok(MetaSnapshot {
224            tables,
225            version: Some(SnapshotVersion {
226                catalog_version,
227                ..Default::default()
228            }),
229            cluster_resource: Some(cluster_resource),
230            ..Default::default()
231        })
232    }
233
234    async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
235        let (
236            (
237                databases,
238                schemas,
239                tables,
240                sources,
241                sinks,
242                subscriptions,
243                indexes,
244                views,
245                functions,
246                connections,
247                secrets,
248            ),
249            users,
250            catalog_version,
251        ) = self.get_catalog_snapshot().await?;
252        let object_dependencies = self
253            .metadata_manager
254            .catalog_controller
255            .list_created_object_dependencies()
256            .await?;
257
258        // Use the plain text secret value for frontend. The secret value will be masked in frontend handle.
259        let decrypted_secrets = self.decrypt_secrets(secrets)?;
260
261        let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
262            self.get_worker_slot_mapping_snapshot().await?;
263
264        let streaming_job_count = self.metadata_manager.count_streaming_job().await?;
265        if streaming_job_count > 0 && streaming_worker_slot_mappings.is_empty() {
266            tracing::warn!(
267                streaming_job_count,
268                "frontend subscribe returns empty streaming_worker_slot_mappings while streaming jobs exist; meta may still be recovering"
269            );
270        }
271
272        let serving_worker_slot_mappings = self.get_serving_vnode_mappings();
273
274        let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;
275
276        let hummock_version = self
277            .hummock_manager
278            .on_current_version(|version| {
279                FrontendHummockVersion::from_version(version).to_protobuf()
280            })
281            .await;
282
283        let session_params = self
284            .env
285            .session_params_manager_impl_ref()
286            .get_params()
287            .await;
288
289        let session_params = Some(GetSessionParamsResponse {
290            params: serde_json::to_string(&session_params)
291                .context("failed to encode session params")?,
292        });
293
294        let cluster_resource = self.get_cluster_resource().await;
295
296        Ok(MetaSnapshot {
297            databases,
298            schemas,
299            sources,
300            sinks,
301            tables,
302            indexes,
303            views,
304            subscriptions,
305            functions,
306            connections,
307            secrets: decrypted_secrets,
308            users,
309            nodes,
310            hummock_version: Some(hummock_version),
311            version: Some(SnapshotVersion {
312                catalog_version,
313                worker_node_version,
314                streaming_worker_slot_mapping_version,
315            }),
316            serving_worker_slot_mappings,
317            streaming_worker_slot_mappings,
318            session_params,
319            object_dependencies,
320            cluster_resource: Some(cluster_resource),
321            ..Default::default()
322        })
323    }
324
325    async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
326        let (tables, catalog_version) = self.get_tables_snapshot().await?;
327        let hummock_version = self
328            .hummock_manager
329            .on_current_version(|version| version.into())
330            .await;
331        let hummock_write_limits = self.hummock_manager.write_limits().await;
332        let meta_backup_manifest_id = self.backup_manager.manifest().await.manifest_id;
333        let cluster_resource = self.get_cluster_resource().await;
334
335        Ok(MetaSnapshot {
336            tables,
337            hummock_version: Some(hummock_version),
338            version: Some(SnapshotVersion {
339                catalog_version,
340                ..Default::default()
341            }),
342            meta_backup_manifest_id: Some(MetaBackupManifestId {
343                id: meta_backup_manifest_id,
344            }),
345            hummock_write_limits: Some(WriteLimits {
346                write_limits: hummock_write_limits,
347            }),
348            cluster_resource: Some(cluster_resource),
349            ..Default::default()
350        })
351    }
352
353    async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
354        let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
355        let cluster_resource = self.get_cluster_resource().await;
356
357        Ok(MetaSnapshot {
358            secrets,
359            version: Some(SnapshotVersion {
360                catalog_version,
361                ..Default::default()
362            }),
363            cluster_resource: Some(cluster_resource),
364            ..Default::default()
365        })
366    }
367}
368
369#[async_trait::async_trait]
370impl NotificationService for NotificationServiceImpl {
371    type SubscribeStream = UnboundedReceiverStream<Notification>;
372
373    async fn subscribe(
374        &self,
375        request: Request<SubscribeRequest>,
376    ) -> Result<Response<Self::SubscribeStream>, Status> {
377        let req = request.into_inner();
378        let host_address = req.get_host()?.clone();
379        let subscribe_type = req.get_subscribe_type()?;
380
381        let worker_key = WorkerKey(host_address);
382
383        let (tx, rx) = mpsc::unbounded_channel();
384        self.env
385            .notification_manager()
386            .insert_sender(subscribe_type, worker_key.clone(), tx);
387
388        let meta_snapshot = match subscribe_type {
389            SubscribeType::Compactor => self.compactor_subscribe().await?,
390            SubscribeType::Frontend => self.frontend_subscribe().await?,
391            SubscribeType::Hummock => {
392                self.hummock_manager
393                    .pin_version(req.get_worker_id())
394                    .await?;
395                self.hummock_subscribe().await?
396            }
397            SubscribeType::Compute => self.compute_subscribe().await?,
398            SubscribeType::Unspecified => unreachable!(),
399        };
400
401        self.env
402            .notification_manager()
403            .notify_snapshot(worker_key, subscribe_type, meta_snapshot);
404
405        Ok(Response::new(UnboundedReceiverStream::new(rx)))
406    }
407}