risingwave_compute/observer/
observer_manager.rs1use risingwave_common::license::LicenseManager;
16use risingwave_common::secret::LocalSecretManager;
17use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
18use risingwave_common_service::ObserverState;
19use risingwave_pb::meta::SubscribeResponse;
20use risingwave_pb::meta::subscribe_response::{Info, Operation};
21
22pub struct ComputeObserverNode {
23 system_params_manager: LocalSystemParamsManagerRef,
24}
25
26impl ObserverState for ComputeObserverNode {
27 fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
28 risingwave_pb::meta::SubscribeType::Compute
29 }
30
31 fn handle_notification(&mut self, resp: SubscribeResponse) {
32 if let Some(info) = resp.info.as_ref() {
33 match info.to_owned() {
34 Info::SystemParams(p) => self.system_params_manager.try_set_params(p),
35 Info::Secret(s) => match resp.operation() {
36 Operation::Add => {
37 LocalSecretManager::global().add_secret(s.id, s.value);
38 }
39 Operation::Delete => {
40 LocalSecretManager::global().remove_secret(s.id);
41 }
42 Operation::Update => {
43 LocalSecretManager::global().update_secret(s.id, s.value);
44 }
45 _ => {
46 panic!("error type notification");
47 }
48 },
49 Info::ComputeNodeTotalCpuCount(count) => {
50 LicenseManager::get().update_cpu_core_count(count as _);
51 }
52 _ => {
53 panic!("error type notification");
54 }
55 }
56 };
57 }
58
59 fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
60 let Some(Info::Snapshot(snapshot)) = resp.info else {
61 unreachable!();
62 };
63 LocalSecretManager::global().init_secrets(snapshot.secrets);
64 LicenseManager::get().update_cpu_core_count(snapshot.compute_node_total_cpu_count as _);
65 }
66}
67
68impl ComputeObserverNode {
69 pub fn new(system_params_manager: LocalSystemParamsManagerRef) -> Self {
70 Self {
71 system_params_manager,
72 }
73 }
74}