1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
32use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
33use risingwave_pb::meta::list_actor_states_response::ActorState;
34use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
35use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
36use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
37use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
38use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
39use risingwave_pb::meta::{
40 EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus, RefreshRequest,
41 RefreshResponse,
42};
43use risingwave_pb::secret::PbSecretRef;
44use risingwave_rpc_client::error::Result;
45use risingwave_rpc_client::{HummockMetaClient, MetaClient};
46
47use crate::catalog::{DatabaseId, SinkId};
48
/// Set of meta-service calls that the frontend issues directly.
///
/// [`FrontendMetaClientImpl`] is the implementation in this file; it forwards
/// every call to a wrapped [`MetaClient`].
/// NOTE(review): the trait presumably exists so tests can plug in a mock
/// instead of a live meta client — confirm against the test utilities.
///
/// Implementors must be `Send + Sync`. Every fallible method returns the RPC
/// client's [`Result`].
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
    /// Tries to unregister this client. Nothing is returned, so a failure is
    /// not observable by the caller.
    async fn try_unregister(&self);

    /// Requests a flush for `database_id` and returns a [`HummockVersionId`].
    /// NOTE(review): presumably the version that covers the flushed data — confirm.
    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;

    /// Issues the meta `wait` call and resolves once it returns.
    /// NOTE(review): what exactly is awaited is decided by the meta service.
    async fn wait(&self) -> Result<()>;

    /// Issues the meta `recover` call.
    async fn recover(&self) -> Result<()>;

    /// Cancels the creating jobs described by `jobs` and returns a list of
    /// `u32` ids.
    /// NOTE(review): presumably the ids of the jobs that were canceled — confirm.
    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

    /// Fetches [`TableFragmentInfo`] for the requested `table_ids`, keyed by
    /// a `u32` (presumably the table id — confirm).
    async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>>;

    /// Lists the states of streaming jobs.
    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;

    /// Lists fragment distributions.
    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists fragment distributions of jobs that are still being created
    /// (as the method name indicates; the filter is applied by the meta side).
    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists actor states.
    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

    /// Lists actor splits.
    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

    /// Lists dependencies between catalog objects.
    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;

    /// Lists metadata of meta snapshots (backups).
    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;

    /// Sets the system parameter `param` to `value`.
    ///
    /// NOTE(review): `None` for `value` presumably resets the parameter to
    /// its default, and the returned reader presumably reflects the updated
    /// parameters when present — confirm both against the meta service.
    async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>>;

    /// Fetches the session parameters as a [`SessionConfig`].
    async fn get_session_params(&self) -> Result<SessionConfig>;

    /// Sets the session parameter `param` to `value` and returns a `String`.
    /// NOTE(review): presumably the value now in effect; `None` presumably
    /// resets to the default — confirm.
    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

    /// Fetches the progress of DDL statements.
    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

    /// Fetches the catalog [`Table`]s for `table_ids`, keyed by a `u32`
    /// (presumably the table id — confirm). `include_dropped_table` is passed
    /// through to the meta service to decide whether dropped tables are
    /// returned as well.
    async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_table: bool,
    ) -> Result<HashMap<u32, Table>>;

    /// Lists pinned Hummock versions as `(u32, u64)` pairs.
    /// [`FrontendMetaClientImpl`] fills each pair with
    /// `(context_id, min_pinned_id)`.
    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;

    /// Fetches the current [`HummockVersion`].
    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

    /// Fetches the checkpoint [`HummockVersion`].
    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;

    /// Lists Hummock version deltas.
    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

    /// Lists branched Hummock objects.
    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;

    /// Lists compaction groups together with their configuration.
    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;

    /// Lists the active write limits, keyed by a `u64`
    /// (presumably the compaction group id — confirm).
    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;

    /// Lists Hummock meta configuration entries as string key/value pairs.
    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

    /// Lists event log entries.
    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
    /// Lists compaction task assignments.
    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;

    /// Lists worker nodes. [`FrontendMetaClientImpl`] requests them without
    /// passing a filter argument.
    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;

    /// Lists the progress of compaction tasks.
    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;

    /// Applies a throttle of the given `kind` to the object identified by `id`.
    /// NOTE(review): `rate_limit == None` presumably removes the limit — confirm.
    async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate_limit: Option<u32>,
    ) -> Result<()>;

    /// Fetches the cluster's [`RecoveryStatus`].
    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

    /// Fetches the cluster limits.
    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

    /// Lists rate limit information.
    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;

    /// Fetches the meta store endpoint as a string.
    async fn get_meta_store_endpoint(&self) -> Result<String>;

    /// Alters the properties of the sink `sink_id`.
    ///
    /// `changed_props` and `changed_secret_refs` carry the plain and the
    /// secret-backed property changes respectively.
    /// NOTE(review): `connector_conn_ref` is presumably the id of a
    /// connection object to attach — confirm.
    async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()>;

    /// Alters the connector properties of the source `source_id`.
    ///
    /// Parameters mirror those of `alter_sink_props`.
    async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()>;

    /// Lists hosted Iceberg tables.
    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;

    /// Looks up the distribution of fragment `fragment_id`.
    /// NOTE(review): `None` presumably means no such fragment exists — confirm.
    async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>>;

    /// Returns this client's worker id. This is the only synchronous,
    /// infallible method of the trait.
    fn worker_id(&self) -> u32;

    /// Sets the `aligned` flag of the sync log store for job `job_id`.
    async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()>;

    /// Requests compaction of the Iceberg table behind sink `sink_id` and
    /// returns a `u64`.
    /// NOTE(review): presumably an id of the scheduled compaction task — confirm.
    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;

    /// Sends a [`RefreshRequest`] and returns the [`RefreshResponse`].
    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
}
170
/// [`FrontendMetaClient`] implementation backed by a real [`MetaClient`].
///
/// A newtype whose only (public) field is the wrapped client; every trait
/// method is answered by calling into it.
pub struct FrontendMetaClientImpl(pub MetaClient);
172
173#[async_trait::async_trait]
174impl FrontendMetaClient for FrontendMetaClientImpl {
175 async fn try_unregister(&self) {
176 self.0.try_unregister().await;
177 }
178
179 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
180 self.0.flush(database_id).await
181 }
182
183 async fn wait(&self) -> Result<()> {
184 self.0.wait().await
185 }
186
187 async fn recover(&self) -> Result<()> {
188 self.0.recover().await
189 }
190
191 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
192 self.0.cancel_creating_jobs(infos).await
193 }
194
195 async fn list_table_fragments(
196 &self,
197 table_ids: &[u32],
198 ) -> Result<HashMap<u32, TableFragmentInfo>> {
199 self.0.list_table_fragments(table_ids).await
200 }
201
202 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
203 self.0.list_streaming_job_states().await
204 }
205
206 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
207 self.0.list_fragment_distributions().await
208 }
209
210 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
211 self.0.list_creating_fragment_distribution().await
212 }
213
214 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
215 self.0.list_actor_states().await
216 }
217
218 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
219 self.0.list_actor_splits().await
220 }
221
222 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
223 self.0.list_object_dependencies().await
224 }
225
226 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
227 let manifest = self.0.get_meta_snapshot_manifest().await?;
228 Ok(manifest.snapshot_metadata)
229 }
230
231 async fn set_system_param(
232 &self,
233 param: String,
234 value: Option<String>,
235 ) -> Result<Option<SystemParamsReader>> {
236 self.0.set_system_param(param, value).await
237 }
238
239 async fn get_session_params(&self) -> Result<SessionConfig> {
240 let session_config: SessionConfig =
241 serde_json::from_str(&self.0.get_session_params().await?)
242 .context("failed to parse session config")?;
243 Ok(session_config)
244 }
245
246 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
247 self.0.set_session_param(param, value).await
248 }
249
250 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
251 let ddl_progress = self.0.get_ddl_progress().await?;
252 Ok(ddl_progress)
253 }
254
255 async fn get_tables(
256 &self,
257 table_ids: &[u32],
258 include_dropped_tables: bool,
259 ) -> Result<HashMap<u32, Table>> {
260 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
261 Ok(tables)
262 }
263
264 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
265 let pinned_versions = self
266 .0
267 .risectl_get_pinned_versions_summary()
268 .await?
269 .summary
270 .unwrap()
271 .pinned_versions;
272 let ret = pinned_versions
273 .into_iter()
274 .map(|v| (v.context_id, v.min_pinned_id))
275 .collect();
276 Ok(ret)
277 }
278
279 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
280 self.0.get_current_version().await
281 }
282
283 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
284 self.0
285 .risectl_get_checkpoint_hummock_version()
286 .await
287 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
288 }
289
290 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
291 self.0
293 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
294 .await
295 }
296
297 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
298 self.0.list_branched_object().await
299 }
300
301 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
302 self.0.risectl_list_compaction_group().await
303 }
304
305 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
306 self.0.list_active_write_limit().await
307 }
308
309 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
310 self.0.list_hummock_meta_config().await
311 }
312
313 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
314 self.0.list_event_log().await
315 }
316
317 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
318 self.0.list_compact_task_assignment().await
319 }
320
321 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
322 self.0.list_worker_nodes(None).await
323 }
324
325 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
326 self.0.list_compact_task_progress().await
327 }
328
329 async fn apply_throttle(
330 &self,
331 kind: PbThrottleTarget,
332 id: u32,
333 rate_limit: Option<u32>,
334 ) -> Result<()> {
335 self.0
336 .apply_throttle(kind, id, rate_limit)
337 .await
338 .map(|_| ())
339 }
340
341 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
342 self.0.get_cluster_recovery_status().await
343 }
344
345 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
346 self.0.get_cluster_limits().await
347 }
348
349 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
350 self.0.list_rate_limits().await
351 }
352
353 async fn get_meta_store_endpoint(&self) -> Result<String> {
354 self.0.get_meta_store_endpoint().await
355 }
356
357 async fn alter_sink_props(
358 &self,
359 sink_id: u32,
360 changed_props: BTreeMap<String, String>,
361 changed_secret_refs: BTreeMap<String, PbSecretRef>,
362 connector_conn_ref: Option<u32>,
363 ) -> Result<()> {
364 self.0
365 .alter_sink_props(
366 sink_id,
367 changed_props,
368 changed_secret_refs,
369 connector_conn_ref,
370 )
371 .await
372 }
373
374 async fn alter_source_connector_props(
375 &self,
376 source_id: u32,
377 changed_props: BTreeMap<String, String>,
378 changed_secret_refs: BTreeMap<String, PbSecretRef>,
379 connector_conn_ref: Option<u32>,
380 ) -> Result<()> {
381 self.0
382 .alter_source_connector_props(
383 source_id,
384 changed_props,
385 changed_secret_refs,
386 connector_conn_ref,
387 )
388 .await
389 }
390
391 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
392 self.0.list_hosted_iceberg_tables().await
393 }
394
395 async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>> {
396 self.0.get_fragment_by_id(fragment_id).await
397 }
398
399 fn worker_id(&self) -> u32 {
400 self.0.worker_id()
401 }
402
403 async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
404 self.0.set_sync_log_store_aligned(job_id, aligned).await
405 }
406
407 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
408 self.0.compact_iceberg_table(sink_id).await
409 }
410
411 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
412 self.0.refresh(request).await
413 }
414}