risingwave_storage/hummock/
observer_manager.rs1use risingwave_common::license::LicenseManager;
16use risingwave_common_service::ObserverState;
17use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
18use risingwave_hummock_trace::TraceSpan;
19use risingwave_pb::catalog::Table;
20use risingwave_pb::meta::SubscribeResponse;
21use risingwave_pb::meta::object::PbObjectInfo;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use tokio::sync::mpsc::UnboundedSender;
24
25use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
26use crate::hummock::backup_reader::BackupReaderRef;
27use crate::hummock::event_handler::HummockVersionUpdate;
28use crate::hummock::write_limiter::WriteLimiterRef;
29
30pub struct HummockObserverNode {
31 compaction_catalog_manager: CompactionCatalogManagerRef,
32 backup_reader: BackupReaderRef,
33 write_limiter: WriteLimiterRef,
34 version_update_sender: UnboundedSender<HummockVersionUpdate>,
35 version: u64,
36}
37
38impl ObserverState for HummockObserverNode {
39 fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
40 risingwave_pb::meta::SubscribeType::Hummock
41 }
42
43 fn handle_notification(&mut self, resp: SubscribeResponse) {
44 let Some(info) = resp.info.as_ref() else {
45 return;
46 };
47
48 let _span: risingwave_hummock_trace::MayTraceSpan =
49 TraceSpan::new_meta_message_span(resp.clone());
50
51 match info.to_owned() {
52 Info::ObjectGroup(object_group) => {
53 for object in object_group.objects {
54 match object.object_info.unwrap() {
55 PbObjectInfo::Table(table_catalog) => {
56 self.handle_catalog_notification(resp.operation(), table_catalog);
57 }
58 _ => panic!("error type notification"),
59 };
60 }
61 assert!(
62 resp.version > self.version,
63 "resp version={:?}, current version={:?}",
64 resp.version,
65 self.version
66 );
67 self.version = resp.version;
68 }
69 Info::HummockVersionDeltas(hummock_version_deltas) => {
70 let _ = self
71 .version_update_sender
72 .send(HummockVersionUpdate::VersionDeltas(
73 hummock_version_deltas
74 .version_deltas
75 .iter()
76 .map(HummockVersionDelta::from_rpc_protobuf)
77 .collect(),
78 ))
79 .inspect_err(|e| {
80 tracing::error!(event = ?e.0, "unable to send version delta");
81 });
82 }
83
84 Info::MetaBackupManifestId(id) => {
85 self.backup_reader.try_refresh_manifest(id.id);
86 }
87
88 Info::HummockWriteLimits(write_limits) => {
89 self.write_limiter
90 .update_write_limits(write_limits.write_limits);
91 }
92
93 Info::ComputeNodeTotalCpuCount(count) => {
94 LicenseManager::get().update_cpu_core_count(count as _);
95 }
96
97 _ => {
98 panic!("error type notification");
99 }
100 }
101 }
102
103 fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
104 let _span: risingwave_hummock_trace::MayTraceSpan =
105 TraceSpan::new_meta_message_span(resp.clone());
106
107 let Some(Info::Snapshot(snapshot)) = resp.info else {
108 unreachable!();
109 };
110
111 self.handle_catalog_snapshot(snapshot.tables);
112 self.backup_reader.try_refresh_manifest(
113 snapshot
114 .meta_backup_manifest_id
115 .expect("should get meta backup manifest id")
116 .id,
117 );
118 self.write_limiter.update_write_limits(
119 snapshot
120 .hummock_write_limits
121 .expect("should get hummock_write_limits")
122 .write_limits,
123 );
124 let _ = self
125 .version_update_sender
126 .send(HummockVersionUpdate::PinnedVersion(Box::new(
127 HummockVersion::from_rpc_protobuf(
128 &snapshot
129 .hummock_version
130 .expect("should get hummock version"),
131 ),
132 )))
133 .inspect_err(|e| {
134 tracing::error!(event = ?e.0, "unable to send full version");
135 });
136 let snapshot_version = snapshot.version.unwrap();
137 self.version = snapshot_version.catalog_version;
138 LicenseManager::get().update_cpu_core_count(snapshot.compute_node_total_cpu_count as _);
139 }
140}
141
142impl HummockObserverNode {
143 pub fn new(
144 compaction_catalog_manager: CompactionCatalogManagerRef,
145 backup_reader: BackupReaderRef,
146 version_update_sender: UnboundedSender<HummockVersionUpdate>,
147 write_limiter: WriteLimiterRef,
148 ) -> Self {
149 Self {
150 compaction_catalog_manager,
151 backup_reader,
152 version_update_sender,
153 version: 0,
154 write_limiter,
155 }
156 }
157
158 fn handle_catalog_snapshot(&mut self, tables: Vec<Table>) {
159 self.compaction_catalog_manager
160 .sync(tables.into_iter().map(|t| (t.id, t)).collect());
161 }
162
163 fn handle_catalog_notification(&mut self, operation: Operation, table_catalog: Table) {
164 match operation {
165 Operation::Add | Operation::Update => {
166 self.compaction_catalog_manager
167 .update(table_catalog.id, table_catalog);
168 }
169
170 Operation::Delete => {
171 self.compaction_catalog_manager.remove(table_catalog.id);
172 }
173
174 _ => panic!("receive an unsupported notify {:?}", operation),
175 }
176 }
177}