1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
24use risingwave_pb::backup_service::MetaSnapshotMetadata;
25use risingwave_pb::catalog::Table;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
31};
32use risingwave_pb::id::ActorId;
33use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
34use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
35use risingwave_pb::meta::list_actor_states_response::ActorState;
36use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
37use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
38use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
39use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
40use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
41use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
42use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
43use risingwave_pb::meta::{
44 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
45 RefreshRequest, RefreshResponse, list_sink_log_store_tables_response,
46};
47use risingwave_pb::secret::PbSecretRef;
48use risingwave_rpc_client::error::Result;
49use risingwave_rpc_client::{HummockMetaClient, MetaClient};
50
51use crate::catalog::{DatabaseId, FragmentId, SinkId};
52
/// Meta-service operations gathered behind a single trait.
///
/// [`FrontendMetaClientImpl`], defined in this file, implements every method by calling into a
/// wrapped [`MetaClient`]. Fallible methods return the RPC client's [`Result`] alias.
///
/// NOTE(review): presumably the trait exists so that alternative implementations (e.g. mocks in
/// tests) can be substituted for the real client — confirm against the rest of the crate.
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
    /// Attempts to unregister this client; no outcome is reported to the caller.
    async fn try_unregister(&self);

    /// Requests a flush for `database_id`, yielding a [`HummockVersionId`] on success.
    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;

    /// Requests a recovery. Success carries no payload.
    async fn recover(&self) -> Result<()>;

    /// Cancels the creating jobs described by `jobs`.
    ///
    /// Returns raw `u32` values — presumably the ids of the jobs that were cancelled
    /// (TODO confirm against the meta service).
    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

    /// Fetches fragment information for the requested jobs, keyed by [`JobId`].
    ///
    /// NOTE(review): the parameter is named `table_ids` although it is a slice of [`JobId`].
    async fn list_table_fragments(
        &self,
        table_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>>;

    /// Lists the state of streaming jobs as [`StreamingJobState`] entries.
    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;

    /// Lists fragment distributions.
    ///
    /// `include_node` is passed through to the meta client unchanged; presumably it controls
    /// whether node details are attached to each entry (TODO confirm).
    async fn list_fragment_distribution(
        &self,
        include_node: bool,
    ) -> Result<Vec<FragmentDistribution>>;

    /// Lists fragment distributions, restricted (per the method name) to fragments that are
    /// still being created.
    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists actor states as [`ActorState`] entries.
    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

    /// Lists actor splits as [`ActorSplit`] entries.
    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

    /// Lists metadata of meta snapshots.
    ///
    /// The implementation in this file obtains these from the `snapshot_metadata` field of the
    /// meta snapshot manifest.
    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;

    /// Lists sink log store tables.
    async fn list_sink_log_store_tables(
        &self,
    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>>;

    /// Sets the system parameter `param` to `value`.
    ///
    /// `value` is optional; presumably `None` resets the parameter to its default (TODO
    /// confirm). On success an optional [`SystemParamsReader`] is returned.
    async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>>;

    /// Fetches the session parameters as a [`SessionConfig`].
    ///
    /// The implementation in this file receives them as a JSON string and deserializes it.
    async fn get_session_params(&self) -> Result<SessionConfig>;

    /// Sets the session parameter `param` to `value` and returns a `String` from the meta
    /// service — presumably the value now in effect (TODO confirm).
    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

    /// Fetches DDL progress entries.
    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

    /// Fetches the catalog [`Table`]s for `table_ids`, keyed by [`TableId`].
    ///
    /// `include_dropped_table` is passed through unchanged; per its name it asks for dropped
    /// tables to be included as well.
    async fn get_tables(
        &self,
        table_ids: Vec<TableId>,
        include_dropped_table: bool,
    ) -> Result<HashMap<TableId, Table>>;

    /// Lists pinned Hummock versions as `(worker, version)` pairs.
    ///
    /// In the implementation in this file each pair is built from the `context_id` and
    /// `min_pinned_id` fields of a pinned-version entry.
    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>>;

    /// Fetches the current [`HummockVersion`].
    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

    /// Fetches the checkpoint [`HummockVersion`].
    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;

    /// Lists Hummock version deltas.
    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

    /// Lists branched objects.
    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;

    /// Lists compaction groups together with their configuration as [`CompactionGroupInfo`].
    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;

    /// Lists the active write limits, keyed by [`CompactionGroupId`].
    async fn list_hummock_active_write_limits(
        &self,
    ) -> Result<HashMap<CompactionGroupId, WriteLimit>>;

    /// Lists Hummock meta configuration as string key/value pairs.
    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

    /// Lists event log entries.
    async fn list_event_log(&self) -> Result<Vec<EventLog>>;

    /// Lists compact task assignments.
    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;

    /// Lists worker nodes.
    ///
    /// The implementation in this file asks the meta client for worker nodes with `None` as
    /// the only argument — presumably meaning "no filter" (TODO confirm).
    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;

    /// Lists compact task progress entries.
    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;

    /// Applies a throttle of kind `throttle_type` to the object `id` of kind `throttle_target`.
    ///
    /// `rate_limit` is optional; presumably `None` removes the limit (TODO confirm). `id` is a
    /// raw `u32` whose interpretation depends on `throttle_target`.
    async fn apply_throttle(
        &self,
        throttle_target: PbThrottleTarget,
        throttle_type: risingwave_pb::common::PbThrottleType,
        id: u32,
        rate_limit: Option<u32>,
    ) -> Result<()>;

    /// Alters the parallelism of the given fragments.
    ///
    /// `parallelism` is optional; the meaning of `None` is not visible in this file.
    async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()>;

    /// Fetches the cluster's [`RecoveryStatus`].
    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

    /// Fetches the cluster limits.
    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

    /// Lists rate limits as [`RateLimitInfo`] entries.
    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;

    /// Lists CDC progress, keyed by [`JobId`].
    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;

    /// Lists refresh table states.
    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;

    /// Lists Iceberg compaction status entries.
    async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>>;

    /// Fetches the meta store endpoint as a `String`.
    async fn get_meta_store_endpoint(&self) -> Result<String>;

    /// Alters the properties of the sink `sink_id`.
    ///
    /// `changed_props` and `changed_secret_refs` carry the plain and secret-backed property
    /// changes respectively; `connector_conn_ref` optionally names a connection.
    async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Alters the properties of an Iceberg table, identified by its table, sink and source ids.
    ///
    /// The remaining parameters have the same shape as in [`Self::alter_sink_props`].
    async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Alters the connector properties of the source `source_id`.
    ///
    /// The remaining parameters have the same shape as in [`Self::alter_sink_props`].
    async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Alters the connector properties of the connection `connection_id`.
    ///
    /// NOTE(review): the id is a raw `u32` here, whereas the sibling methods take
    /// [`ConnectionId`] for connection references.
    async fn alter_connection_connector_props(
        &self,
        connection_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> Result<()>;

    /// Lists hosted Iceberg tables.
    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;

    /// Looks up the distribution of a single fragment; the `Option` allows for the fragment
    /// not being found.
    async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>>;

    /// Fetches, for each actor of `fragment_id`, a list of `u32` vnode values.
    async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>>;

    /// Fetches the `u32` vnode values of the actor `actor_id`.
    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;

    /// Returns the [`WorkerId`] of this client. Synchronous and infallible.
    fn worker_id(&self) -> WorkerId;

    /// Sets the "sync log store aligned" flag of job `job_id` to `aligned`.
    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;

    /// Requests compaction of the Iceberg table behind `sink_id`.
    ///
    /// Returns a `u64` — presumably an identifier of the compaction task (TODO confirm).
    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;

    /// Requests expiration of snapshots of the Iceberg table behind `sink_id`.
    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;

    /// Sends a [`RefreshRequest`] and returns the [`RefreshResponse`].
    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;

    /// Returns the cluster id as a string slice borrowed from `self`. Synchronous and
    /// infallible.
    fn cluster_id(&self) -> &str;

    /// Lists unmigrated tables as a map from [`TableId`] to a `String` — presumably the table
    /// name (TODO confirm).
    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
}
225
/// [`FrontendMetaClient`] implementation backed by a [`MetaClient`].
///
/// A tuple struct whose single public field is the wrapped client that the trait methods
/// call into.
pub struct FrontendMetaClientImpl(pub MetaClient);
227
228#[async_trait::async_trait]
229impl FrontendMetaClient for FrontendMetaClientImpl {
230 async fn try_unregister(&self) {
231 self.0.try_unregister().await;
232 }
233
234 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
235 self.0.flush(database_id).await
236 }
237
238 async fn recover(&self) -> Result<()> {
239 self.0.recover().await
240 }
241
242 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
243 self.0.cancel_creating_jobs(infos).await
244 }
245
246 async fn list_table_fragments(
247 &self,
248 job_ids: &[JobId],
249 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
250 self.0.list_table_fragments(job_ids).await
251 }
252
253 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
254 self.0.list_streaming_job_states().await
255 }
256
257 async fn list_fragment_distribution(
258 &self,
259 include_node: bool,
260 ) -> Result<Vec<FragmentDistribution>> {
261 self.0.list_fragment_distributions(include_node).await
262 }
263
264 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
265 self.0.list_creating_fragment_distribution().await
266 }
267
268 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
269 self.0.list_actor_states().await
270 }
271
272 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
273 self.0.list_actor_splits().await
274 }
275
276 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
277 let manifest = self.0.get_meta_snapshot_manifest().await?;
278 Ok(manifest.snapshot_metadata)
279 }
280
281 async fn list_sink_log_store_tables(
282 &self,
283 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
284 self.0.list_sink_log_store_tables().await
285 }
286
287 async fn set_system_param(
288 &self,
289 param: String,
290 value: Option<String>,
291 ) -> Result<Option<SystemParamsReader>> {
292 self.0.set_system_param(param, value).await
293 }
294
295 async fn get_session_params(&self) -> Result<SessionConfig> {
296 let session_config: SessionConfig =
297 serde_json::from_str(&self.0.get_session_params().await?)
298 .context("failed to parse session config")?;
299 Ok(session_config)
300 }
301
302 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
303 self.0.set_session_param(param, value).await
304 }
305
306 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
307 let ddl_progress = self.0.get_ddl_progress().await?;
308 Ok(ddl_progress)
309 }
310
311 async fn get_tables(
312 &self,
313 table_ids: Vec<TableId>,
314 include_dropped_tables: bool,
315 ) -> Result<HashMap<TableId, Table>> {
316 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
317 Ok(tables)
318 }
319
320 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>> {
321 let pinned_versions = self
322 .0
323 .risectl_get_pinned_versions_summary()
324 .await?
325 .summary
326 .unwrap()
327 .pinned_versions;
328 let ret = pinned_versions
329 .into_iter()
330 .map(|v| (v.context_id, v.min_pinned_id))
331 .collect();
332 Ok(ret)
333 }
334
335 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
336 self.0.get_current_version().await
337 }
338
339 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
340 self.0
341 .risectl_get_checkpoint_hummock_version()
342 .await
343 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
344 }
345
346 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
347 self.0
349 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
350 .await
351 }
352
353 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
354 self.0.list_branched_object().await
355 }
356
357 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
358 self.0.risectl_list_compaction_group().await
359 }
360
361 async fn list_hummock_active_write_limits(
362 &self,
363 ) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
364 self.0.list_active_write_limit().await
365 }
366
367 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
368 self.0.list_hummock_meta_config().await
369 }
370
371 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
372 self.0.list_event_log().await
373 }
374
375 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
376 self.0.list_compact_task_assignment().await
377 }
378
379 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
380 self.0.list_worker_nodes(None).await
381 }
382
383 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
384 self.0.list_compact_task_progress().await
385 }
386
387 async fn apply_throttle(
388 &self,
389 throttle_target: PbThrottleTarget,
390 throttle_type: risingwave_pb::common::PbThrottleType,
391 id: u32,
392 rate_limit: Option<u32>,
393 ) -> Result<()> {
394 self.0
395 .apply_throttle(throttle_target, throttle_type, id, rate_limit)
396 .await
397 .map(|_| ())
398 }
399
400 async fn alter_fragment_parallelism(
401 &self,
402 fragment_ids: Vec<FragmentId>,
403 parallelism: Option<PbTableParallelism>,
404 ) -> Result<()> {
405 self.0
406 .alter_fragment_parallelism(fragment_ids, parallelism)
407 .await
408 }
409
410 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
411 self.0.get_cluster_recovery_status().await
412 }
413
414 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
415 self.0.get_cluster_limits().await
416 }
417
418 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
419 self.0.list_rate_limits().await
420 }
421
422 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
423 self.0.list_cdc_progress().await
424 }
425
426 async fn get_meta_store_endpoint(&self) -> Result<String> {
427 self.0.get_meta_store_endpoint().await
428 }
429
430 async fn alter_sink_props(
431 &self,
432 sink_id: SinkId,
433 changed_props: BTreeMap<String, String>,
434 changed_secret_refs: BTreeMap<String, PbSecretRef>,
435 connector_conn_ref: Option<ConnectionId>,
436 ) -> Result<()> {
437 self.0
438 .alter_sink_props(
439 sink_id,
440 changed_props,
441 changed_secret_refs,
442 connector_conn_ref,
443 )
444 .await
445 }
446
447 async fn alter_iceberg_table_props(
448 &self,
449 table_id: TableId,
450 sink_id: SinkId,
451 source_id: SourceId,
452 changed_props: BTreeMap<String, String>,
453 changed_secret_refs: BTreeMap<String, PbSecretRef>,
454 connector_conn_ref: Option<ConnectionId>,
455 ) -> Result<()> {
456 self.0
457 .alter_iceberg_table_props(
458 table_id,
459 sink_id,
460 source_id,
461 changed_props,
462 changed_secret_refs,
463 connector_conn_ref,
464 )
465 .await
466 }
467
468 async fn alter_source_connector_props(
469 &self,
470 source_id: SourceId,
471 changed_props: BTreeMap<String, String>,
472 changed_secret_refs: BTreeMap<String, PbSecretRef>,
473 connector_conn_ref: Option<ConnectionId>,
474 ) -> Result<()> {
475 self.0
476 .alter_source_connector_props(
477 source_id,
478 changed_props,
479 changed_secret_refs,
480 connector_conn_ref,
481 )
482 .await
483 }
484
485 async fn alter_connection_connector_props(
486 &self,
487 connection_id: u32,
488 changed_props: BTreeMap<String, String>,
489 changed_secret_refs: BTreeMap<String, PbSecretRef>,
490 ) -> Result<()> {
491 self.0
492 .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
493 .await
494 }
495
496 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
497 self.0.list_hosted_iceberg_tables().await
498 }
499
500 async fn get_fragment_by_id(
501 &self,
502 fragment_id: FragmentId,
503 ) -> Result<Option<FragmentDistribution>> {
504 self.0.get_fragment_by_id(fragment_id).await
505 }
506
507 async fn get_fragment_vnodes(
508 &self,
509 fragment_id: FragmentId,
510 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
511 self.0.get_fragment_vnodes(fragment_id).await
512 }
513
514 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
515 self.0.get_actor_vnodes(actor_id).await
516 }
517
518 fn worker_id(&self) -> WorkerId {
519 self.0.worker_id()
520 }
521
522 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
523 self.0.set_sync_log_store_aligned(job_id, aligned).await
524 }
525
526 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
527 self.0.compact_iceberg_table(sink_id).await
528 }
529
530 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
531 self.0.expire_iceberg_table_snapshots(sink_id).await
532 }
533
534 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
535 self.0.refresh(request).await
536 }
537
538 fn cluster_id(&self) -> &str {
539 self.0.cluster_id()
540 }
541
542 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
543 self.0.list_unmigrated_tables().await
544 }
545
546 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
547 self.0.list_refresh_table_states().await
548 }
549
550 async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>> {
551 self.0.list_iceberg_compaction_status().await
552 }
553}