1use std::collections::{BTreeMap, HashMap, HashSet};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::change_log::TableChangeLogs;
23use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
24use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
25use risingwave_pb::backup_service::{BackupJobStatus, MetaSnapshotMetadata};
26use risingwave_pb::catalog::Table;
27use risingwave_pb::common::WorkerNode;
28use risingwave_pb::ddl_service::DdlProgress;
29use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig as PbMutableConfig;
30use risingwave_pb::hummock::write_limits::WriteLimit;
31use risingwave_pb::hummock::{
32 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
33};
34use risingwave_pb::id::ActorId;
35use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
36use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
37use risingwave_pb::meta::list_actor_states_response::ActorState;
38use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
39use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
40use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
41use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
42use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
43use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
44use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
45use risingwave_pb::meta::{
46 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
47 RefreshRequest, RefreshResponse, list_sink_log_store_tables_response,
48};
49use risingwave_pb::secret::PbSecretRef;
50use risingwave_rpc_client::error::Result;
51use risingwave_rpc_client::{HummockMetaClient, MetaClient};
52
53use crate::catalog::{DatabaseId, FragmentId, SinkId};
54
55#[async_trait::async_trait]
61pub trait FrontendMetaClient: Send + Sync {
62 async fn try_unregister(&self);
63
64 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
65
66 async fn backup_meta(&self, remarks: Option<String>) -> Result<u64>;
67 async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)>;
68 async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()>;
69
70 async fn recover(&self) -> Result<()>;
71
72 async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
73
74 async fn list_table_fragments(
75 &self,
76 table_ids: &[JobId],
77 ) -> Result<HashMap<JobId, TableFragmentInfo>>;
78
79 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
80
81 async fn list_fragment_distribution(
82 &self,
83 include_node: bool,
84 ) -> Result<Vec<FragmentDistribution>>;
85
86 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
87
88 async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
89
90 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
91
92 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
93
94 async fn list_sink_log_store_tables(
95 &self,
96 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>>;
97
98 async fn set_system_param(
99 &self,
100 param: String,
101 value: Option<String>,
102 ) -> Result<Option<SystemParamsReader>>;
103
104 async fn get_session_params(&self) -> Result<SessionConfig>;
105
106 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
107
108 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
109
110 async fn get_tables(
111 &self,
112 table_ids: Vec<TableId>,
113 include_dropped_table: bool,
114 ) -> Result<HashMap<TableId, Table>>;
115
116 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>>;
118
119 async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
120
121 async fn get_hummock_table_change_log(
122 &self,
123 start_epoch_inclusive: Option<u64>,
124 end_epoch_inclusive: Option<u64>,
125 table_ids: Option<HashSet<TableId>>,
126 exclude_empty: bool,
127 limit: Option<u32>,
128 ) -> Result<TableChangeLogs>;
129
130 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
131
132 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
133
134 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
135
136 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
137
138 async fn list_hummock_active_write_limits(
139 &self,
140 ) -> Result<HashMap<CompactionGroupId, WriteLimit>>;
141
142 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
143
144 async fn list_event_log(&self) -> Result<Vec<EventLog>>;
145 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
146
147 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
148
149 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
150
151 async fn apply_throttle(
152 &self,
153 throttle_target: PbThrottleTarget,
154 throttle_type: risingwave_pb::common::PbThrottleType,
155 id: u32,
156 rate_limit: Option<u32>,
157 ) -> Result<()>;
158
159 async fn alter_fragment_parallelism(
160 &self,
161 fragment_ids: Vec<FragmentId>,
162 parallelism: Option<PbTableParallelism>,
163 ) -> Result<()>;
164
165 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
166
167 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
168
169 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
170
171 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
172
173 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;
174
175 async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>>;
176
177 async fn get_meta_store_endpoint(&self) -> Result<String>;
178
179 async fn alter_sink_props(
180 &self,
181 sink_id: SinkId,
182 changed_props: BTreeMap<String, String>,
183 changed_secret_refs: BTreeMap<String, PbSecretRef>,
184 connector_conn_ref: Option<ConnectionId>,
185 ) -> Result<()>;
186
187 async fn alter_iceberg_table_props(
188 &self,
189 table_id: TableId,
190 sink_id: SinkId,
191 source_id: SourceId,
192 changed_props: BTreeMap<String, String>,
193 changed_secret_refs: BTreeMap<String, PbSecretRef>,
194 connector_conn_ref: Option<ConnectionId>,
195 ) -> Result<()>;
196
197 async fn alter_source_connector_props(
198 &self,
199 source_id: SourceId,
200 changed_props: BTreeMap<String, String>,
201 changed_secret_refs: BTreeMap<String, PbSecretRef>,
202 connector_conn_ref: Option<ConnectionId>,
203 ) -> Result<()>;
204
205 async fn alter_connection_connector_props(
206 &self,
207 connection_id: u32,
208 changed_props: BTreeMap<String, String>,
209 changed_secret_refs: BTreeMap<String, PbSecretRef>,
210 ) -> Result<()>;
211
212 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
213
214 async fn get_fragment_by_id(
215 &self,
216 fragment_id: FragmentId,
217 ) -> Result<Option<FragmentDistribution>>;
218
219 async fn get_fragment_vnodes(
220 &self,
221 fragment_id: FragmentId,
222 ) -> Result<Vec<(ActorId, Vec<u32>)>>;
223
224 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;
225
226 fn worker_id(&self) -> WorkerId;
227
228 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
229
230 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
231
232 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
233
234 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
235
236 fn cluster_id(&self) -> &str;
237
238 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
239
240 async fn update_compaction_config(
241 &self,
242 compaction_group_ids: Vec<CompactionGroupId>,
243 configs: Vec<PbMutableConfig>,
244 ) -> Result<()>;
245}
246
247pub struct FrontendMetaClientImpl(pub MetaClient);
248
249#[async_trait::async_trait]
250impl FrontendMetaClient for FrontendMetaClientImpl {
251 async fn try_unregister(&self) {
252 self.0.try_unregister().await;
253 }
254
255 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
256 self.0.flush(database_id).await
257 }
258
259 async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
260 self.0.backup_meta(remarks).await
261 }
262
263 async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
264 self.0.get_backup_job_status(job_id).await
265 }
266
267 async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
268 self.0.delete_meta_snapshot(snapshot_ids).await
269 }
270
271 async fn recover(&self) -> Result<()> {
272 self.0.recover().await
273 }
274
275 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
276 self.0.cancel_creating_jobs(infos).await
277 }
278
279 async fn list_table_fragments(
280 &self,
281 job_ids: &[JobId],
282 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
283 self.0.list_table_fragments(job_ids).await
284 }
285
286 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
287 self.0.list_streaming_job_states().await
288 }
289
290 async fn list_fragment_distribution(
291 &self,
292 include_node: bool,
293 ) -> Result<Vec<FragmentDistribution>> {
294 self.0.list_fragment_distributions(include_node).await
295 }
296
297 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
298 self.0.list_creating_fragment_distribution().await
299 }
300
301 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
302 self.0.list_actor_states().await
303 }
304
305 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
306 self.0.list_actor_splits().await
307 }
308
309 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
310 let manifest = self.0.get_meta_snapshot_manifest().await?;
311 Ok(manifest.snapshot_metadata)
312 }
313
314 async fn list_sink_log_store_tables(
315 &self,
316 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
317 self.0.list_sink_log_store_tables().await
318 }
319
320 async fn set_system_param(
321 &self,
322 param: String,
323 value: Option<String>,
324 ) -> Result<Option<SystemParamsReader>> {
325 self.0.set_system_param(param, value).await
326 }
327
328 async fn get_session_params(&self) -> Result<SessionConfig> {
329 let session_config: SessionConfig =
330 serde_json::from_str(&self.0.get_session_params().await?)
331 .context("failed to parse session config")?;
332 Ok(session_config)
333 }
334
335 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
336 self.0.set_session_param(param, value).await
337 }
338
339 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
340 let ddl_progress = self.0.get_ddl_progress().await?;
341 Ok(ddl_progress)
342 }
343
344 async fn get_tables(
345 &self,
346 table_ids: Vec<TableId>,
347 include_dropped_tables: bool,
348 ) -> Result<HashMap<TableId, Table>> {
349 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
350 Ok(tables)
351 }
352
353 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>> {
354 let pinned_versions = self
355 .0
356 .risectl_get_pinned_versions_summary()
357 .await?
358 .summary
359 .unwrap()
360 .pinned_versions;
361 let ret = pinned_versions
362 .into_iter()
363 .map(|v| (v.context_id, v.min_pinned_id))
364 .collect();
365 Ok(ret)
366 }
367
368 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
369 self.0.get_current_version().await
370 }
371
372 async fn get_hummock_table_change_log(
373 &self,
374 start_epoch_inclusive: Option<u64>,
375 end_epoch_inclusive: Option<u64>,
376 table_ids: Option<HashSet<TableId>>,
377 exclude_empty: bool,
378 limit: Option<u32>,
379 ) -> Result<TableChangeLogs> {
380 self.0
381 .get_table_change_logs(
382 true,
383 start_epoch_inclusive,
384 end_epoch_inclusive,
385 table_ids,
386 exclude_empty,
387 limit,
388 )
389 .await
390 }
391
392 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
393 self.0
394 .risectl_get_checkpoint_hummock_version()
395 .await
396 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
397 }
398
399 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
400 self.0
402 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
403 .await
404 }
405
406 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
407 self.0.list_branched_object().await
408 }
409
410 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
411 self.0.risectl_list_compaction_group().await
412 }
413
414 async fn list_hummock_active_write_limits(
415 &self,
416 ) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
417 self.0.list_active_write_limit().await
418 }
419
420 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
421 self.0.list_hummock_meta_config().await
422 }
423
424 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
425 self.0.list_event_log().await
426 }
427
428 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
429 self.0.list_compact_task_assignment().await
430 }
431
432 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
433 self.0.list_worker_nodes(None).await
434 }
435
436 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
437 self.0.list_compact_task_progress().await
438 }
439
440 async fn apply_throttle(
441 &self,
442 throttle_target: PbThrottleTarget,
443 throttle_type: risingwave_pb::common::PbThrottleType,
444 id: u32,
445 rate_limit: Option<u32>,
446 ) -> Result<()> {
447 self.0
448 .apply_throttle(throttle_target, throttle_type, id, rate_limit)
449 .await
450 .map(|_| ())
451 }
452
453 async fn alter_fragment_parallelism(
454 &self,
455 fragment_ids: Vec<FragmentId>,
456 parallelism: Option<PbTableParallelism>,
457 ) -> Result<()> {
458 self.0
459 .alter_fragment_parallelism(fragment_ids, parallelism)
460 .await
461 }
462
463 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
464 self.0.get_cluster_recovery_status().await
465 }
466
467 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
468 self.0.get_cluster_limits().await
469 }
470
471 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
472 self.0.list_rate_limits().await
473 }
474
475 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
476 self.0.list_cdc_progress().await
477 }
478
479 async fn get_meta_store_endpoint(&self) -> Result<String> {
480 self.0.get_meta_store_endpoint().await
481 }
482
483 async fn alter_sink_props(
484 &self,
485 sink_id: SinkId,
486 changed_props: BTreeMap<String, String>,
487 changed_secret_refs: BTreeMap<String, PbSecretRef>,
488 connector_conn_ref: Option<ConnectionId>,
489 ) -> Result<()> {
490 self.0
491 .alter_sink_props(
492 sink_id,
493 changed_props,
494 changed_secret_refs,
495 connector_conn_ref,
496 )
497 .await
498 }
499
500 async fn alter_iceberg_table_props(
501 &self,
502 table_id: TableId,
503 sink_id: SinkId,
504 source_id: SourceId,
505 changed_props: BTreeMap<String, String>,
506 changed_secret_refs: BTreeMap<String, PbSecretRef>,
507 connector_conn_ref: Option<ConnectionId>,
508 ) -> Result<()> {
509 self.0
510 .alter_iceberg_table_props(
511 table_id,
512 sink_id,
513 source_id,
514 changed_props,
515 changed_secret_refs,
516 connector_conn_ref,
517 )
518 .await
519 }
520
521 async fn alter_source_connector_props(
522 &self,
523 source_id: SourceId,
524 changed_props: BTreeMap<String, String>,
525 changed_secret_refs: BTreeMap<String, PbSecretRef>,
526 connector_conn_ref: Option<ConnectionId>,
527 ) -> Result<()> {
528 self.0
529 .alter_source_connector_props(
530 source_id,
531 changed_props,
532 changed_secret_refs,
533 connector_conn_ref,
534 )
535 .await
536 }
537
538 async fn alter_connection_connector_props(
539 &self,
540 connection_id: u32,
541 changed_props: BTreeMap<String, String>,
542 changed_secret_refs: BTreeMap<String, PbSecretRef>,
543 ) -> Result<()> {
544 self.0
545 .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
546 .await
547 }
548
549 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
550 self.0.list_hosted_iceberg_tables().await
551 }
552
553 async fn get_fragment_by_id(
554 &self,
555 fragment_id: FragmentId,
556 ) -> Result<Option<FragmentDistribution>> {
557 self.0.get_fragment_by_id(fragment_id).await
558 }
559
560 async fn get_fragment_vnodes(
561 &self,
562 fragment_id: FragmentId,
563 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
564 self.0.get_fragment_vnodes(fragment_id).await
565 }
566
567 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
568 self.0.get_actor_vnodes(actor_id).await
569 }
570
571 fn worker_id(&self) -> WorkerId {
572 self.0.worker_id()
573 }
574
575 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
576 self.0.set_sync_log_store_aligned(job_id, aligned).await
577 }
578
579 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
580 self.0.compact_iceberg_table(sink_id).await
581 }
582
583 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
584 self.0.expire_iceberg_table_snapshots(sink_id).await
585 }
586
587 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
588 self.0.refresh(request).await
589 }
590
591 fn cluster_id(&self) -> &str {
592 self.0.cluster_id()
593 }
594
595 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
596 self.0.list_unmigrated_tables().await
597 }
598
599 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
600 self.0.list_refresh_table_states().await
601 }
602
603 async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>> {
604 self.0.list_iceberg_compaction_status().await
605 }
606
607 async fn update_compaction_config(
608 &self,
609 compaction_group_ids: Vec<CompactionGroupId>,
610 configs: Vec<PbMutableConfig>,
611 ) -> Result<()> {
612 self.0
613 .risectl_update_compaction_config(&compaction_group_ids, &configs)
614 .await
615 }
616}