risingwave_compute/observer/
observer_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::license::LicenseManager;
16use risingwave_common::secret::LocalSecretManager;
17use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
18use risingwave_common_service::ObserverState;
19use risingwave_pb::meta::SubscribeResponse;
20use risingwave_pb::meta::subscribe_response::{Info, Operation};
21use risingwave_rpc_client::ComputeClientPoolRef;
22
23pub struct ComputeObserverNode {
24    system_params_manager: LocalSystemParamsManagerRef,
25    batch_client_pool: ComputeClientPoolRef,
26}
27
28impl ObserverState for ComputeObserverNode {
29    fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
30        risingwave_pb::meta::SubscribeType::Compute
31    }
32
33    fn handle_notification(&mut self, resp: SubscribeResponse) {
34        if let Some(info) = resp.info.as_ref() {
35            match info.to_owned() {
36                Info::SystemParams(p) => self.system_params_manager.try_set_params(p),
37                Info::Secret(s) => match resp.operation() {
38                    Operation::Add => {
39                        LocalSecretManager::global().add_secret(s.id, s.value);
40                    }
41                    Operation::Delete => {
42                        LocalSecretManager::global().remove_secret(s.id);
43                    }
44                    Operation::Update => {
45                        LocalSecretManager::global().update_secret(s.id, s.value);
46                    }
47                    _ => {
48                        panic!("error type notification");
49                    }
50                },
51                Info::ComputeNodeTotalCpuCount(count) => {
52                    LicenseManager::get().update_cpu_core_count(count as _);
53                }
54                Info::Recovery(_) => {
55                    // Reset batch client pool on recovery is always unnecessary
56                    // when serving and streaming have been separated.
57                    // It can still be used as a method to manually trigger a reset of the batch client pool.
58                    // TODO: invalidate a single batch client on any connection issue.
59                    self.batch_client_pool.invalidate_all();
60                }
61                _ => {
62                    panic!("error type notification");
63                }
64            }
65        };
66    }
67
68    fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
69        let Some(Info::Snapshot(snapshot)) = resp.info else {
70            unreachable!();
71        };
72        LocalSecretManager::global().init_secrets(snapshot.secrets);
73        LicenseManager::get().update_cpu_core_count(snapshot.compute_node_total_cpu_count as _);
74    }
75}
76
77impl ComputeObserverNode {
78    pub fn new(
79        system_params_manager: LocalSystemParamsManagerRef,
80        batch_client_pool: ComputeClientPoolRef,
81    ) -> Self {
82        Self {
83            system_params_manager,
84            batch_client_pool,
85        }
86    }
87}