Skip to main content

risingwave_meta_service/
notification_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Context, anyhow};
16use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
17use risingwave_hummock_sdk::FrontendHummockVersion;
18use risingwave_meta::MetaResult;
19use risingwave_meta::controller::catalog::Catalog;
20use risingwave_meta::manager::MetadataManager;
21use risingwave_pb::backup_service::MetaBackupManifestId;
22use risingwave_pb::catalog::{Secret, Table};
23use risingwave_pb::common::worker_node::State::Running;
24use risingwave_pb::common::{ClusterResource, WorkerNode, WorkerType};
25use risingwave_pb::hummock::WriteLimits;
26use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
27use risingwave_pb::meta::notification_service_server::NotificationService;
28use risingwave_pb::meta::{
29    FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest,
30    SubscribeType,
31};
32use risingwave_pb::user::UserInfo;
33use tokio::sync::mpsc;
34use tokio_stream::wrappers::UnboundedReceiverStream;
35use tonic::{Request, Response, Status};
36
37use crate::backup_restore::BackupManagerRef;
38use crate::hummock::HummockManagerRef;
39use crate::manager::{MetaSrvEnv, Notification, NotificationVersion, WorkerKey};
40use crate::serving::ServingVnodeMappingRef;
41
/// Implementation of the meta [`NotificationService`]: builds the initial [`MetaSnapshot`] for
/// each kind of subscriber and returns the stream on which the subscriber receives notifications.
pub struct NotificationServiceImpl {
    /// Meta environment; used in this file for the notification manager, the configured
    /// options (`secret_store_private_key`) and the session parameter manager.
    env: MetaSrvEnv,

