risingwave_storage/hummock/
hummock_meta_client.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use risingwave_hummock_sdk::change_log::TableChangeLogs;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_hummock_sdk::{CompactionGroupId, ObjectIdRange, SyncResult};
23use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest};
24use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
25use risingwave_pb::id::{HummockSstableId, JobId, TableId};
26use risingwave_rpc_client::error::Result;
27use risingwave_rpc_client::{
28 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
29 IcebergCompactionEventItem, MetaClient,
30};
31use tokio::sync::mpsc::UnboundedSender;
32
33use crate::hummock::{HummockEpoch, HummockVersionId};
34use crate::monitor::HummockMetrics;
35
36pub struct MonitoredHummockMetaClient {
37 meta_client: MetaClient,
38
39 stats: Arc<HummockMetrics>,
40}
41
42impl MonitoredHummockMetaClient {
43 pub fn new(meta_client: MetaClient, stats: Arc<HummockMetrics>) -> MonitoredHummockMetaClient {
44 MonitoredHummockMetaClient { meta_client, stats }
45 }
46
47 pub fn get_inner(&self) -> &MetaClient {
48 &self.meta_client
49 }
50}
51
52#[async_trait]
53impl HummockMetaClient for MonitoredHummockMetaClient {
54 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
55 self.meta_client
56 .unpin_version_before(unpin_version_before)
57 .await
58 }
59
60 async fn get_current_version(&self) -> Result<HummockVersion> {
61 self.meta_client.get_current_version().await
62 }
63
64 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
65 self.stats.get_new_sst_ids_counts.inc();
66 let timer = self.stats.get_new_sst_ids_latency.start_timer();
67 let res = self.meta_client.get_new_object_ids(number).await;
68 timer.observe_duration();
69 res
70 }
71
72 async fn commit_epoch_with_change_log(
73 &self,
74 _epoch: HummockEpoch,
75 _sync_result: SyncResult,
76 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
77 ) -> Result<()> {
78 panic!("Only meta service can commit_epoch in production.")
79 }
80
81 async fn trigger_manual_compaction(
82 &self,
83 compaction_group_id: CompactionGroupId,
84 table_id: JobId,
85 level: u32,
86 target_level: Option<u32>,
87 sst_ids: Vec<HummockSstableId>,
88 exclusive: bool,
89 ) -> Result<bool> {
90 self.meta_client
91 .trigger_manual_compaction(
92 compaction_group_id,
93 table_id,
94 level,
95 target_level,
96 sst_ids,
97 exclusive,
98 )
99 .await
100 }
101
102 async fn trigger_full_gc(
103 &self,
104 sst_retention_time_sec: u64,
105 prefix: Option<String>,
106 ) -> Result<()> {
107 self.meta_client
108 .trigger_full_gc(sst_retention_time_sec, prefix)
109 .await
110 }
111
112 async fn subscribe_compaction_event(
113 &self,
114 ) -> Result<(
115 UnboundedSender<SubscribeCompactionEventRequest>,
116 BoxStream<'static, CompactionEventItem>,
117 )> {
118 self.meta_client.subscribe_compaction_event().await
119 }
120
121 async fn get_version_by_epoch(
122 &self,
123 epoch: HummockEpoch,
124 table_id: TableId,
125 ) -> Result<PbHummockVersion> {
126 self.meta_client.get_version_by_epoch(epoch, table_id).await
127 }
128
129 async fn subscribe_iceberg_compaction_event(
130 &self,
131 ) -> Result<(
132 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
133 BoxStream<'static, IcebergCompactionEventItem>,
134 )> {
135 self.meta_client.subscribe_iceberg_compaction_event().await
136 }
137
138 async fn get_table_change_logs(
139 &self,
140 epoch_only: bool,
141 start_epoch_inclusive: Option<u64>,
142 end_epoch_inclusive: Option<u64>,
143 table_ids: Option<HashSet<TableId>>,
144 exclude_empty: bool,
145 limit: Option<u32>,
146 ) -> Result<TableChangeLogs> {
147 self.meta_client
148 .get_table_change_logs(
149 epoch_only,
150 start_epoch_inclusive,
151 end_epoch_inclusive,
152 table_ids,
153 exclude_empty,
154 limit,
155 )
156 .await
157 }
158}