1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
32use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
33use risingwave_pb::meta::list_actor_states_response::ActorState;
34use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
35use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
36use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
37use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
38use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
39use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
40use risingwave_pb::meta::{
41 EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus, RefreshRequest,
42 RefreshResponse,
43};
44use risingwave_pb::secret::PbSecretRef;
45use risingwave_rpc_client::error::Result;
46use risingwave_rpc_client::{HummockMetaClient, MetaClient};
47
48use crate::catalog::{DatabaseId, SinkId};
49
/// Meta-service operations the frontend relies on, gathered behind one trait.
///
/// [`FrontendMetaClientImpl`] is the implementation visible in this file; it forwards
/// every call to a wrapped [`MetaClient`]. Fallible methods return the RPC client's
/// [`Result`].
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
    /// Attempts to unregister from the meta service. Nothing is returned, so the caller
    /// cannot observe whether the attempt succeeded.
    async fn try_unregister(&self);

    /// Requests a flush for `database_id` and yields a [`HummockVersionId`] on success.
    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;

    /// Issues the meta `wait` request. The condition being waited on is defined by meta
    /// and is not visible from this file.
    async fn wait(&self) -> Result<()>;

    /// Issues the meta `recover` request.
    async fn recover(&self) -> Result<()>;

    /// Requests cancellation of the creating jobs described by `jobs`.
    ///
    /// Returns a list of `u32` ids — presumably the jobs that were cancelled; confirm
    /// against the meta service.
    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

    /// Fetches fragment information for the given `table_ids`, returned as a map keyed
    /// by `u32` (presumably the table id — confirm).
    async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>>;

    /// Lists the state of streaming jobs known to meta.
    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;

    /// Lists fragment distributions.
    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists fragment distributions of jobs that are still being created
    /// (per the method name; the filtering itself happens in meta).
    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists actor states.
    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

    /// Lists actor splits.
    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

    /// Lists object dependencies recorded by meta.
    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;

    /// Lists metadata of meta snapshots. [`FrontendMetaClientImpl`] reads it from the
    /// `snapshot_metadata` field of the meta snapshot manifest.
    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;

    /// Sets the system parameter `param` to `value`.
    ///
    /// A `None` value presumably resets the parameter to its default, and the returned
    /// `Option<SystemParamsReader>` presumably carries the updated parameters — confirm
    /// both against the meta service.
    async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>>;

    /// Fetches the session parameters as a [`SessionConfig`].
    /// [`FrontendMetaClientImpl`] obtains them as a JSON string and deserializes it.
    async fn get_session_params(&self) -> Result<SessionConfig>;

    /// Sets the session parameter `param` to `value` and returns a `String`
    /// (presumably the value now in effect — confirm).
    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

    /// Fetches the progress of DDL operations.
    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

    /// Fetches the catalog [`Table`]s for `table_ids`, keyed by `u32`
    /// (presumably the table id — confirm).
    ///
    /// `include_dropped_table` is passed through to meta; judging by its name it asks
    /// for dropped tables to be included as well.
    async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_table: bool,
    ) -> Result<HashMap<u32, Table>>;

    /// Lists pinned Hummock versions as `(u32, u64)` pairs.
    /// [`FrontendMetaClientImpl`] fills each pair with `(context_id, min_pinned_id)`.
    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;

    /// Fetches the current [`HummockVersion`].
    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

    /// Fetches the checkpoint [`HummockVersion`].
    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;

    /// Lists Hummock version deltas.
    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

    /// Lists branched objects.
    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;

    /// Lists compaction groups together with their configuration.
    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;

    /// Lists the active write limits, keyed by `u64`
    /// (presumably the compaction group id — confirm).
    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;

    /// Lists Hummock meta configuration entries as string key/value pairs.
    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

    /// Lists event log entries.
    async fn list_event_log(&self) -> Result<Vec<EventLog>>;

    /// Lists compact task assignments.
    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;

    /// Lists worker nodes. [`FrontendMetaClientImpl`] requests them without a filter.
    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;

    /// Lists the progress of compact tasks.
    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;

    /// Applies `rate_limit` to the object `id` of the given throttle `kind`.
    ///
    /// `None` presumably removes the limit — confirm against the meta service.
    async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate_limit: Option<u32>,
    ) -> Result<()>;

    /// Fetches the cluster's [`RecoveryStatus`].
    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

    /// Fetches the cluster limits reported by meta.
    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

    /// Lists rate limit information.
    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;

    /// Lists CDC progress, keyed by `u32` (presumably the job id — confirm).
    async fn list_cdc_progress(&self) -> Result<HashMap<u32, PbCdcProgress>>;

    /// Fetches the meta store endpoint as a string.
    async fn get_meta_store_endpoint(&self) -> Result<String>;

    /// Alters the properties of the sink `sink_id`.
    ///
    /// `changed_props` and `changed_secret_refs` carry the plain and secret-backed
    /// property changes; `connector_conn_ref` is an optional `u32` reference
    /// (presumably a connection id — confirm).
    async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()>;

    /// Alters the properties of an iceberg table identified by `table_id` together with
    /// its `sink_id` and `source_id`. The remaining arguments have the same shape as in
    /// [`FrontendMetaClient::alter_sink_props`].
    async fn alter_iceberg_table_props(
        &self,
        table_id: u32,
        sink_id: u32,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()>;

    /// Alters the connector properties of the source `source_id`. The remaining
    /// arguments have the same shape as in [`FrontendMetaClient::alter_sink_props`].
    async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()>;

    /// Lists hosted iceberg tables.
    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;

    /// Looks up the distribution of the fragment `fragment_id`.
    ///
    /// `Ok(None)` presumably means no such fragment exists — confirm.
    async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>>;

    /// Returns this client's worker id. Synchronous and infallible.
    fn worker_id(&self) -> u32;

    /// Sets the `aligned` flag of the sync log store for job `job_id`.
    async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()>;

    /// Requests compaction of the iceberg table behind sink `sink_id`; returns a `u64`
    /// (presumably a task id — confirm).
    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;

    /// Requests snapshot expiration for the iceberg table behind sink `sink_id`.
    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;

    /// Sends a [`RefreshRequest`] to meta and returns its [`RefreshResponse`].
    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
}
185
/// [`FrontendMetaClient`] implementation backed by a real [`MetaClient`].
///
/// The wrapped client is the public tuple field `0`.
pub struct FrontendMetaClientImpl(pub MetaClient);
187
188#[async_trait::async_trait]
189impl FrontendMetaClient for FrontendMetaClientImpl {
190 async fn try_unregister(&self) {
191 self.0.try_unregister().await;
192 }
193
194 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
195 self.0.flush(database_id).await
196 }
197
198 async fn wait(&self) -> Result<()> {
199 self.0.wait().await
200 }
201
202 async fn recover(&self) -> Result<()> {
203 self.0.recover().await
204 }
205
206 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
207 self.0.cancel_creating_jobs(infos).await
208 }
209
210 async fn list_table_fragments(
211 &self,
212 table_ids: &[u32],
213 ) -> Result<HashMap<u32, TableFragmentInfo>> {
214 self.0.list_table_fragments(table_ids).await
215 }
216
217 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
218 self.0.list_streaming_job_states().await
219 }
220
221 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
222 self.0.list_fragment_distributions().await
223 }
224
225 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
226 self.0.list_creating_fragment_distribution().await
227 }
228
229 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
230 self.0.list_actor_states().await
231 }
232
233 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
234 self.0.list_actor_splits().await
235 }
236
237 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
238 self.0.list_object_dependencies().await
239 }
240
241 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
242 let manifest = self.0.get_meta_snapshot_manifest().await?;
243 Ok(manifest.snapshot_metadata)
244 }
245
246 async fn set_system_param(
247 &self,
248 param: String,
249 value: Option<String>,
250 ) -> Result<Option<SystemParamsReader>> {
251 self.0.set_system_param(param, value).await
252 }
253
254 async fn get_session_params(&self) -> Result<SessionConfig> {
255 let session_config: SessionConfig =
256 serde_json::from_str(&self.0.get_session_params().await?)
257 .context("failed to parse session config")?;
258 Ok(session_config)
259 }
260
261 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
262 self.0.set_session_param(param, value).await
263 }
264
265 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
266 let ddl_progress = self.0.get_ddl_progress().await?;
267 Ok(ddl_progress)
268 }
269
270 async fn get_tables(
271 &self,
272 table_ids: &[u32],
273 include_dropped_tables: bool,
274 ) -> Result<HashMap<u32, Table>> {
275 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
276 Ok(tables)
277 }
278
279 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
280 let pinned_versions = self
281 .0
282 .risectl_get_pinned_versions_summary()
283 .await?
284 .summary
285 .unwrap()
286 .pinned_versions;
287 let ret = pinned_versions
288 .into_iter()
289 .map(|v| (v.context_id, v.min_pinned_id))
290 .collect();
291 Ok(ret)
292 }
293
294 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
295 self.0.get_current_version().await
296 }
297
298 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
299 self.0
300 .risectl_get_checkpoint_hummock_version()
301 .await
302 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
303 }
304
305 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
306 self.0
308 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
309 .await
310 }
311
312 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
313 self.0.list_branched_object().await
314 }
315
316 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
317 self.0.risectl_list_compaction_group().await
318 }
319
320 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
321 self.0.list_active_write_limit().await
322 }
323
324 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
325 self.0.list_hummock_meta_config().await
326 }
327
328 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
329 self.0.list_event_log().await
330 }
331
332 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
333 self.0.list_compact_task_assignment().await
334 }
335
336 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
337 self.0.list_worker_nodes(None).await
338 }
339
340 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
341 self.0.list_compact_task_progress().await
342 }
343
344 async fn apply_throttle(
345 &self,
346 kind: PbThrottleTarget,
347 id: u32,
348 rate_limit: Option<u32>,
349 ) -> Result<()> {
350 self.0
351 .apply_throttle(kind, id, rate_limit)
352 .await
353 .map(|_| ())
354 }
355
356 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
357 self.0.get_cluster_recovery_status().await
358 }
359
360 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
361 self.0.get_cluster_limits().await
362 }
363
364 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
365 self.0.list_rate_limits().await
366 }
367
368 async fn list_cdc_progress(&self) -> Result<HashMap<u32, PbCdcProgress>> {
369 self.0.list_cdc_progress().await
370 }
371
372 async fn get_meta_store_endpoint(&self) -> Result<String> {
373 self.0.get_meta_store_endpoint().await
374 }
375
376 async fn alter_sink_props(
377 &self,
378 sink_id: u32,
379 changed_props: BTreeMap<String, String>,
380 changed_secret_refs: BTreeMap<String, PbSecretRef>,
381 connector_conn_ref: Option<u32>,
382 ) -> Result<()> {
383 self.0
384 .alter_sink_props(
385 sink_id,
386 changed_props,
387 changed_secret_refs,
388 connector_conn_ref,
389 )
390 .await
391 }
392
393 async fn alter_iceberg_table_props(
394 &self,
395 table_id: u32,
396 sink_id: u32,
397 source_id: u32,
398 changed_props: BTreeMap<String, String>,
399 changed_secret_refs: BTreeMap<String, PbSecretRef>,
400 connector_conn_ref: Option<u32>,
401 ) -> Result<()> {
402 self.0
403 .alter_iceberg_table_props(
404 table_id,
405 sink_id,
406 source_id,
407 changed_props,
408 changed_secret_refs,
409 connector_conn_ref,
410 )
411 .await
412 }
413
414 async fn alter_source_connector_props(
415 &self,
416 source_id: u32,
417 changed_props: BTreeMap<String, String>,
418 changed_secret_refs: BTreeMap<String, PbSecretRef>,
419 connector_conn_ref: Option<u32>,
420 ) -> Result<()> {
421 self.0
422 .alter_source_connector_props(
423 source_id,
424 changed_props,
425 changed_secret_refs,
426 connector_conn_ref,
427 )
428 .await
429 }
430
431 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
432 self.0.list_hosted_iceberg_tables().await
433 }
434
435 async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>> {
436 self.0.get_fragment_by_id(fragment_id).await
437 }
438
439 fn worker_id(&self) -> u32 {
440 self.0.worker_id()
441 }
442
443 async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
444 self.0.set_sync_log_store_aligned(job_id, aligned).await
445 }
446
447 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
448 self.0.compact_iceberg_table(sink_id).await
449 }
450
451 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
452 self.0.expire_iceberg_table_snapshots(sink_id).await
453 }
454
455 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
456 self.0.refresh(request).await
457 }
458}