risingwave_storage/hummock/
hummock_meta_client.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use futures::stream::BoxStream;
19use risingwave_hummock_sdk::version::HummockVersion;
20use risingwave_hummock_sdk::{ObjectIdRange, SyncResult};
21use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest};
22use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
23use risingwave_rpc_client::error::Result;
24use risingwave_rpc_client::{
25 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
26 IcebergCompactionEventItem, MetaClient,
27};
28use tokio::sync::mpsc::UnboundedSender;
29
30use crate::hummock::{HummockEpoch, HummockVersionId};
31use crate::monitor::HummockMetrics;
32
33pub struct MonitoredHummockMetaClient {
34 meta_client: MetaClient,
35
36 stats: Arc<HummockMetrics>,
37}
38
39impl MonitoredHummockMetaClient {
40 pub fn new(meta_client: MetaClient, stats: Arc<HummockMetrics>) -> MonitoredHummockMetaClient {
41 MonitoredHummockMetaClient { meta_client, stats }
42 }
43
44 pub fn get_inner(&self) -> &MetaClient {
45 &self.meta_client
46 }
47}
48
49#[async_trait]
50impl HummockMetaClient for MonitoredHummockMetaClient {
51 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
52 self.meta_client
53 .unpin_version_before(unpin_version_before)
54 .await
55 }
56
57 async fn get_current_version(&self) -> Result<HummockVersion> {
58 self.meta_client.get_current_version().await
59 }
60
61 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
62 self.stats.get_new_sst_ids_counts.inc();
63 let timer = self.stats.get_new_sst_ids_latency.start_timer();
64 let res = self.meta_client.get_new_object_ids(number).await;
65 timer.observe_duration();
66 res
67 }
68
69 async fn commit_epoch_with_change_log(
70 &self,
71 _epoch: HummockEpoch,
72 _sync_result: SyncResult,
73 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
74 ) -> Result<()> {
75 panic!("Only meta service can commit_epoch in production.")
76 }
77
78 async fn trigger_manual_compaction(
79 &self,
80 compaction_group_id: u64,
81 table_id: u32,
82 level: u32,
83 sst_ids: Vec<u64>,
84 ) -> Result<()> {
85 self.meta_client
86 .trigger_manual_compaction(compaction_group_id, table_id, level, sst_ids)
87 .await
88 }
89
90 async fn trigger_full_gc(
91 &self,
92 sst_retention_time_sec: u64,
93 prefix: Option<String>,
94 ) -> Result<()> {
95 self.meta_client
96 .trigger_full_gc(sst_retention_time_sec, prefix)
97 .await
98 }
99
100 async fn subscribe_compaction_event(
101 &self,
102 ) -> Result<(
103 UnboundedSender<SubscribeCompactionEventRequest>,
104 BoxStream<'static, CompactionEventItem>,
105 )> {
106 self.meta_client.subscribe_compaction_event().await
107 }
108
109 async fn get_version_by_epoch(
110 &self,
111 epoch: HummockEpoch,
112 table_id: u32,
113 ) -> Result<PbHummockVersion> {
114 self.meta_client.get_version_by_epoch(epoch, table_id).await
115 }
116
117 async fn subscribe_iceberg_compaction_event(
118 &self,
119 ) -> Result<(
120 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
121 BoxStream<'static, IcebergCompactionEventItem>,
122 )> {
123 self.meta_client.subscribe_iceberg_compaction_event().await
124 }
125}