risingwave_compactor/compactor_observer/
observer_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::license::LicenseManager;
16use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
17use risingwave_common_service::ObserverState;
18use risingwave_pb::catalog::Table;
19use risingwave_pb::meta::SubscribeResponse;
20use risingwave_pb::meta::object::PbObjectInfo;
21use risingwave_pb::meta::subscribe_response::{Info, Operation};
22use risingwave_storage::compaction_catalog_manager::CompactionCatalogManagerRef;
23
24pub struct CompactorObserverNode {
25    compaction_catalog_manager: CompactionCatalogManagerRef,
26    system_params_manager: LocalSystemParamsManagerRef,
27    version: u64,
28}
29
30impl ObserverState for CompactorObserverNode {
31    fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
32        risingwave_pb::meta::SubscribeType::Compactor
33    }
34
35    fn handle_notification(&mut self, resp: SubscribeResponse) {
36        let Some(info) = resp.info.as_ref() else {
37            return;
38        };
39
40        match info.to_owned() {
41            Info::ObjectGroup(object_group) => {
42                for object in object_group.objects {
43                    match object.object_info.unwrap() {
44                        PbObjectInfo::Table(table_catalog) => {
45                            self.handle_catalog_notification(resp.operation(), table_catalog);
46                        }
47                        _ => panic!("error type notification"),
48                    };
49                }
50                assert!(
51                    resp.version > self.version,
52                    "resp version={:?}, current version={:?}",
53                    resp.version,
54                    self.version
55                );
56                self.version = resp.version;
57            }
58            Info::HummockVersionDeltas(_) => {}
59            Info::SystemParams(p) => {
60                self.system_params_manager.try_set_params(p);
61            }
62            Info::ComputeNodeTotalCpuCount(count) => {
63                LicenseManager::get().update_cpu_core_count(count as _);
64            }
65            _ => {
66                panic!("error type notification");
67            }
68        }
69    }
70
71    fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
72        let Some(Info::Snapshot(snapshot)) = resp.info else {
73            unreachable!();
74        };
75        self.handle_catalog_snapshot(snapshot.tables);
76        let snapshot_version = snapshot.version.unwrap();
77        self.version = snapshot_version.catalog_version;
78        LicenseManager::get().update_cpu_core_count(snapshot.compute_node_total_cpu_count as _);
79    }
80}
81
82impl CompactorObserverNode {
83    pub fn new(
84        compaction_catalog_manager: CompactionCatalogManagerRef,
85        system_params_manager: LocalSystemParamsManagerRef,
86    ) -> Self {
87        Self {
88            compaction_catalog_manager,
89            system_params_manager,
90            version: 0,
91        }
92    }
93
94    fn handle_catalog_snapshot(&mut self, tables: Vec<Table>) {
95        self.compaction_catalog_manager
96            .sync(tables.into_iter().map(|t| (t.id, t)).collect());
97    }
98
99    fn handle_catalog_notification(&mut self, operation: Operation, table_catalog: Table) {
100        match operation {
101            Operation::Add | Operation::Update => {
102                self.compaction_catalog_manager
103                    .update(table_catalog.id, table_catalog);
104            }
105
106            Operation::Delete => {
107                self.compaction_catalog_manager.remove(table_catalog.id);
108            }
109
110            _ => panic!("receive an unsupported notify {:?}", operation),
111        }
112    }
113}