risingwave_storage/hummock/
observer_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::license::LicenseManager;
16use risingwave_common_service::ObserverState;
17use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
18use risingwave_hummock_trace::TraceSpan;
19use risingwave_pb::catalog::Table;
20use risingwave_pb::meta::SubscribeResponse;
21use risingwave_pb::meta::object::PbObjectInfo;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use tokio::sync::mpsc::UnboundedSender;
24
25use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
26use crate::hummock::backup_reader::BackupReaderRef;
27use crate::hummock::event_handler::HummockVersionUpdate;
28use crate::hummock::write_limiter::WriteLimiterRef;
29
30pub struct HummockObserverNode {
31    compaction_catalog_manager: CompactionCatalogManagerRef,
32    backup_reader: BackupReaderRef,
33    write_limiter: WriteLimiterRef,
34    version_update_sender: UnboundedSender<HummockVersionUpdate>,
35    version: u64,
36}
37
38impl ObserverState for HummockObserverNode {
39    fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
40        risingwave_pb::meta::SubscribeType::Hummock
41    }
42
43    fn handle_notification(&mut self, resp: SubscribeResponse) {
44        let Some(info) = resp.info.as_ref() else {
45            return;
46        };
47
48        let _span: risingwave_hummock_trace::MayTraceSpan =
49            TraceSpan::new_meta_message_span(resp.clone());
50
51        match info.to_owned() {
52            Info::ObjectGroup(object_group) => {
53                for object in object_group.objects {
54                    match object.object_info.unwrap() {
55                        PbObjectInfo::Table(table_catalog) => {
56                            self.handle_catalog_notification(resp.operation(), table_catalog);
57                        }
58                        _ => panic!("error type notification"),
59                    };
60                }
61                assert!(
62                    resp.version > self.version,
63                    "resp version={:?}, current version={:?}",
64                    resp.version,
65                    self.version
66                );
67                self.version = resp.version;
68            }
69            Info::HummockVersionDeltas(hummock_version_deltas) => {
70                let _ = self
71                    .version_update_sender
72                    .send(HummockVersionUpdate::VersionDeltas(
73                        hummock_version_deltas
74                            .version_deltas
75                            .iter()
76                            .map(HummockVersionDelta::from_rpc_protobuf)
77                            .collect(),
78                    ))
79                    .inspect_err(|e| {
80                        tracing::error!(event = ?e.0, "unable to send version delta");
81                    });
82            }
83
84            Info::MetaBackupManifestId(id) => {
85                self.backup_reader.try_refresh_manifest(id.id);
86            }
87
88            Info::HummockWriteLimits(write_limits) => {
89                self.write_limiter
90                    .update_write_limits(write_limits.write_limits);
91            }
92
93            Info::ComputeNodeTotalCpuCount(count) => {
94                LicenseManager::get().update_cpu_core_count(count as _);
95            }
96
97            _ => {
98                panic!("error type notification");
99            }
100        }
101    }
102
103    fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
104        let _span: risingwave_hummock_trace::MayTraceSpan =
105            TraceSpan::new_meta_message_span(resp.clone());
106
107        let Some(Info::Snapshot(snapshot)) = resp.info else {
108            unreachable!();
109        };
110
111        self.handle_catalog_snapshot(snapshot.tables);
112        self.backup_reader.try_refresh_manifest(
113            snapshot
114                .meta_backup_manifest_id
115                .expect("should get meta backup manifest id")
116                .id,
117        );
118        self.write_limiter.update_write_limits(
119            snapshot
120                .hummock_write_limits
121                .expect("should get hummock_write_limits")
122                .write_limits,
123        );
124        let _ = self
125            .version_update_sender
126            .send(HummockVersionUpdate::PinnedVersion(Box::new(
127                HummockVersion::from_rpc_protobuf(
128                    &snapshot
129                        .hummock_version
130                        .expect("should get hummock version"),
131                ),
132            )))
133            .inspect_err(|e| {
134                tracing::error!(event = ?e.0, "unable to send full version");
135            });
136        let snapshot_version = snapshot.version.unwrap();
137        self.version = snapshot_version.catalog_version;
138        LicenseManager::get().update_cpu_core_count(snapshot.compute_node_total_cpu_count as _);
139    }
140}
141
142impl HummockObserverNode {
143    pub fn new(
144        compaction_catalog_manager: CompactionCatalogManagerRef,
145        backup_reader: BackupReaderRef,
146        version_update_sender: UnboundedSender<HummockVersionUpdate>,
147        write_limiter: WriteLimiterRef,
148    ) -> Self {
149        Self {
150            compaction_catalog_manager,
151            backup_reader,
152            version_update_sender,
153            version: 0,
154            write_limiter,
155        }
156    }
157
158    fn handle_catalog_snapshot(&mut self, tables: Vec<Table>) {
159        self.compaction_catalog_manager
160            .sync(tables.into_iter().map(|t| (t.id, t)).collect());
161    }
162
163    fn handle_catalog_notification(&mut self, operation: Operation, table_catalog: Table) {
164        match operation {
165            Operation::Add | Operation::Update => {
166                self.compaction_catalog_manager
167                    .update(table_catalog.id, table_catalog);
168            }
169
170            Operation::Delete => {
171                self.compaction_catalog_manager.remove(table_catalog.id);
172            }
173
174            _ => panic!("receive an unsupported notify {:?}", operation),
175        }
176    }
177}