risingwave_compute/observer/
observer_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::license::LicenseManager;
16use risingwave_common::secret::LocalSecretManager;
17use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
18use risingwave_common_service::ObserverState;
19use risingwave_pb::meta::SubscribeResponse;
20use risingwave_pb::meta::subscribe_response::{Info, Operation};
21
22pub struct ComputeObserverNode {
23    system_params_manager: LocalSystemParamsManagerRef,
24}
25
26impl ObserverState for ComputeObserverNode {
27    fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
28        risingwave_pb::meta::SubscribeType::Compute
29    }
30
31    fn handle_notification(&mut self, resp: SubscribeResponse) {
32        if let Some(info) = resp.info.as_ref() {
33            match info.to_owned() {
34                Info::SystemParams(p) => self.system_params_manager.try_set_params(p),
35                Info::Secret(s) => match resp.operation() {
36                    Operation::Add => {
37                        LocalSecretManager::global().add_secret(s.id, s.value);
38                    }
39                    Operation::Delete => {
40                        LocalSecretManager::global().remove_secret(s.id);
41                    }
42                    Operation::Update => {
43                        LocalSecretManager::global().update_secret(s.id, s.value);
44                    }
45                    _ => {
46                        panic!("error type notification");
47                    }
48                },
49                Info::ComputeNodeTotalCpuCount(count) => {
50                    LicenseManager::get().update_cpu_core_count(count as _);
51                }
52                _ => {
53                    panic!("error type notification");
54                }
55            }
56        };
57    }
58
59    fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
60        let Some(Info::Snapshot(snapshot)) = resp.info else {
61            unreachable!();
62        };
63        LocalSecretManager::global().init_secrets(snapshot.secrets);
64        LicenseManager::get().update_cpu_core_count(snapshot.compute_node_total_cpu_count as _);
65    }
66}
67
68impl ComputeObserverNode {
69    pub fn new(system_params_manager: LocalSystemParamsManagerRef) -> Self {
70        Self {
71            system_params_manager,
72        }
73    }
74}