risingwave_storage/hummock/
hummock_meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use futures::stream::BoxStream;
19use risingwave_hummock_sdk::version::HummockVersion;
20use risingwave_hummock_sdk::{ObjectIdRange, SyncResult};
21use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest};
22use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
23use risingwave_pb::id::{JobId, TableId};
24use risingwave_rpc_client::error::Result;
25use risingwave_rpc_client::{
26    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
27    IcebergCompactionEventItem, MetaClient,
28};
29use tokio::sync::mpsc::UnboundedSender;
30
31use crate::hummock::{HummockEpoch, HummockVersionId};
32use crate::monitor::HummockMetrics;
33
34pub struct MonitoredHummockMetaClient {
35    meta_client: MetaClient,
36
37    stats: Arc<HummockMetrics>,
38}
39
40impl MonitoredHummockMetaClient {
41    pub fn new(meta_client: MetaClient, stats: Arc<HummockMetrics>) -> MonitoredHummockMetaClient {
42        MonitoredHummockMetaClient { meta_client, stats }
43    }
44
45    pub fn get_inner(&self) -> &MetaClient {
46        &self.meta_client
47    }
48}
49
50#[async_trait]
51impl HummockMetaClient for MonitoredHummockMetaClient {
52    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
53        self.meta_client
54            .unpin_version_before(unpin_version_before)
55            .await
56    }
57
58    async fn get_current_version(&self) -> Result<HummockVersion> {
59        self.meta_client.get_current_version().await
60    }
61
62    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
63        self.stats.get_new_sst_ids_counts.inc();
64        let timer = self.stats.get_new_sst_ids_latency.start_timer();
65        let res = self.meta_client.get_new_object_ids(number).await;
66        timer.observe_duration();
67        res
68    }
69
70    async fn commit_epoch_with_change_log(
71        &self,
72        _epoch: HummockEpoch,
73        _sync_result: SyncResult,
74        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
75    ) -> Result<()> {
76        panic!("Only meta service can commit_epoch in production.")
77    }
78
79    async fn trigger_manual_compaction(
80        &self,
81        compaction_group_id: u64,
82        table_id: JobId,
83        level: u32,
84        sst_ids: Vec<u64>,
85    ) -> Result<()> {
86        self.meta_client
87            .trigger_manual_compaction(compaction_group_id, table_id, level, sst_ids)
88            .await
89    }
90
91    async fn trigger_full_gc(
92        &self,
93        sst_retention_time_sec: u64,
94        prefix: Option<String>,
95    ) -> Result<()> {
96        self.meta_client
97            .trigger_full_gc(sst_retention_time_sec, prefix)
98            .await
99    }
100
101    async fn subscribe_compaction_event(
102        &self,
103    ) -> Result<(
104        UnboundedSender<SubscribeCompactionEventRequest>,
105        BoxStream<'static, CompactionEventItem>,
106    )> {
107        self.meta_client.subscribe_compaction_event().await
108    }
109
110    async fn get_version_by_epoch(
111        &self,
112        epoch: HummockEpoch,
113        table_id: TableId,
114    ) -> Result<PbHummockVersion> {
115        self.meta_client.get_version_by_epoch(epoch, table_id).await
116    }
117
118    async fn subscribe_iceberg_compaction_event(
119        &self,
120    ) -> Result<(
121        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
122        BoxStream<'static, IcebergCompactionEventItem>,
123    )> {
124        self.meta_client.subscribe_iceberg_compaction_event().await
125    }
126}