1use std::collections::{BTreeMap, HashMap, HashSet};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::change_log::TableChangeLogs;
23use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
24use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
25use risingwave_pb::backup_service::{BackupJobStatus, MetaSnapshotMetadata};
26use risingwave_pb::catalog::Table;
27use risingwave_pb::common::WorkerNode;
28use risingwave_pb::ddl_service::DdlProgress;
29use risingwave_pb::hummock::write_limits::WriteLimit;
30use risingwave_pb::hummock::{
31 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
32};
33use risingwave_pb::id::ActorId;
34use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
35use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
36use risingwave_pb::meta::list_actor_states_response::ActorState;
37use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
38use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
39use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
40use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
41use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
42use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
43use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
44use risingwave_pb::meta::{
45 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
46 RefreshRequest, RefreshResponse, list_sink_log_store_tables_response,
47};
48use risingwave_pb::secret::PbSecretRef;
49use risingwave_rpc_client::error::Result;
50use risingwave_rpc_client::{HummockMetaClient, MetaClient};
51
52use crate::catalog::{DatabaseId, FragmentId, SinkId};
53
54#[async_trait::async_trait]
60pub trait FrontendMetaClient: Send + Sync {
61 async fn try_unregister(&self);
62
63 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
64
65 async fn backup_meta(&self, remarks: Option<String>) -> Result<u64>;
66 async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)>;
67 async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()>;
68
69 async fn recover(&self) -> Result<()>;
70
71 async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
72
73 async fn list_table_fragments(
74 &self,
75 table_ids: &[JobId],
76 ) -> Result<HashMap<JobId, TableFragmentInfo>>;
77
78 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
79
80 async fn list_fragment_distribution(
81 &self,
82 include_node: bool,
83 ) -> Result<Vec<FragmentDistribution>>;
84
85 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
86
87 async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
88
89 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
90
91 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
92
93 async fn list_sink_log_store_tables(
94 &self,
95 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>>;
96
97 async fn set_system_param(
98 &self,
99 param: String,
100 value: Option<String>,
101 ) -> Result<Option<SystemParamsReader>>;
102
103 async fn get_session_params(&self) -> Result<SessionConfig>;
104
105 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
106
107 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
108
109 async fn get_tables(
110 &self,
111 table_ids: Vec<TableId>,
112 include_dropped_table: bool,
113 ) -> Result<HashMap<TableId, Table>>;
114
115 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>>;
117
118 async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
119
120 async fn get_hummock_table_change_log(
121 &self,
122 start_epoch_inclusive: Option<u64>,
123 end_epoch_inclusive: Option<u64>,
124 table_ids: Option<HashSet<TableId>>,
125 exclude_empty: bool,
126 limit: Option<u32>,
127 ) -> Result<TableChangeLogs>;
128
129 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
130
131 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
132
133 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
134
135 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
136
137 async fn list_hummock_active_write_limits(
138 &self,
139 ) -> Result<HashMap<CompactionGroupId, WriteLimit>>;
140
141 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
142
143 async fn list_event_log(&self) -> Result<Vec<EventLog>>;
144 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
145
146 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
147
148 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
149
150 async fn apply_throttle(
151 &self,
152 throttle_target: PbThrottleTarget,
153 throttle_type: risingwave_pb::common::PbThrottleType,
154 id: u32,
155 rate_limit: Option<u32>,
156 ) -> Result<()>;
157
158 async fn alter_fragment_parallelism(
159 &self,
160 fragment_ids: Vec<FragmentId>,
161 parallelism: Option<PbTableParallelism>,
162 ) -> Result<()>;
163
164 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
165
166 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
167
168 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
169
170 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
171
172 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;
173
174 async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>>;
175
176 async fn get_meta_store_endpoint(&self) -> Result<String>;
177
178 async fn alter_sink_props(
179 &self,
180 sink_id: SinkId,
181 changed_props: BTreeMap<String, String>,
182 changed_secret_refs: BTreeMap<String, PbSecretRef>,
183 connector_conn_ref: Option<ConnectionId>,
184 ) -> Result<()>;
185
186 async fn alter_iceberg_table_props(
187 &self,
188 table_id: TableId,
189 sink_id: SinkId,
190 source_id: SourceId,
191 changed_props: BTreeMap<String, String>,
192 changed_secret_refs: BTreeMap<String, PbSecretRef>,
193 connector_conn_ref: Option<ConnectionId>,
194 ) -> Result<()>;
195
196 async fn alter_source_connector_props(
197 &self,
198 source_id: SourceId,
199 changed_props: BTreeMap<String, String>,
200 changed_secret_refs: BTreeMap<String, PbSecretRef>,
201 connector_conn_ref: Option<ConnectionId>,
202 ) -> Result<()>;
203
204 async fn alter_connection_connector_props(
205 &self,
206 connection_id: u32,
207 changed_props: BTreeMap<String, String>,
208 changed_secret_refs: BTreeMap<String, PbSecretRef>,
209 ) -> Result<()>;
210
211 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
212
213 async fn get_fragment_by_id(
214 &self,
215 fragment_id: FragmentId,
216 ) -> Result<Option<FragmentDistribution>>;
217
218 async fn get_fragment_vnodes(
219 &self,
220 fragment_id: FragmentId,
221 ) -> Result<Vec<(ActorId, Vec<u32>)>>;
222
223 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;
224
225 fn worker_id(&self) -> WorkerId;
226
227 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
228
229 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
230
231 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
232
233 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
234
235 fn cluster_id(&self) -> &str;
236
237 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
238}
239
240pub struct FrontendMetaClientImpl(pub MetaClient);
241
242#[async_trait::async_trait]
243impl FrontendMetaClient for FrontendMetaClientImpl {
244 async fn try_unregister(&self) {
245 self.0.try_unregister().await;
246 }
247
248 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
249 self.0.flush(database_id).await
250 }
251
252 async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
253 self.0.backup_meta(remarks).await
254 }
255
256 async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
257 self.0.get_backup_job_status(job_id).await
258 }
259
260 async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
261 self.0.delete_meta_snapshot(snapshot_ids).await
262 }
263
264 async fn recover(&self) -> Result<()> {
265 self.0.recover().await
266 }
267
268 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
269 self.0.cancel_creating_jobs(infos).await
270 }
271
272 async fn list_table_fragments(
273 &self,
274 job_ids: &[JobId],
275 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
276 self.0.list_table_fragments(job_ids).await
277 }
278
279 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
280 self.0.list_streaming_job_states().await
281 }
282
283 async fn list_fragment_distribution(
284 &self,
285 include_node: bool,
286 ) -> Result<Vec<FragmentDistribution>> {
287 self.0.list_fragment_distributions(include_node).await
288 }
289
290 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
291 self.0.list_creating_fragment_distribution().await
292 }
293
294 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
295 self.0.list_actor_states().await
296 }
297
298 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
299 self.0.list_actor_splits().await
300 }
301
302 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
303 let manifest = self.0.get_meta_snapshot_manifest().await?;
304 Ok(manifest.snapshot_metadata)
305 }
306
307 async fn list_sink_log_store_tables(
308 &self,
309 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
310 self.0.list_sink_log_store_tables().await
311 }
312
313 async fn set_system_param(
314 &self,
315 param: String,
316 value: Option<String>,
317 ) -> Result<Option<SystemParamsReader>> {
318 self.0.set_system_param(param, value).await
319 }
320
321 async fn get_session_params(&self) -> Result<SessionConfig> {
322 let session_config: SessionConfig =
323 serde_json::from_str(&self.0.get_session_params().await?)
324 .context("failed to parse session config")?;
325 Ok(session_config)
326 }
327
328 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
329 self.0.set_session_param(param, value).await
330 }
331
332 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
333 let ddl_progress = self.0.get_ddl_progress().await?;
334 Ok(ddl_progress)
335 }
336
337 async fn get_tables(
338 &self,
339 table_ids: Vec<TableId>,
340 include_dropped_tables: bool,
341 ) -> Result<HashMap<TableId, Table>> {
342 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
343 Ok(tables)
344 }
345
346 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>> {
347 let pinned_versions = self
348 .0
349 .risectl_get_pinned_versions_summary()
350 .await?
351 .summary
352 .unwrap()
353 .pinned_versions;
354 let ret = pinned_versions
355 .into_iter()
356 .map(|v| (v.context_id, v.min_pinned_id))
357 .collect();
358 Ok(ret)
359 }
360
361 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
362 self.0.get_current_version().await
363 }
364
365 async fn get_hummock_table_change_log(
366 &self,
367 start_epoch_inclusive: Option<u64>,
368 end_epoch_inclusive: Option<u64>,
369 table_ids: Option<HashSet<TableId>>,
370 exclude_empty: bool,
371 limit: Option<u32>,
372 ) -> Result<TableChangeLogs> {
373 self.0
374 .get_table_change_logs(
375 true,
376 start_epoch_inclusive,
377 end_epoch_inclusive,
378 table_ids,
379 exclude_empty,
380 limit,
381 )
382 .await
383 }
384
385 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
386 self.0
387 .risectl_get_checkpoint_hummock_version()
388 .await
389 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
390 }
391
392 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
393 self.0
395 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
396 .await
397 }
398
399 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
400 self.0.list_branched_object().await
401 }
402
403 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
404 self.0.risectl_list_compaction_group().await
405 }
406
407 async fn list_hummock_active_write_limits(
408 &self,
409 ) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
410 self.0.list_active_write_limit().await
411 }
412
413 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
414 self.0.list_hummock_meta_config().await
415 }
416
417 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
418 self.0.list_event_log().await
419 }
420
421 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
422 self.0.list_compact_task_assignment().await
423 }
424
425 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
426 self.0.list_worker_nodes(None).await
427 }
428
429 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
430 self.0.list_compact_task_progress().await
431 }
432
433 async fn apply_throttle(
434 &self,
435 throttle_target: PbThrottleTarget,
436 throttle_type: risingwave_pb::common::PbThrottleType,
437 id: u32,
438 rate_limit: Option<u32>,
439 ) -> Result<()> {
440 self.0
441 .apply_throttle(throttle_target, throttle_type, id, rate_limit)
442 .await
443 .map(|_| ())
444 }
445
446 async fn alter_fragment_parallelism(
447 &self,
448 fragment_ids: Vec<FragmentId>,
449 parallelism: Option<PbTableParallelism>,
450 ) -> Result<()> {
451 self.0
452 .alter_fragment_parallelism(fragment_ids, parallelism)
453 .await
454 }
455
456 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
457 self.0.get_cluster_recovery_status().await
458 }
459
460 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
461 self.0.get_cluster_limits().await
462 }
463
464 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
465 self.0.list_rate_limits().await
466 }
467
468 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
469 self.0.list_cdc_progress().await
470 }
471
472 async fn get_meta_store_endpoint(&self) -> Result<String> {
473 self.0.get_meta_store_endpoint().await
474 }
475
476 async fn alter_sink_props(
477 &self,
478 sink_id: SinkId,
479 changed_props: BTreeMap<String, String>,
480 changed_secret_refs: BTreeMap<String, PbSecretRef>,
481 connector_conn_ref: Option<ConnectionId>,
482 ) -> Result<()> {
483 self.0
484 .alter_sink_props(
485 sink_id,
486 changed_props,
487 changed_secret_refs,
488 connector_conn_ref,
489 )
490 .await
491 }
492
493 async fn alter_iceberg_table_props(
494 &self,
495 table_id: TableId,
496 sink_id: SinkId,
497 source_id: SourceId,
498 changed_props: BTreeMap<String, String>,
499 changed_secret_refs: BTreeMap<String, PbSecretRef>,
500 connector_conn_ref: Option<ConnectionId>,
501 ) -> Result<()> {
502 self.0
503 .alter_iceberg_table_props(
504 table_id,
505 sink_id,
506 source_id,
507 changed_props,
508 changed_secret_refs,
509 connector_conn_ref,
510 )
511 .await
512 }
513
514 async fn alter_source_connector_props(
515 &self,
516 source_id: SourceId,
517 changed_props: BTreeMap<String, String>,
518 changed_secret_refs: BTreeMap<String, PbSecretRef>,
519 connector_conn_ref: Option<ConnectionId>,
520 ) -> Result<()> {
521 self.0
522 .alter_source_connector_props(
523 source_id,
524 changed_props,
525 changed_secret_refs,
526 connector_conn_ref,
527 )
528 .await
529 }
530
531 async fn alter_connection_connector_props(
532 &self,
533 connection_id: u32,
534 changed_props: BTreeMap<String, String>,
535 changed_secret_refs: BTreeMap<String, PbSecretRef>,
536 ) -> Result<()> {
537 self.0
538 .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
539 .await
540 }
541
542 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
543 self.0.list_hosted_iceberg_tables().await
544 }
545
546 async fn get_fragment_by_id(
547 &self,
548 fragment_id: FragmentId,
549 ) -> Result<Option<FragmentDistribution>> {
550 self.0.get_fragment_by_id(fragment_id).await
551 }
552
553 async fn get_fragment_vnodes(
554 &self,
555 fragment_id: FragmentId,
556 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
557 self.0.get_fragment_vnodes(fragment_id).await
558 }
559
560 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
561 self.0.get_actor_vnodes(actor_id).await
562 }
563
564 fn worker_id(&self) -> WorkerId {
565 self.0.worker_id()
566 }
567
568 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
569 self.0.set_sync_log_store_aligned(job_id, aligned).await
570 }
571
572 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
573 self.0.compact_iceberg_table(sink_id).await
574 }
575
576 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
577 self.0.expire_iceberg_table_snapshots(sink_id).await
578 }
579
580 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
581 self.0.refresh(request).await
582 }
583
584 fn cluster_id(&self) -> &str {
585 self.0.cluster_id()
586 }
587
588 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
589 self.0.list_unmigrated_tables().await
590 }
591
592 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
593 self.0.list_refresh_table_states().await
594 }
595
596 async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>> {
597 self.0.list_iceberg_compaction_status().await
598 }
599}