risingwave_compute/observer/
observer_manager.rs1use risingwave_common::license::LicenseManager;
16use risingwave_common::secret::LocalSecretManager;
17use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
18use risingwave_common_service::ObserverState;
19use risingwave_pb::meta::SubscribeResponse;
20use risingwave_pb::meta::subscribe_response::{Info, Operation};
21use risingwave_rpc_client::ComputeClientPoolRef;
22
23pub struct ComputeObserverNode {
24 system_params_manager: LocalSystemParamsManagerRef,
25 batch_client_pool: ComputeClientPoolRef,
26}
27
28impl ObserverState for ComputeObserverNode {
29 fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
30 risingwave_pb::meta::SubscribeType::Compute
31 }
32
33 fn handle_notification(&mut self, resp: SubscribeResponse) {
34 if let Some(info) = resp.info.as_ref() {
35 match info.to_owned() {
36 Info::SystemParams(p) => self.system_params_manager.try_set_params(p),
37 Info::Secret(s) => match resp.operation() {
38 Operation::Add => {
39 LocalSecretManager::global().add_secret(s.id, s.value);
40 }
41 Operation::Delete => {
42 LocalSecretManager::global().remove_secret(s.id);
43 }
44 Operation::Update => {
45 LocalSecretManager::global().update_secret(s.id, s.value);
46 }
47 _ => {
48 panic!("error type notification");
49 }
50 },
51 Info::ComputeNodeTotalCpuCount(count) => {
52 LicenseManager::get().update_cpu_core_count(count as _);
53 }
54 Info::Recovery(_) => {
55 self.batch_client_pool.invalidate_all();
60 }
61 _ => {
62 panic!("error type notification");
63 }
64 }
65 };
66 }
67
68 fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
69 let Some(Info::Snapshot(snapshot)) = resp.info else {
70 unreachable!();
71 };
72 LocalSecretManager::global().init_secrets(snapshot.secrets);
73 LicenseManager::get().update_cpu_core_count(snapshot.compute_node_total_cpu_count as _);
74 }
75}
76
77impl ComputeObserverNode {
78 pub fn new(
79 system_params_manager: LocalSystemParamsManagerRef,
80 batch_client_pool: ComputeClientPoolRef,
81 ) -> Self {
82 Self {
83 system_params_manager,
84 batch_client_pool,
85 }
86 }
87}