risingwave_meta_service/
notification_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Context, anyhow};
16use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
17use risingwave_hummock_sdk::FrontendHummockVersion;
18use risingwave_meta::MetaResult;
19use risingwave_meta::controller::catalog::Catalog;
20use risingwave_meta::manager::MetadataManager;
21use risingwave_pb::backup_service::MetaBackupManifestId;
22use risingwave_pb::catalog::{Secret, Table};
23use risingwave_pb::common::worker_node::State::Running;
24use risingwave_pb::common::{ClusterResource, WorkerNode, WorkerType};
25use risingwave_pb::hummock::WriteLimits;
26use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
27use risingwave_pb::meta::notification_service_server::NotificationService;
28use risingwave_pb::meta::{
29    FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
30    SubscribeType,
31};
32use risingwave_pb::user::UserInfo;
33use tokio::sync::mpsc;
34use tokio_stream::wrappers::UnboundedReceiverStream;
35use tonic::{Request, Response, Status};
36
37use crate::backup_restore::BackupManagerRef;
38use crate::hummock::HummockManagerRef;
39use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
40use crate::serving::ServingVnodeMappingRef;
41
42pub struct NotificationServiceImpl {
43    env: MetaSrvEnv,
44
45    metadata_manager: MetadataManager,
46    hummock_manager: HummockManagerRef,
47    backup_manager: BackupManagerRef,
48    serving_vnode_mapping: ServingVnodeMappingRef,
49}
50
51impl NotificationServiceImpl {
52    pub async fn new(
53        env: MetaSrvEnv,
54        metadata_manager: MetadataManager,
55        hummock_manager: HummockManagerRef,
56        backup_manager: BackupManagerRef,
57        serving_vnode_mapping: ServingVnodeMappingRef,
58    ) -> MetaResult<Self> {
59        let service = Self {
60            env,
61            metadata_manager,
62            hummock_manager,
63            backup_manager,
64            serving_vnode_mapping,
65        };
66        let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
67        LocalSecretManager::global().init_secrets(secrets);
68        Ok(service)
69    }
70
71    async fn get_catalog_snapshot(
72        &self,
73    ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
74        let catalog_guard = self
75            .metadata_manager
76            .catalog_controller
77            .get_inner_read_guard()
78            .await;
79        let (
80            (
81                databases,
82                schemas,
83                tables,
84                sources,
85                sinks,
86                subscriptions,
87                indexes,
88                views,
89                functions,
90                connections,
91                secrets,
92            ),
93            users,
94        ) = catalog_guard.snapshot().await?;
95        let notification_version = self.env.notification_manager().current_version().await;
96        Ok((
97            (
98                databases,
99                schemas,
100                tables,
101                sources,
102                sinks,
103                subscriptions,
104                indexes,
105                views,
106                functions,
107                connections,
108                secrets,
109            ),
110            users,
111            notification_version,
112        ))
113    }
114
115    /// Get decrypted secret snapshot
116    async fn get_decrypted_secret_snapshot(
117        &self,
118    ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
119        let catalog_guard = self
120            .metadata_manager
121            .catalog_controller
122            .get_inner_read_guard()
123            .await;
124        let secrets = catalog_guard.list_secrets().await?;
125        let notification_version = self.env.notification_manager().current_version().await;
126
127        let decrypted_secrets = self.decrypt_secrets(secrets)?;
128
129        Ok((decrypted_secrets, notification_version))
130    }
131
132    fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
133        // Skip getting `secret_store_private_key` if there is no secret
134        if secrets.is_empty() {
135            return Ok(vec![]);
136        }
137        let secret_store_private_key = self
138            .env
139            .opts
140            .secret_store_private_key
141            .clone()
142            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
143        let mut decrypted_secrets = Vec::with_capacity(secrets.len());
144        for mut secret in secrets {
145            let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
146                .context(format!("failed to deserialize secret {}", secret.name))?;
147            let decrypted_secret = encrypted_secret
148                .decrypt(secret_store_private_key.as_slice())
149                .context(format!("failed to decrypt secret {}", secret.name))?;
150            secret.value = decrypted_secret;
151            decrypted_secrets.push(secret);
152        }
153        Ok(decrypted_secrets)
154    }
155
156    async fn get_worker_slot_mapping_snapshot(
157        &self,
158    ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
159        let mappings = self
160            .metadata_manager
161            .catalog_controller
162            .get_worker_slot_mappings();
163
164        let notification_version = self.env.notification_manager().current_version().await;
165        Ok((mappings, notification_version))
166    }
167
168    fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
169        self.serving_vnode_mapping
170            .all()
171            .iter()
172            .map(|(&fragment_id, mapping)| FragmentWorkerSlotMapping {
173                fragment_id,
174                mapping: Some(mapping.to_protobuf()),
175            })
176            .collect()
177    }
178
179    async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
180        let cluster_guard = self
181            .metadata_manager
182            .cluster_controller
183            .get_inner_read_guard()
184            .await;
185        let compute_nodes = cluster_guard
186            .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
187            .await?;
188        let frontends = cluster_guard
189            .list_workers(Some(WorkerType::Frontend.into()), Some(Running.into()))
190            .await?;
191        let worker_nodes = compute_nodes
192            .into_iter()
193            .chain(frontends.into_iter())
194            .collect();
195        let notification_version = self.env.notification_manager().current_version().await;
196        Ok((worker_nodes, notification_version))
197    }
198
199    async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
200        let catalog_guard = self
201            .metadata_manager
202            .catalog_controller
203            .get_inner_read_guard()
204            .await;
205        let mut tables = catalog_guard.list_all_state_tables().await?;
206        tables.extend(catalog_guard.dropped_tables.values().cloned());
207        let notification_version = self.env.notification_manager().current_version().await;
208        Ok((tables, notification_version))
209    }
210
211    /// Get the total resource of the cluster.
212    async fn get_cluster_resource(&self) -> ClusterResource {
213        self.metadata_manager
214            .cluster_controller
215            .cluster_resource()
216            .await
217    }
218
219    async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
220        let (tables, catalog_version) = self.get_tables_snapshot().await?;
221        let cluster_resource = self.get_cluster_resource().await;
222
223        Ok(MetaSnapshot {
224            tables,
225            version: Some(SnapshotVersion {
226                catalog_version,
227                ..Default::default()
228            }),
229            cluster_resource: Some(cluster_resource),
230            ..Default::default()
231        })
232    }
233
234    async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
235        let (
236            (
237                databases,
238                schemas,
239                tables,
240                sources,
241                sinks,
242                subscriptions,
243                indexes,
244                views,
245                functions,
246                connections,
247                secrets,
248            ),
249            users,
250            catalog_version,
251        ) = self.get_catalog_snapshot().await?;
252
253        // Use the plain text secret value for frontend. The secret value will be masked in frontend handle.
254        let decrypted_secrets = self.decrypt_secrets(secrets)?;
255
256        let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
257            self.get_worker_slot_mapping_snapshot().await?;
258
259        let streaming_job_count = self.metadata_manager.count_streaming_job().await?;
260        if streaming_job_count > 0 && streaming_worker_slot_mappings.is_empty() {
261            tracing::warn!(
262                streaming_job_count,
263                "frontend subscribe returns empty streaming_worker_slot_mappings while streaming jobs exist; meta may still be recovering"
264            );
265        }
266
267        let serving_worker_slot_mappings = self.get_serving_vnode_mappings();
268
269        let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;
270
271        let hummock_version = self
272            .hummock_manager
273            .on_current_version(|version| {
274                FrontendHummockVersion::from_version(version).to_protobuf()
275            })
276            .await;
277
278        let session_params = self
279            .env
280            .session_params_manager_impl_ref()
281            .get_params()
282            .await;
283
284        let session_params = Some(GetSessionParamsResponse {
285            params: serde_json::to_string(&session_params)
286                .context("failed to encode session params")?,
287        });
288
289        let cluster_resource = self.get_cluster_resource().await;
290
291        Ok(MetaSnapshot {
292            databases,
293            schemas,
294            sources,
295            sinks,
296            tables,
297            indexes,
298            views,
299            subscriptions,
300            functions,
301            connections,
302            secrets: decrypted_secrets,
303            users,
304            nodes,
305            hummock_version: Some(hummock_version),
306            version: Some(SnapshotVersion {
307                catalog_version,
308                worker_node_version,
309                streaming_worker_slot_mapping_version,
310            }),
311            serving_worker_slot_mappings,
312            streaming_worker_slot_mappings,
313            session_params,
314            cluster_resource: Some(cluster_resource),
315            ..Default::default()
316        })
317    }
318
319    async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
320        let (tables, catalog_version) = self.get_tables_snapshot().await?;
321        let hummock_version = self
322            .hummock_manager
323            .on_current_version(|version| version.into())
324            .await;
325        let hummock_write_limits = self.hummock_manager.write_limits().await;
326        let meta_backup_manifest_id = self.backup_manager.manifest().await.manifest_id;
327        let cluster_resource = self.get_cluster_resource().await;
328
329        Ok(MetaSnapshot {
330            tables,
331            hummock_version: Some(hummock_version),
332            version: Some(SnapshotVersion {
333                catalog_version,
334                ..Default::default()
335            }),
336            meta_backup_manifest_id: Some(MetaBackupManifestId {
337                id: meta_backup_manifest_id,
338            }),
339            hummock_write_limits: Some(WriteLimits {
340                write_limits: hummock_write_limits,
341            }),
342            cluster_resource: Some(cluster_resource),
343            ..Default::default()
344        })
345    }
346
347    async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
348        let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
349        let cluster_resource = self.get_cluster_resource().await;
350
351        Ok(MetaSnapshot {
352            secrets,
353            version: Some(SnapshotVersion {
354                catalog_version,
355                ..Default::default()
356            }),
357            cluster_resource: Some(cluster_resource),
358            ..Default::default()
359        })
360    }
361}
362
363#[async_trait::async_trait]
364impl NotificationService for NotificationServiceImpl {
365    type SubscribeStream = UnboundedReceiverStream<Notification>;
366
367    async fn subscribe(
368        &self,
369        request: Request<SubscribeRequest>,
370    ) -> Result<Response<Self::SubscribeStream>, Status> {
371        let req = request.into_inner();
372        let host_address = req.get_host()?.clone();
373        let subscribe_type = req.get_subscribe_type()?;
374
375        let worker_key = WorkerKey(host_address);
376
377        let (tx, rx) = mpsc::unbounded_channel();
378        self.env
379            .notification_manager()
380            .insert_sender(subscribe_type, worker_key.clone(), tx);
381
382        let meta_snapshot = match subscribe_type {
383            SubscribeType::Compactor => self.compactor_subscribe().await?,
384            SubscribeType::Frontend => self.frontend_subscribe().await?,
385            SubscribeType::Hummock => {
386                self.hummock_manager
387                    .pin_version(req.get_worker_id())
388                    .await?;
389                self.hummock_subscribe().await?
390            }
391            SubscribeType::Compute => self.compute_subscribe().await?,
392            SubscribeType::Unspecified => unreachable!(),
393        };
394
395        self.env
396            .notification_manager()
397            .notify_snapshot(worker_key, subscribe_type, meta_snapshot);
398
399        Ok(Response::new(UnboundedReceiverStream::new(rx)))
400    }
401}