1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
32use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
33use risingwave_pb::meta::list_actor_states_response::ActorState;
34use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
35use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
36use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
37use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
38use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
39use risingwave_pb::meta::{EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus};
40use risingwave_pb::secret::PbSecretRef;
41use risingwave_rpc_client::error::Result;
42use risingwave_rpc_client::{HummockMetaClient, MetaClient};
43
44use crate::catalog::DatabaseId;
45
46#[async_trait::async_trait]
52pub trait FrontendMetaClient: Send + Sync {
53 async fn try_unregister(&self);
54
55 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
56
57 async fn wait(&self) -> Result<()>;
58
59 async fn recover(&self) -> Result<()>;
60
61 async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
62
63 async fn list_table_fragments(
64 &self,
65 table_ids: &[u32],
66 ) -> Result<HashMap<u32, TableFragmentInfo>>;
67
68 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
69
70 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
71
72 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
73
74 async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
75
76 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
77
78 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
79
80 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
81
82 async fn set_system_param(
83 &self,
84 param: String,
85 value: Option<String>,
86 ) -> Result<Option<SystemParamsReader>>;
87
88 async fn get_session_params(&self) -> Result<SessionConfig>;
89
90 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
91
92 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
93
94 async fn get_tables(
95 &self,
96 table_ids: &[u32],
97 include_dropped_table: bool,
98 ) -> Result<HashMap<u32, Table>>;
99
100 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;
102
103 async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
104
105 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
106
107 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
108
109 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
110
111 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
112
113 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
114
115 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
116
117 async fn list_event_log(&self) -> Result<Vec<EventLog>>;
118 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
119
120 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
121
122 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
123
124 async fn apply_throttle(
125 &self,
126 kind: PbThrottleTarget,
127 id: u32,
128 rate_limit: Option<u32>,
129 ) -> Result<()>;
130
131 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
132
133 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
134
135 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
136
137 async fn get_meta_store_endpoint(&self) -> Result<String>;
138
139 async fn alter_sink_props(
140 &self,
141 sink_id: u32,
142 changed_props: BTreeMap<String, String>,
143 changed_secret_refs: BTreeMap<String, PbSecretRef>,
144 connector_conn_ref: Option<u32>,
145 ) -> Result<()>;
146
147 async fn alter_source_connector_props(
148 &self,
149 source_id: u32,
150 changed_props: BTreeMap<String, String>,
151 changed_secret_refs: BTreeMap<String, PbSecretRef>,
152 connector_conn_ref: Option<u32>,
153 ) -> Result<()>;
154
155 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
156
157 async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>>;
158
159 fn worker_id(&self) -> u32;
160
161 async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()>;
162}
163
164pub struct FrontendMetaClientImpl(pub MetaClient);
165
166#[async_trait::async_trait]
167impl FrontendMetaClient for FrontendMetaClientImpl {
168 async fn try_unregister(&self) {
169 self.0.try_unregister().await;
170 }
171
172 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
173 self.0.flush(database_id).await
174 }
175
176 async fn wait(&self) -> Result<()> {
177 self.0.wait().await
178 }
179
180 async fn recover(&self) -> Result<()> {
181 self.0.recover().await
182 }
183
184 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
185 self.0.cancel_creating_jobs(infos).await
186 }
187
188 async fn list_table_fragments(
189 &self,
190 table_ids: &[u32],
191 ) -> Result<HashMap<u32, TableFragmentInfo>> {
192 self.0.list_table_fragments(table_ids).await
193 }
194
195 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
196 self.0.list_streaming_job_states().await
197 }
198
199 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
200 self.0.list_fragment_distributions().await
201 }
202
203 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
204 self.0.list_creating_fragment_distribution().await
205 }
206
207 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
208 self.0.list_actor_states().await
209 }
210
211 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
212 self.0.list_actor_splits().await
213 }
214
215 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
216 self.0.list_object_dependencies().await
217 }
218
219 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
220 let manifest = self.0.get_meta_snapshot_manifest().await?;
221 Ok(manifest.snapshot_metadata)
222 }
223
224 async fn set_system_param(
225 &self,
226 param: String,
227 value: Option<String>,
228 ) -> Result<Option<SystemParamsReader>> {
229 self.0.set_system_param(param, value).await
230 }
231
232 async fn get_session_params(&self) -> Result<SessionConfig> {
233 let session_config: SessionConfig =
234 serde_json::from_str(&self.0.get_session_params().await?)
235 .context("failed to parse session config")?;
236 Ok(session_config)
237 }
238
239 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
240 self.0.set_session_param(param, value).await
241 }
242
243 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
244 let ddl_progress = self.0.get_ddl_progress().await?;
245 Ok(ddl_progress)
246 }
247
248 async fn get_tables(
249 &self,
250 table_ids: &[u32],
251 include_dropped_tables: bool,
252 ) -> Result<HashMap<u32, Table>> {
253 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
254 Ok(tables)
255 }
256
257 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
258 let pinned_versions = self
259 .0
260 .risectl_get_pinned_versions_summary()
261 .await?
262 .summary
263 .unwrap()
264 .pinned_versions;
265 let ret = pinned_versions
266 .into_iter()
267 .map(|v| (v.context_id, v.min_pinned_id))
268 .collect();
269 Ok(ret)
270 }
271
272 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
273 self.0.get_current_version().await
274 }
275
276 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
277 self.0
278 .risectl_get_checkpoint_hummock_version()
279 .await
280 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
281 }
282
283 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
284 self.0
286 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
287 .await
288 }
289
290 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
291 self.0.list_branched_object().await
292 }
293
294 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
295 self.0.risectl_list_compaction_group().await
296 }
297
298 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
299 self.0.list_active_write_limit().await
300 }
301
302 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
303 self.0.list_hummock_meta_config().await
304 }
305
306 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
307 self.0.list_event_log().await
308 }
309
310 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
311 self.0.list_compact_task_assignment().await
312 }
313
314 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
315 self.0.list_worker_nodes(None).await
316 }
317
318 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
319 self.0.list_compact_task_progress().await
320 }
321
322 async fn apply_throttle(
323 &self,
324 kind: PbThrottleTarget,
325 id: u32,
326 rate_limit: Option<u32>,
327 ) -> Result<()> {
328 self.0
329 .apply_throttle(kind, id, rate_limit)
330 .await
331 .map(|_| ())
332 }
333
334 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
335 self.0.get_cluster_recovery_status().await
336 }
337
338 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
339 self.0.get_cluster_limits().await
340 }
341
342 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
343 self.0.list_rate_limits().await
344 }
345
346 async fn get_meta_store_endpoint(&self) -> Result<String> {
347 self.0.get_meta_store_endpoint().await
348 }
349
350 async fn alter_sink_props(
351 &self,
352 sink_id: u32,
353 changed_props: BTreeMap<String, String>,
354 changed_secret_refs: BTreeMap<String, PbSecretRef>,
355 connector_conn_ref: Option<u32>,
356 ) -> Result<()> {
357 self.0
358 .alter_sink_props(
359 sink_id,
360 changed_props,
361 changed_secret_refs,
362 connector_conn_ref,
363 )
364 .await
365 }
366
367 async fn alter_source_connector_props(
368 &self,
369 source_id: u32,
370 changed_props: BTreeMap<String, String>,
371 changed_secret_refs: BTreeMap<String, PbSecretRef>,
372 connector_conn_ref: Option<u32>,
373 ) -> Result<()> {
374 self.0
375 .alter_source_connector_props(
376 source_id,
377 changed_props,
378 changed_secret_refs,
379 connector_conn_ref,
380 )
381 .await
382 }
383
384 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
385 self.0.list_hosted_iceberg_tables().await
386 }
387
388 async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>> {
389 self.0.get_fragment_by_id(fragment_id).await
390 }
391
392 fn worker_id(&self) -> u32 {
393 self.0.worker_id()
394 }
395
396 async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
397 self.0.set_sync_log_store_aligned(job_id, aligned).await
398 }
399}