    /// Gives access to the catalog and cluster controllers the snapshots are read from.
    metadata_manager: MetadataManager,
    /// Supplies the current hummock version and write limits, and pins versions for hummock
    /// subscribers.
    hummock_manager: HummockManagerRef,
    /// Supplies the meta backup manifest id.
    backup_manager: BackupManagerRef,
    /// Supplies the serving vnode mappings.
    serving_vnode_mapping: ServingVnodeMappingRef,
}
50
51impl NotificationServiceImpl {
52    pub async fn new(
53        env: MetaSrvEnv,
54        metadata_manager: MetadataManager,
55        hummock_manager: HummockManagerRef,
56        backup_manager: BackupManagerRef,
57        serving_vnode_mapping: ServingVnodeMappingRef,
58    ) -> MetaResult<Self> {
59        let service = Self {
60            env,
61            metadata_manager,
62            hummock_manager,
63            backup_manager,
64            serving_vnode_mapping,
65        };
66        let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
67        LocalSecretManager::global().init_secrets(secrets);
68        Ok(service)
69    }
70
71    async fn get_catalog_snapshot(
72        &self,
73    ) -> MetaResult<(Catalog, Vec<UserInfo>, NotificationVersion)> {
74        let catalog_guard = self
75            .metadata_manager
76            .catalog_controller
77            .get_inner_read_guard()
78            .await;
79        let (
80            (
81                databases,
82                schemas,
83                tables,
84                sources,
85                sinks,
86                subscriptions,
87                indexes,
88                views,
89                functions,
90                connections,
91                secrets,
92            ),
93            users,
94        ) = catalog_guard.snapshot().await?;
95        let notification_version = self.env.notification_manager().current_version().await;
96        Ok((
97            (
98                databases,
99                schemas,
100                tables,
101                sources,
102                sinks,
103                subscriptions,
104                indexes,
105                views,
106                functions,
107                connections,
108                secrets,
109            ),
110            users,
111            notification_version,
112        ))
113    }
114
115    /// Get decrypted secret snapshot
116    async fn get_decrypted_secret_snapshot(
117        &self,
118    ) -> MetaResult<(Vec<Secret>, NotificationVersion)> {
119        let catalog_guard = self
120            .metadata_manager
121            .catalog_controller
122            .get_inner_read_guard()
123            .await;
124        let secrets = catalog_guard.list_secrets().await?;
125        let notification_version = self.env.notification_manager().current_version().await;
126
127        let decrypted_secrets = self.decrypt_secrets(secrets)?;
128
129        Ok((decrypted_secrets, notification_version))
130    }
131
132    fn decrypt_secrets(&self, secrets: Vec<Secret>) -> MetaResult<Vec<Secret>> {
133        // Skip getting `secret_store_private_key` if there is no secret
134        if secrets.is_empty() {
135            return Ok(vec![]);
136        }
137        let secret_store_private_key = self
138            .env
139            .opts
140            .secret_store_private_key
141            .clone()
142            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
143        let mut decrypted_secrets = Vec::with_capacity(secrets.len());
144        for mut secret in secrets {
145            let encrypted_secret = SecretEncryption::deserialize(secret.get_value())
146                .context(format!("failed to deserialize secret {}", secret.name))?;
147            let decrypted_secret = encrypted_secret
148                .decrypt(secret_store_private_key.as_slice())
149                .context(format!("failed to decrypt secret {}", secret.name))?;
150            secret.value = decrypted_secret;
151            decrypted_secrets.push(secret);
152        }
153        Ok(decrypted_secrets)
154    }
155
156    async fn get_worker_slot_mapping_snapshot(
157        &self,
158    ) -> MetaResult<(Vec<FragmentWorkerSlotMapping>, NotificationVersion)> {
159        let mappings = self
160            .metadata_manager
161            .catalog_controller
162            .get_worker_slot_mappings();
163
164        let notification_version = self.env.notification_manager().current_version().await;
165        Ok((mappings, notification_version))
166    }
167
168    fn get_serving_vnode_mappings(&self) -> Vec<FragmentWorkerSlotMapping> {
169        self.serving_vnode_mapping
170            .all()
171            .iter()
172            .map(|(&fragment_id, mapping)| FragmentWorkerSlotMapping {
173                fragment_id,
174                mapping: Some(mapping.to_protobuf()),
175            })
176            .collect()
177    }
178
179    async fn get_worker_node_snapshot(&self) -> MetaResult<(Vec<WorkerNode>, NotificationVersion)> {
180        let cluster_guard = self
181            .metadata_manager
182            .cluster_controller
183            .get_inner_read_guard()
184            .await;
185        let compute_nodes = cluster_guard
186            .list_workers(Some(WorkerType::ComputeNode.into()), Some(Running.into()))
187            .await?;
188        let frontends = cluster_guard
189            .list_workers(Some(WorkerType::Frontend.into()), Some(Running.into()))
190            .await?;
191        let worker_nodes = compute_nodes.into_iter().chain(frontends).collect();
192        let notification_version = self.env.notification_manager().current_version().await;
193        Ok((worker_nodes, notification_version))
194    }
195
196    async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
197        let catalog_guard = self
198            .metadata_manager
199            .catalog_controller
200            .get_inner_read_guard()
201            .await;
202        let mut tables = catalog_guard.list_all_state_tables().await?;
203        tables.extend(catalog_guard.dropped_tables.values().cloned());
204        let notification_version = self.env.notification_manager().current_version().await;
205        Ok((tables, notification_version))
206    }
207
208    /// Get the total resource of the cluster.
209    async fn get_cluster_resource(&self) -> ClusterResource {
210        self.metadata_manager
211            .cluster_controller
212            .cluster_resource()
213            .await
214    }
215
216    async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
217        let (tables, catalog_version) = self.get_tables_snapshot().await?;
218        let cluster_resource = self.get_cluster_resource().await;
219
220        Ok(MetaSnapshot {
221            tables,
222            version: Some(SnapshotVersion {
223                catalog_version,
224                ..Default::default()
225            }),
226            cluster_resource: Some(cluster_resource),
227            ..Default::default()
228        })
229    }
230
231    async fn frontend_subscribe(&self) -> MetaResult<MetaSnapshot> {
232        let (
233            (
234                databases,
235                schemas,
236                tables,
237                sources,
238                sinks,
239                subscriptions,
240                indexes,
241                views,
242                functions,
243                connections,
244                secrets,
245            ),
246            users,
247            catalog_version,
248        ) = self.get_catalog_snapshot().await?;
249        let object_dependencies = self
250            .metadata_manager
251            .catalog_controller
252            .list_created_object_dependencies()
253            .await?;
254
255        // Use the plain text secret value for frontend. The secret value will be masked in frontend handle.
256        let decrypted_secrets = self.decrypt_secrets(secrets)?;
257
258        let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) =
259            self.get_worker_slot_mapping_snapshot().await?;
260
261        let streaming_job_count = self.metadata_manager.count_streaming_job().await?;
262        if streaming_job_count > 0 && streaming_worker_slot_mappings.is_empty() {
263            tracing::warn!(
264                streaming_job_count,
265                "frontend subscribe returns empty streaming_worker_slot_mappings while streaming jobs exist; meta may still be recovering"
266            );
267        }
268
269        let serving_worker_slot_mappings = self.get_serving_vnode_mappings();
270
271        let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;
272
273        let hummock_version = self
274            .hummock_manager
275            .on_current_version(|version| {
276                FrontendHummockVersion::from_version(version).to_protobuf()
277            })
278            .await;
279
280        let session_params = self
281            .env
282            .session_params_manager_impl_ref()
283            .get_params()
284            .await;
285
286        let session_params = Some(GetSessionParamsResponse {
287            params: serde_json::to_string(&session_params)
288                .context("failed to encode session params")?,
289        });
290
291        let cluster_resource = self.get_cluster_resource().await;
292
293        Ok(MetaSnapshot {
294            databases,
295            schemas,
296            sources,
297            sinks,
298            tables,
299            indexes,
300            views,
301            subscriptions,
302            functions,
303            connections,
304            secrets: decrypted_secrets,
305            users,
306            nodes,
307            hummock_version: Some(hummock_version),
308            version: Some(SnapshotVersion {
309                catalog_version,
310                worker_node_version,
311                streaming_worker_slot_mapping_version,
312            }),
313            serving_worker_slot_mappings,
314            streaming_worker_slot_mappings,
315            session_params,
316            object_dependencies,
317            cluster_resource: Some(cluster_resource),
318            ..Default::default()
319        })
320    }
321
322    async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
323        let (tables, catalog_version) = self.get_tables_snapshot().await?;
324        let hummock_version = self
325            .hummock_manager
326            .on_current_version(|version| version.into())
327            .await;
328        let hummock_write_limits = self.hummock_manager.write_limits().await;
329        let meta_backup_manifest_id = self.backup_manager.manifest().await.manifest_id;
330        let cluster_resource = self.get_cluster_resource().await;
331
332        Ok(MetaSnapshot {
333            tables,
334            hummock_version: Some(hummock_version),
335            version: Some(SnapshotVersion {
336                catalog_version,
337                ..Default::default()
338            }),
339            meta_backup_manifest_id: Some(MetaBackupManifestId {
340                id: meta_backup_manifest_id,
341            }),
342            hummock_write_limits: Some(WriteLimits {
343                write_limits: hummock_write_limits,
344            }),
345            cluster_resource: Some(cluster_resource),
346            ..Default::default()
347        })
348    }
349
350    async fn compute_subscribe(&self) -> MetaResult<MetaSnapshot> {
351        let (secrets, catalog_version) = self.get_decrypted_secret_snapshot().await?;
352        let cluster_resource = self.get_cluster_resource().await;
353
354        Ok(MetaSnapshot {
355            secrets,
356            version: Some(SnapshotVersion {
357                catalog_version,
358                ..Default::default()
359            }),
360            cluster_resource: Some(cluster_resource),
361            ..Default::default()
362        })
363    }
364}
365
366#[async_trait::async_trait]
367impl NotificationService for NotificationServiceImpl {
368    type SubscribeStream = UnboundedReceiverStream<Notification>;
369
370    async fn subscribe(
371        &self,
372        request: Request<SubscribeRequest>,
373    ) -> Result<Response<Self::SubscribeStream>, Status> {
374        let req = request.into_inner();
375        let host_address = req.get_host()?.clone();
376        let subscribe_type = req.get_subscribe_type()?;
377
378        let worker_key = WorkerKey(host_address);
379
380        let (tx, rx) = mpsc::unbounded_channel();
381        self.env
382            .notification_manager()
383            .insert_sender(subscribe_type, worker_key.clone(), tx);
384
385        let meta_snapshot = match subscribe_type {
386            SubscribeType::Compactor => self.compactor_subscribe().await?,
387            SubscribeType::Frontend => self.frontend_subscribe().await?,
388            SubscribeType::Hummock => {
389                self.hummock_manager
390                    .pin_version(req.get_worker_id())
391                    .await?;
392                self.hummock_subscribe().await?
393            }
394            SubscribeType::Compute => self.compute_subscribe().await?,
395            SubscribeType::Unspecified => unreachable!(),
396        };
397
398        self.env
399            .notification_manager()
400            .notify_snapshot(worker_key, subscribe_type, meta_snapshot);
401
402        Ok(Response::new(UnboundedReceiverStream::new(rx)))
403    }
404}