risingwave_storage/hummock/
hummock_meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use futures::stream::BoxStream;
19use risingwave_hummock_sdk::version::HummockVersion;
20use risingwave_hummock_sdk::{ObjectIdRange, SyncResult};
21use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest};
22use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
23use risingwave_rpc_client::error::Result;
24use risingwave_rpc_client::{
25    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
26    IcebergCompactionEventItem, MetaClient,
27};
28use tokio::sync::mpsc::UnboundedSender;
29
30use crate::hummock::{HummockEpoch, HummockVersionId};
31use crate::monitor::HummockMetrics;
32
33pub struct MonitoredHummockMetaClient {
34    meta_client: MetaClient,
35
36    stats: Arc<HummockMetrics>,
37}
38
39impl MonitoredHummockMetaClient {
40    pub fn new(meta_client: MetaClient, stats: Arc<HummockMetrics>) -> MonitoredHummockMetaClient {
41        MonitoredHummockMetaClient { meta_client, stats }
42    }
43
44    pub fn get_inner(&self) -> &MetaClient {
45        &self.meta_client
46    }
47}
48
49#[async_trait]
50impl HummockMetaClient for MonitoredHummockMetaClient {
51    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
52        self.meta_client
53            .unpin_version_before(unpin_version_before)
54            .await
55    }
56
57    async fn get_current_version(&self) -> Result<HummockVersion> {
58        self.meta_client.get_current_version().await
59    }
60
61    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
62        self.stats.get_new_sst_ids_counts.inc();
63        let timer = self.stats.get_new_sst_ids_latency.start_timer();
64        let res = self.meta_client.get_new_object_ids(number).await;
65        timer.observe_duration();
66        res
67    }
68
69    async fn commit_epoch_with_change_log(
70        &self,
71        _epoch: HummockEpoch,
72        _sync_result: SyncResult,
73        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
74    ) -> Result<()> {
75        panic!("Only meta service can commit_epoch in production.")
76    }
77
78    async fn trigger_manual_compaction(
79        &self,
80        compaction_group_id: u64,
81        table_id: u32,
82        level: u32,
83        sst_ids: Vec<u64>,
84    ) -> Result<()> {
85        self.meta_client
86            .trigger_manual_compaction(compaction_group_id, table_id, level, sst_ids)
87            .await
88    }
89
90    async fn trigger_full_gc(
91        &self,
92        sst_retention_time_sec: u64,
93        prefix: Option<String>,
94    ) -> Result<()> {
95        self.meta_client
96            .trigger_full_gc(sst_retention_time_sec, prefix)
97            .await
98    }
99
100    async fn subscribe_compaction_event(
101        &self,
102    ) -> Result<(
103        UnboundedSender<SubscribeCompactionEventRequest>,
104        BoxStream<'static, CompactionEventItem>,
105    )> {
106        self.meta_client.subscribe_compaction_event().await
107    }
108
109    async fn get_version_by_epoch(
110        &self,
111        epoch: HummockEpoch,
112        table_id: u32,
113    ) -> Result<PbHummockVersion> {
114        self.meta_client.get_version_by_epoch(epoch, table_id).await
115    }
116
117    async fn subscribe_iceberg_compaction_event(
118        &self,
119    ) -> Result<(
120        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
121        BoxStream<'static, IcebergCompactionEventItem>,
122    )> {
123        self.meta_client.subscribe_iceberg_compaction_event().await
124    }
125}