risingwave_storage/hummock/
hummock_meta_client.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use futures::stream::BoxStream;
19use risingwave_hummock_sdk::version::HummockVersion;
20use risingwave_hummock_sdk::{ObjectIdRange, SyncResult};
21use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest};
22use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
23use risingwave_pb::id::{JobId, TableId};
24use risingwave_rpc_client::error::Result;
25use risingwave_rpc_client::{
26 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
27 IcebergCompactionEventItem, MetaClient,
28};
29use tokio::sync::mpsc::UnboundedSender;
30
31use crate::hummock::{HummockEpoch, HummockVersionId};
32use crate::monitor::HummockMetrics;
33
34pub struct MonitoredHummockMetaClient {
35 meta_client: MetaClient,
36
37 stats: Arc<HummockMetrics>,
38}
39
40impl MonitoredHummockMetaClient {
41 pub fn new(meta_client: MetaClient, stats: Arc<HummockMetrics>) -> MonitoredHummockMetaClient {
42 MonitoredHummockMetaClient { meta_client, stats }
43 }
44
45 pub fn get_inner(&self) -> &MetaClient {
46 &self.meta_client
47 }
48}
49
50#[async_trait]
51impl HummockMetaClient for MonitoredHummockMetaClient {
52 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
53 self.meta_client
54 .unpin_version_before(unpin_version_before)
55 .await
56 }
57
58 async fn get_current_version(&self) -> Result<HummockVersion> {
59 self.meta_client.get_current_version().await
60 }
61
62 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
63 self.stats.get_new_sst_ids_counts.inc();
64 let timer = self.stats.get_new_sst_ids_latency.start_timer();
65 let res = self.meta_client.get_new_object_ids(number).await;
66 timer.observe_duration();
67 res
68 }
69
70 async fn commit_epoch_with_change_log(
71 &self,
72 _epoch: HummockEpoch,
73 _sync_result: SyncResult,
74 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
75 ) -> Result<()> {
76 panic!("Only meta service can commit_epoch in production.")
77 }
78
79 async fn trigger_manual_compaction(
80 &self,
81 compaction_group_id: u64,
82 table_id: JobId,
83 level: u32,
84 sst_ids: Vec<u64>,
85 ) -> Result<()> {
86 self.meta_client
87 .trigger_manual_compaction(compaction_group_id, table_id, level, sst_ids)
88 .await
89 }
90
91 async fn trigger_full_gc(
92 &self,
93 sst_retention_time_sec: u64,
94 prefix: Option<String>,
95 ) -> Result<()> {
96 self.meta_client
97 .trigger_full_gc(sst_retention_time_sec, prefix)
98 .await
99 }
100
101 async fn subscribe_compaction_event(
102 &self,
103 ) -> Result<(
104 UnboundedSender<SubscribeCompactionEventRequest>,
105 BoxStream<'static, CompactionEventItem>,
106 )> {
107 self.meta_client.subscribe_compaction_event().await
108 }
109
110 async fn get_version_by_epoch(
111 &self,
112 epoch: HummockEpoch,
113 table_id: TableId,
114 ) -> Result<PbHummockVersion> {
115 self.meta_client.get_version_by_epoch(epoch, table_id).await
116 }
117
118 async fn subscribe_iceberg_compaction_event(
119 &self,
120 ) -> Result<(
121 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
122 BoxStream<'static, IcebergCompactionEventItem>,
123 )> {
124 self.meta_client.subscribe_iceberg_compaction_event().await
125 }
126}