risingwave_compactor/compactor_observer/
observer_manager.rs1use risingwave_common::license::LicenseManager;
16use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
17use risingwave_common_service::ObserverState;
18use risingwave_pb::catalog::Table;
19use risingwave_pb::meta::SubscribeResponse;
20use risingwave_pb::meta::object::PbObjectInfo;
21use risingwave_pb::meta::subscribe_response::{Info, Operation};
22use risingwave_storage::compaction_catalog_manager::CompactionCatalogManagerRef;
23
24pub struct CompactorObserverNode {
25 compaction_catalog_manager: CompactionCatalogManagerRef,
26 system_params_manager: LocalSystemParamsManagerRef,
27 version: u64,
28}
29
30impl ObserverState for CompactorObserverNode {
31 fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
32 risingwave_pb::meta::SubscribeType::Compactor
33 }
34
35 fn handle_notification(&mut self, resp: SubscribeResponse) {
36 let Some(info) = resp.info.as_ref() else {
37 return;
38 };
39
40 match info.to_owned() {
41 Info::ObjectGroup(object_group) => {
42 for object in object_group.objects {
43 match object.object_info.unwrap() {
44 PbObjectInfo::Table(table_catalog) => {
45 self.handle_catalog_notification(resp.operation(), table_catalog);
46 }
47 _ => panic!("error type notification"),
48 };
49 }
50 assert!(
51 resp.version > self.version,
52 "resp version={:?}, current version={:?}",
53 resp.version,
54 self.version
55 );
56 self.version = resp.version;
57 }
58 Info::HummockVersionDeltas(_) => {}
59 Info::SystemParams(p) => {
60 self.system_params_manager.try_set_params(p);
61 }
62 Info::ComputeNodeTotalCpuCount(count) => {
63 LicenseManager::get().update_cpu_core_count(count as _);
64 }
65 _ => {
66 panic!("error type notification");
67 }
68 }
69 }
70
71 fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
72 let Some(Info::Snapshot(snapshot)) = resp.info else {
73 unreachable!();
74 };
75 self.handle_catalog_snapshot(snapshot.tables);
76 let snapshot_version = snapshot.version.unwrap();
77 self.version = snapshot_version.catalog_version;
78 LicenseManager::get().update_cpu_core_count(snapshot.compute_node_total_cpu_count as _);
79 }
80}
81
82impl CompactorObserverNode {
83 pub fn new(
84 compaction_catalog_manager: CompactionCatalogManagerRef,
85 system_params_manager: LocalSystemParamsManagerRef,
86 ) -> Self {
87 Self {
88 compaction_catalog_manager,
89 system_params_manager,
90 version: 0,
91 }
92 }
93
94 fn handle_catalog_snapshot(&mut self, tables: Vec<Table>) {
95 self.compaction_catalog_manager
96 .sync(tables.into_iter().map(|t| (t.id, t)).collect());
97 }
98
99 fn handle_catalog_notification(&mut self, operation: Operation, table_catalog: Table) {
100 match operation {
101 Operation::Add | Operation::Update => {
102 self.compaction_catalog_manager
103 .update(table_catalog.id, table_catalog);
104 }
105
106 Operation::Delete => {
107 self.compaction_catalog_manager.remove(table_catalog.id);
108 }
109
110 _ => panic!("receive an unsupported notify {:?}", operation),
111 }
112 }
113}