1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
24use risingwave_pb::backup_service::MetaSnapshotMetadata;
25use risingwave_pb::catalog::Table;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
31};
32use risingwave_pb::id::ActorId;
33use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
34use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
35use risingwave_pb::meta::list_actor_states_response::ActorState;
36use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
37use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
38use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
39use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
40use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
41use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
42use risingwave_pb::meta::{
43 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
44 RefreshRequest, RefreshResponse, list_sink_log_store_tables_response,
45};
46use risingwave_pb::secret::PbSecretRef;
47use risingwave_rpc_client::error::Result;
48use risingwave_rpc_client::{HummockMetaClient, MetaClient};
49
50use crate::catalog::{DatabaseId, FragmentId, SinkId};
51
52#[async_trait::async_trait]
58pub trait FrontendMetaClient: Send + Sync {
59 async fn try_unregister(&self);
60
61 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
62
63 async fn recover(&self) -> Result<()>;
64
65 async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
66
67 async fn list_table_fragments(
68 &self,
69 table_ids: &[JobId],
70 ) -> Result<HashMap<JobId, TableFragmentInfo>>;
71
72 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
73
74 async fn list_fragment_distribution(
75 &self,
76 include_node: bool,
77 ) -> Result<Vec<FragmentDistribution>>;
78
79 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
80
81 async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
82
83 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
84
85 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
86
87 async fn list_sink_log_store_tables(
88 &self,
89 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>>;
90
91 async fn set_system_param(
92 &self,
93 param: String,
94 value: Option<String>,
95 ) -> Result<Option<SystemParamsReader>>;
96
97 async fn get_session_params(&self) -> Result<SessionConfig>;
98
99 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
100
101 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
102
103 async fn get_tables(
104 &self,
105 table_ids: Vec<TableId>,
106 include_dropped_table: bool,
107 ) -> Result<HashMap<TableId, Table>>;
108
109 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>>;
111
112 async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
113
114 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
115
116 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
117
118 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
119
120 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
121
122 async fn list_hummock_active_write_limits(
123 &self,
124 ) -> Result<HashMap<CompactionGroupId, WriteLimit>>;
125
126 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
127
128 async fn list_event_log(&self) -> Result<Vec<EventLog>>;
129 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
130
131 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
132
133 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
134
135 async fn apply_throttle(
136 &self,
137 throttle_target: PbThrottleTarget,
138 throttle_type: risingwave_pb::common::PbThrottleType,
139 id: u32,
140 rate_limit: Option<u32>,
141 ) -> Result<()>;
142
143 async fn alter_fragment_parallelism(
144 &self,
145 fragment_ids: Vec<FragmentId>,
146 parallelism: Option<PbTableParallelism>,
147 ) -> Result<()>;
148
149 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
150
151 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
152
153 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
154
155 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
156
157 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;
158
159 async fn get_meta_store_endpoint(&self) -> Result<String>;
160
161 async fn alter_sink_props(
162 &self,
163 sink_id: SinkId,
164 changed_props: BTreeMap<String, String>,
165 changed_secret_refs: BTreeMap<String, PbSecretRef>,
166 connector_conn_ref: Option<ConnectionId>,
167 ) -> Result<()>;
168
169 async fn alter_iceberg_table_props(
170 &self,
171 table_id: TableId,
172 sink_id: SinkId,
173 source_id: SourceId,
174 changed_props: BTreeMap<String, String>,
175 changed_secret_refs: BTreeMap<String, PbSecretRef>,
176 connector_conn_ref: Option<ConnectionId>,
177 ) -> Result<()>;
178
179 async fn alter_source_connector_props(
180 &self,
181 source_id: SourceId,
182 changed_props: BTreeMap<String, String>,
183 changed_secret_refs: BTreeMap<String, PbSecretRef>,
184 connector_conn_ref: Option<ConnectionId>,
185 ) -> Result<()>;
186
187 async fn alter_connection_connector_props(
188 &self,
189 connection_id: u32,
190 changed_props: BTreeMap<String, String>,
191 changed_secret_refs: BTreeMap<String, PbSecretRef>,
192 ) -> Result<()>;
193
194 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
195
196 async fn get_fragment_by_id(
197 &self,
198 fragment_id: FragmentId,
199 ) -> Result<Option<FragmentDistribution>>;
200
201 async fn get_fragment_vnodes(
202 &self,
203 fragment_id: FragmentId,
204 ) -> Result<Vec<(ActorId, Vec<u32>)>>;
205
206 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;
207
208 fn worker_id(&self) -> WorkerId;
209
210 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
211
212 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
213
214 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
215
216 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
217
218 fn cluster_id(&self) -> &str;
219
220 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
221}
222
223pub struct FrontendMetaClientImpl(pub MetaClient);
224
225#[async_trait::async_trait]
226impl FrontendMetaClient for FrontendMetaClientImpl {
227 async fn try_unregister(&self) {
228 self.0.try_unregister().await;
229 }
230
231 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
232 self.0.flush(database_id).await
233 }
234
235 async fn recover(&self) -> Result<()> {
236 self.0.recover().await
237 }
238
239 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
240 self.0.cancel_creating_jobs(infos).await
241 }
242
243 async fn list_table_fragments(
244 &self,
245 job_ids: &[JobId],
246 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
247 self.0.list_table_fragments(job_ids).await
248 }
249
250 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
251 self.0.list_streaming_job_states().await
252 }
253
254 async fn list_fragment_distribution(
255 &self,
256 include_node: bool,
257 ) -> Result<Vec<FragmentDistribution>> {
258 self.0.list_fragment_distributions(include_node).await
259 }
260
261 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
262 self.0.list_creating_fragment_distribution().await
263 }
264
265 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
266 self.0.list_actor_states().await
267 }
268
269 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
270 self.0.list_actor_splits().await
271 }
272
273 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
274 let manifest = self.0.get_meta_snapshot_manifest().await?;
275 Ok(manifest.snapshot_metadata)
276 }
277
278 async fn list_sink_log_store_tables(
279 &self,
280 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
281 self.0.list_sink_log_store_tables().await
282 }
283
284 async fn set_system_param(
285 &self,
286 param: String,
287 value: Option<String>,
288 ) -> Result<Option<SystemParamsReader>> {
289 self.0.set_system_param(param, value).await
290 }
291
292 async fn get_session_params(&self) -> Result<SessionConfig> {
293 let session_config: SessionConfig =
294 serde_json::from_str(&self.0.get_session_params().await?)
295 .context("failed to parse session config")?;
296 Ok(session_config)
297 }
298
299 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
300 self.0.set_session_param(param, value).await
301 }
302
303 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
304 let ddl_progress = self.0.get_ddl_progress().await?;
305 Ok(ddl_progress)
306 }
307
308 async fn get_tables(
309 &self,
310 table_ids: Vec<TableId>,
311 include_dropped_tables: bool,
312 ) -> Result<HashMap<TableId, Table>> {
313 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
314 Ok(tables)
315 }
316
317 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>> {
318 let pinned_versions = self
319 .0
320 .risectl_get_pinned_versions_summary()
321 .await?
322 .summary
323 .unwrap()
324 .pinned_versions;
325 let ret = pinned_versions
326 .into_iter()
327 .map(|v| (v.context_id, v.min_pinned_id))
328 .collect();
329 Ok(ret)
330 }
331
332 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
333 self.0.get_current_version().await
334 }
335
336 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
337 self.0
338 .risectl_get_checkpoint_hummock_version()
339 .await
340 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
341 }
342
343 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
344 self.0
346 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
347 .await
348 }
349
350 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
351 self.0.list_branched_object().await
352 }
353
354 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
355 self.0.risectl_list_compaction_group().await
356 }
357
358 async fn list_hummock_active_write_limits(
359 &self,
360 ) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
361 self.0.list_active_write_limit().await
362 }
363
364 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
365 self.0.list_hummock_meta_config().await
366 }
367
368 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
369 self.0.list_event_log().await
370 }
371
372 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
373 self.0.list_compact_task_assignment().await
374 }
375
376 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
377 self.0.list_worker_nodes(None).await
378 }
379
380 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
381 self.0.list_compact_task_progress().await
382 }
383
384 async fn apply_throttle(
385 &self,
386 throttle_target: PbThrottleTarget,
387 throttle_type: risingwave_pb::common::PbThrottleType,
388 id: u32,
389 rate_limit: Option<u32>,
390 ) -> Result<()> {
391 self.0
392 .apply_throttle(throttle_target, throttle_type, id, rate_limit)
393 .await
394 .map(|_| ())
395 }
396
397 async fn alter_fragment_parallelism(
398 &self,
399 fragment_ids: Vec<FragmentId>,
400 parallelism: Option<PbTableParallelism>,
401 ) -> Result<()> {
402 self.0
403 .alter_fragment_parallelism(fragment_ids, parallelism)
404 .await
405 }
406
407 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
408 self.0.get_cluster_recovery_status().await
409 }
410
411 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
412 self.0.get_cluster_limits().await
413 }
414
415 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
416 self.0.list_rate_limits().await
417 }
418
419 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
420 self.0.list_cdc_progress().await
421 }
422
423 async fn get_meta_store_endpoint(&self) -> Result<String> {
424 self.0.get_meta_store_endpoint().await
425 }
426
427 async fn alter_sink_props(
428 &self,
429 sink_id: SinkId,
430 changed_props: BTreeMap<String, String>,
431 changed_secret_refs: BTreeMap<String, PbSecretRef>,
432 connector_conn_ref: Option<ConnectionId>,
433 ) -> Result<()> {
434 self.0
435 .alter_sink_props(
436 sink_id,
437 changed_props,
438 changed_secret_refs,
439 connector_conn_ref,
440 )
441 .await
442 }
443
444 async fn alter_iceberg_table_props(
445 &self,
446 table_id: TableId,
447 sink_id: SinkId,
448 source_id: SourceId,
449 changed_props: BTreeMap<String, String>,
450 changed_secret_refs: BTreeMap<String, PbSecretRef>,
451 connector_conn_ref: Option<ConnectionId>,
452 ) -> Result<()> {
453 self.0
454 .alter_iceberg_table_props(
455 table_id,
456 sink_id,
457 source_id,
458 changed_props,
459 changed_secret_refs,
460 connector_conn_ref,
461 )
462 .await
463 }
464
465 async fn alter_source_connector_props(
466 &self,
467 source_id: SourceId,
468 changed_props: BTreeMap<String, String>,
469 changed_secret_refs: BTreeMap<String, PbSecretRef>,
470 connector_conn_ref: Option<ConnectionId>,
471 ) -> Result<()> {
472 self.0
473 .alter_source_connector_props(
474 source_id,
475 changed_props,
476 changed_secret_refs,
477 connector_conn_ref,
478 )
479 .await
480 }
481
482 async fn alter_connection_connector_props(
483 &self,
484 connection_id: u32,
485 changed_props: BTreeMap<String, String>,
486 changed_secret_refs: BTreeMap<String, PbSecretRef>,
487 ) -> Result<()> {
488 self.0
489 .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
490 .await
491 }
492
493 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
494 self.0.list_hosted_iceberg_tables().await
495 }
496
497 async fn get_fragment_by_id(
498 &self,
499 fragment_id: FragmentId,
500 ) -> Result<Option<FragmentDistribution>> {
501 self.0.get_fragment_by_id(fragment_id).await
502 }
503
504 async fn get_fragment_vnodes(
505 &self,
506 fragment_id: FragmentId,
507 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
508 self.0.get_fragment_vnodes(fragment_id).await
509 }
510
511 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
512 self.0.get_actor_vnodes(actor_id).await
513 }
514
515 fn worker_id(&self) -> WorkerId {
516 self.0.worker_id()
517 }
518
519 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
520 self.0.set_sync_log_store_aligned(job_id, aligned).await
521 }
522
523 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
524 self.0.compact_iceberg_table(sink_id).await
525 }
526
527 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
528 self.0.expire_iceberg_table_snapshots(sink_id).await
529 }
530
531 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
532 self.0.refresh(request).await
533 }
534
535 fn cluster_id(&self) -> &str {
536 self.0.cluster_id()
537 }
538
539 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
540 self.0.list_unmigrated_tables().await
541 }
542
543 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
544 self.0.list_refresh_table_states().await
545 }
546}