1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::id::JobId;
32use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
33use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
34use risingwave_pb::meta::list_actor_states_response::ActorState;
35use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
36use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
37use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
38use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
39use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
40use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
41use risingwave_pb::meta::{
42 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
43 RefreshRequest, RefreshResponse,
44};
45use risingwave_pb::secret::PbSecretRef;
46use risingwave_rpc_client::error::Result;
47use risingwave_rpc_client::{HummockMetaClient, MetaClient};
48
49use crate::catalog::{DatabaseId, SinkId, TableId};
50
51#[async_trait::async_trait]
57pub trait FrontendMetaClient: Send + Sync {
58 async fn try_unregister(&self);
59
60 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
61
62 async fn wait(&self) -> Result<()>;
63
64 async fn recover(&self) -> Result<()>;
65
66 async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
67
68 async fn list_table_fragments(
69 &self,
70 table_ids: &[JobId],
71 ) -> Result<HashMap<JobId, TableFragmentInfo>>;
72
73 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
74
75 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
76
77 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
78
79 async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
80
81 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
82
83 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
84
85 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
86
87 async fn set_system_param(
88 &self,
89 param: String,
90 value: Option<String>,
91 ) -> Result<Option<SystemParamsReader>>;
92
93 async fn get_session_params(&self) -> Result<SessionConfig>;
94
95 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
96
97 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
98
99 async fn get_tables(
100 &self,
101 table_ids: Vec<TableId>,
102 include_dropped_table: bool,
103 ) -> Result<HashMap<TableId, Table>>;
104
105 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;
107
108 async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
109
110 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
111
112 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
113
114 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
115
116 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
117
118 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
119
120 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
121
122 async fn list_event_log(&self) -> Result<Vec<EventLog>>;
123 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
124
125 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
126
127 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
128
129 async fn apply_throttle(
130 &self,
131 kind: PbThrottleTarget,
132 id: u32,
133 rate_limit: Option<u32>,
134 ) -> Result<()>;
135
136 async fn alter_fragment_parallelism(
137 &self,
138 fragment_ids: Vec<u32>,
139 parallelism: Option<PbTableParallelism>,
140 ) -> Result<()>;
141
142 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
143
144 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
145
146 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
147
148 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
149
150 async fn get_meta_store_endpoint(&self) -> Result<String>;
151
152 async fn alter_sink_props(
153 &self,
154 sink_id: u32,
155 changed_props: BTreeMap<String, String>,
156 changed_secret_refs: BTreeMap<String, PbSecretRef>,
157 connector_conn_ref: Option<u32>,
158 ) -> Result<()>;
159
160 async fn alter_iceberg_table_props(
161 &self,
162 table_id: u32,
163 sink_id: u32,
164 source_id: u32,
165 changed_props: BTreeMap<String, String>,
166 changed_secret_refs: BTreeMap<String, PbSecretRef>,
167 connector_conn_ref: Option<u32>,
168 ) -> Result<()>;
169
170 async fn alter_source_connector_props(
171 &self,
172 source_id: u32,
173 changed_props: BTreeMap<String, String>,
174 changed_secret_refs: BTreeMap<String, PbSecretRef>,
175 connector_conn_ref: Option<u32>,
176 ) -> Result<()>;
177
178 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
179
180 async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>>;
181
182 async fn get_fragment_vnodes(&self, fragment_id: u32) -> Result<Vec<(u32, Vec<u32>)>>;
183
184 async fn get_actor_vnodes(&self, actor_id: u32) -> Result<Vec<u32>>;
185
186 fn worker_id(&self) -> u32;
187
188 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
189
190 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
191
192 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
193
194 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
195
196 fn cluster_id(&self) -> &str;
197
198 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
199}
200
201pub struct FrontendMetaClientImpl(pub MetaClient);
202
203#[async_trait::async_trait]
204impl FrontendMetaClient for FrontendMetaClientImpl {
205 async fn try_unregister(&self) {
206 self.0.try_unregister().await;
207 }
208
209 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
210 self.0.flush(database_id).await
211 }
212
213 async fn wait(&self) -> Result<()> {
214 self.0.wait().await
215 }
216
217 async fn recover(&self) -> Result<()> {
218 self.0.recover().await
219 }
220
221 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
222 self.0.cancel_creating_jobs(infos).await
223 }
224
225 async fn list_table_fragments(
226 &self,
227 table_ids: &[JobId],
228 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
229 self.0.list_table_fragments(table_ids).await
230 }
231
232 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
233 self.0.list_streaming_job_states().await
234 }
235
236 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
237 self.0.list_fragment_distributions().await
238 }
239
240 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
241 self.0.list_creating_fragment_distribution().await
242 }
243
244 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
245 self.0.list_actor_states().await
246 }
247
248 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
249 self.0.list_actor_splits().await
250 }
251
252 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
253 self.0.list_object_dependencies().await
254 }
255
256 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
257 let manifest = self.0.get_meta_snapshot_manifest().await?;
258 Ok(manifest.snapshot_metadata)
259 }
260
261 async fn set_system_param(
262 &self,
263 param: String,
264 value: Option<String>,
265 ) -> Result<Option<SystemParamsReader>> {
266 self.0.set_system_param(param, value).await
267 }
268
269 async fn get_session_params(&self) -> Result<SessionConfig> {
270 let session_config: SessionConfig =
271 serde_json::from_str(&self.0.get_session_params().await?)
272 .context("failed to parse session config")?;
273 Ok(session_config)
274 }
275
276 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
277 self.0.set_session_param(param, value).await
278 }
279
280 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
281 let ddl_progress = self.0.get_ddl_progress().await?;
282 Ok(ddl_progress)
283 }
284
285 async fn get_tables(
286 &self,
287 table_ids: Vec<TableId>,
288 include_dropped_tables: bool,
289 ) -> Result<HashMap<TableId, Table>> {
290 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
291 Ok(tables)
292 }
293
294 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
295 let pinned_versions = self
296 .0
297 .risectl_get_pinned_versions_summary()
298 .await?
299 .summary
300 .unwrap()
301 .pinned_versions;
302 let ret = pinned_versions
303 .into_iter()
304 .map(|v| (v.context_id, v.min_pinned_id))
305 .collect();
306 Ok(ret)
307 }
308
309 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
310 self.0.get_current_version().await
311 }
312
313 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
314 self.0
315 .risectl_get_checkpoint_hummock_version()
316 .await
317 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
318 }
319
320 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
321 self.0
323 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
324 .await
325 }
326
327 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
328 self.0.list_branched_object().await
329 }
330
331 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
332 self.0.risectl_list_compaction_group().await
333 }
334
335 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
336 self.0.list_active_write_limit().await
337 }
338
339 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
340 self.0.list_hummock_meta_config().await
341 }
342
343 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
344 self.0.list_event_log().await
345 }
346
347 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
348 self.0.list_compact_task_assignment().await
349 }
350
351 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
352 self.0.list_worker_nodes(None).await
353 }
354
355 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
356 self.0.list_compact_task_progress().await
357 }
358
359 async fn apply_throttle(
360 &self,
361 kind: PbThrottleTarget,
362 id: u32,
363 rate_limit: Option<u32>,
364 ) -> Result<()> {
365 self.0
366 .apply_throttle(kind, id, rate_limit)
367 .await
368 .map(|_| ())
369 }
370
371 async fn alter_fragment_parallelism(
372 &self,
373 fragment_ids: Vec<u32>,
374 parallelism: Option<PbTableParallelism>,
375 ) -> Result<()> {
376 self.0
377 .alter_fragment_parallelism(fragment_ids, parallelism)
378 .await
379 }
380
381 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
382 self.0.get_cluster_recovery_status().await
383 }
384
385 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
386 self.0.get_cluster_limits().await
387 }
388
389 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
390 self.0.list_rate_limits().await
391 }
392
393 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
394 self.0.list_cdc_progress().await
395 }
396
397 async fn get_meta_store_endpoint(&self) -> Result<String> {
398 self.0.get_meta_store_endpoint().await
399 }
400
401 async fn alter_sink_props(
402 &self,
403 sink_id: u32,
404 changed_props: BTreeMap<String, String>,
405 changed_secret_refs: BTreeMap<String, PbSecretRef>,
406 connector_conn_ref: Option<u32>,
407 ) -> Result<()> {
408 self.0
409 .alter_sink_props(
410 sink_id,
411 changed_props,
412 changed_secret_refs,
413 connector_conn_ref,
414 )
415 .await
416 }
417
418 async fn alter_iceberg_table_props(
419 &self,
420 table_id: u32,
421 sink_id: u32,
422 source_id: u32,
423 changed_props: BTreeMap<String, String>,
424 changed_secret_refs: BTreeMap<String, PbSecretRef>,
425 connector_conn_ref: Option<u32>,
426 ) -> Result<()> {
427 self.0
428 .alter_iceberg_table_props(
429 table_id,
430 sink_id,
431 source_id,
432 changed_props,
433 changed_secret_refs,
434 connector_conn_ref,
435 )
436 .await
437 }
438
439 async fn alter_source_connector_props(
440 &self,
441 source_id: u32,
442 changed_props: BTreeMap<String, String>,
443 changed_secret_refs: BTreeMap<String, PbSecretRef>,
444 connector_conn_ref: Option<u32>,
445 ) -> Result<()> {
446 self.0
447 .alter_source_connector_props(
448 source_id,
449 changed_props,
450 changed_secret_refs,
451 connector_conn_ref,
452 )
453 .await
454 }
455
456 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
457 self.0.list_hosted_iceberg_tables().await
458 }
459
460 async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>> {
461 self.0.get_fragment_by_id(fragment_id).await
462 }
463
464 async fn get_fragment_vnodes(&self, fragment_id: u32) -> Result<Vec<(u32, Vec<u32>)>> {
465 self.0.get_fragment_vnodes(fragment_id).await
466 }
467
468 async fn get_actor_vnodes(&self, actor_id: u32) -> Result<Vec<u32>> {
469 self.0.get_actor_vnodes(actor_id).await
470 }
471
472 fn worker_id(&self) -> u32 {
473 self.0.worker_id()
474 }
475
476 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
477 self.0.set_sync_log_store_aligned(job_id, aligned).await
478 }
479
480 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
481 self.0.compact_iceberg_table(sink_id).await
482 }
483
484 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
485 self.0.expire_iceberg_table_snapshots(sink_id).await
486 }
487
488 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
489 self.0.refresh(request).await
490 }
491
492 fn cluster_id(&self) -> &str {
493 self.0.cluster_id()
494 }
495
496 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
497 self.0.list_unmigrated_tables().await
498 }
499}