risingwave_frontend/
meta_client.rs1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
32use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
33use risingwave_pb::meta::list_actor_states_response::ActorState;
34use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
35use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
36use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
37use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
38use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
39use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
40use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus};
41use risingwave_pb::secret::PbSecretRef;
42use risingwave_rpc_client::error::Result;
43use risingwave_rpc_client::{HummockMetaClient, MetaClient};
44
45use crate::catalog::DatabaseId;
46
47#[async_trait::async_trait]
53pub trait FrontendMetaClient: Send + Sync {
54 async fn try_unregister(&self);
55
56 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
57
58 async fn wait(&self) -> Result<()>;
59
60 async fn recover(&self) -> Result<()>;
61
62 async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
63
64 async fn list_table_fragments(
65 &self,
66 table_ids: &[u32],
67 ) -> Result<HashMap<u32, TableFragmentInfo>>;
68
69 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
70
71 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
72
73 async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
74
75 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
76
77 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
78
79 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
80
81 async fn get_system_params(&self) -> Result<SystemParamsReader>;
82
83 async fn set_system_param(
84 &self,
85 param: String,
86 value: Option<String>,
87 ) -> Result<Option<SystemParamsReader>>;
88
89 async fn get_session_params(&self) -> Result<SessionConfig>;
90
91 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
92
93 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
94
95 async fn get_tables(
96 &self,
97 table_ids: &[u32],
98 include_dropped_table: bool,
99 ) -> Result<HashMap<u32, Table>>;
100
101 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;
103
104 async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
105
106 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
107
108 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
109
110 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
111
112 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
113
114 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
115
116 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
117
118 async fn list_event_log(&self) -> Result<Vec<EventLog>>;
119 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
120
121 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
122
123 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
124
125 async fn apply_throttle(
126 &self,
127 kind: PbThrottleTarget,
128 id: u32,
129 rate_limit: Option<u32>,
130 ) -> Result<()>;
131
132 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
133
134 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
135
136 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
137
138 async fn get_meta_store_endpoint(&self) -> Result<String>;
139
140 async fn alter_sink_props(
141 &self,
142 sink_id: u32,
143 changed_props: BTreeMap<String, String>,
144 changed_secret_refs: BTreeMap<String, PbSecretRef>,
145 connector_conn_ref: Option<u32>,
146 ) -> Result<()>;
147
148 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
149}
150
/// [`FrontendMetaClient`] implementation backed by a real [`MetaClient`].
///
/// The wrapped client is the public tuple field `0`; every trait method forwards to it.
pub struct FrontendMetaClientImpl(pub MetaClient);
152
153#[async_trait::async_trait]
154impl FrontendMetaClient for FrontendMetaClientImpl {
155 async fn try_unregister(&self) {
156 self.0.try_unregister().await;
157 }
158
159 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
160 self.0.flush(database_id).await
161 }
162
163 async fn wait(&self) -> Result<()> {
164 self.0.wait().await
165 }
166
167 async fn recover(&self) -> Result<()> {
168 self.0.recover().await
169 }
170
171 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
172 self.0.cancel_creating_jobs(infos).await
173 }
174
175 async fn list_table_fragments(
176 &self,
177 table_ids: &[u32],
178 ) -> Result<HashMap<u32, TableFragmentInfo>> {
179 self.0.list_table_fragments(table_ids).await
180 }
181
182 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
183 self.0.list_streaming_job_states().await
184 }
185
186 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
187 self.0.list_fragment_distributions().await
188 }
189
190 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
191 self.0.list_actor_states().await
192 }
193
194 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
195 self.0.list_actor_splits().await
196 }
197
198 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
199 self.0.list_object_dependencies().await
200 }
201
202 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
203 let manifest = self.0.get_meta_snapshot_manifest().await?;
204 Ok(manifest.snapshot_metadata)
205 }
206
207 async fn get_system_params(&self) -> Result<SystemParamsReader> {
208 self.0.get_system_params().await
209 }
210
211 async fn set_system_param(
212 &self,
213 param: String,
214 value: Option<String>,
215 ) -> Result<Option<SystemParamsReader>> {
216 self.0.set_system_param(param, value).await
217 }
218
219 async fn get_session_params(&self) -> Result<SessionConfig> {
220 let session_config: SessionConfig =
221 serde_json::from_str(&self.0.get_session_params().await?)
222 .context("failed to parse session config")?;
223 Ok(session_config)
224 }
225
226 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
227 self.0.set_session_param(param, value).await
228 }
229
230 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
231 let ddl_progress = self.0.get_ddl_progress().await?;
232 Ok(ddl_progress)
233 }
234
235 async fn get_tables(
236 &self,
237 table_ids: &[u32],
238 include_dropped_tables: bool,
239 ) -> Result<HashMap<u32, Table>> {
240 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
241 Ok(tables)
242 }
243
244 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
245 let pinned_versions = self
246 .0
247 .risectl_get_pinned_versions_summary()
248 .await?
249 .summary
250 .unwrap()
251 .pinned_versions;
252 let ret = pinned_versions
253 .into_iter()
254 .map(|v| (v.context_id, v.min_pinned_id))
255 .collect();
256 Ok(ret)
257 }
258
259 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
260 self.0.get_current_version().await
261 }
262
263 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
264 self.0
265 .risectl_get_checkpoint_hummock_version()
266 .await
267 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
268 }
269
270 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
271 self.0
273 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
274 .await
275 }
276
277 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
278 self.0.list_branched_object().await
279 }
280
281 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
282 self.0.risectl_list_compaction_group().await
283 }
284
285 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
286 self.0.list_active_write_limit().await
287 }
288
289 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
290 self.0.list_hummock_meta_config().await
291 }
292
293 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
294 self.0.list_event_log().await
295 }
296
297 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
298 self.0.list_compact_task_assignment().await
299 }
300
301 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
302 self.0.list_worker_nodes(None).await
303 }
304
305 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
306 self.0.list_compact_task_progress().await
307 }
308
309 async fn apply_throttle(
310 &self,
311 kind: PbThrottleTarget,
312 id: u32,
313 rate_limit: Option<u32>,
314 ) -> Result<()> {
315 self.0
316 .apply_throttle(kind, id, rate_limit)
317 .await
318 .map(|_| ())
319 }
320
321 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
322 self.0.get_cluster_recovery_status().await
323 }
324
325 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
326 self.0.get_cluster_limits().await
327 }
328
329 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
330 self.0.list_rate_limits().await
331 }
332
333 async fn get_meta_store_endpoint(&self) -> Result<String> {
334 self.0.get_meta_store_endpoint().await
335 }
336
337 async fn alter_sink_props(
338 &self,
339 sink_id: u32,
340 changed_props: BTreeMap<String, String>,
341 changed_secret_refs: BTreeMap<String, PbSecretRef>,
342 connector_conn_ref: Option<u32>,
343 ) -> Result<()> {
344 self.0
345 .alter_sink_props(
346 sink_id,
347 changed_props,
348 changed_secret_refs,
349 connector_conn_ref,
350 )
351 .await
352 }
353
354 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
355 self.0.list_hosted_iceberg_tables().await
356 }
357}