1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
24use risingwave_pb::backup_service::MetaSnapshotMetadata;
25use risingwave_pb::catalog::Table;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
31};
32use risingwave_pb::id::ActorId;
33use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
34use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
35use risingwave_pb::meta::list_actor_states_response::ActorState;
36use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
37use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
38use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
39use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
40use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
41use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
42use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
43use risingwave_pb::meta::{
44 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
45 RefreshRequest, RefreshResponse, list_sink_log_store_tables_response,
46};
47use risingwave_pb::secret::PbSecretRef;
48use risingwave_rpc_client::error::Result;
49use risingwave_rpc_client::{HummockMetaClient, MetaClient};
50
51use crate::catalog::{DatabaseId, FragmentId, SinkId};
52
53#[async_trait::async_trait]
59pub trait FrontendMetaClient: Send + Sync {
60 async fn try_unregister(&self);
61
62 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
63
64 async fn wait(&self) -> Result<()>;
65
66 async fn recover(&self) -> Result<()>;
67
68 async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
69
70 async fn list_table_fragments(
71 &self,
72 table_ids: &[JobId],
73 ) -> Result<HashMap<JobId, TableFragmentInfo>>;
74
75 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
76
77 async fn list_fragment_distribution(
78 &self,
79 include_node: bool,
80 ) -> Result<Vec<FragmentDistribution>>;
81
82 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
83
84 async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
85
86 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
87
88 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
89
90 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
91
92 async fn list_sink_log_store_tables(
93 &self,
94 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>>;
95
96 async fn set_system_param(
97 &self,
98 param: String,
99 value: Option<String>,
100 ) -> Result<Option<SystemParamsReader>>;
101
102 async fn get_session_params(&self) -> Result<SessionConfig>;
103
104 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
105
106 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
107
108 async fn get_tables(
109 &self,
110 table_ids: Vec<TableId>,
111 include_dropped_table: bool,
112 ) -> Result<HashMap<TableId, Table>>;
113
114 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>>;
116
117 async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
118
119 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
120
121 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
122
123 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
124
125 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
126
127 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
128
129 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
130
131 async fn list_event_log(&self) -> Result<Vec<EventLog>>;
132 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
133
134 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
135
136 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
137
138 async fn apply_throttle(
139 &self,
140 throttle_target: PbThrottleTarget,
141 throttle_type: risingwave_pb::common::PbThrottleType,
142 id: u32,
143 rate_limit: Option<u32>,
144 ) -> Result<()>;
145
146 async fn alter_fragment_parallelism(
147 &self,
148 fragment_ids: Vec<FragmentId>,
149 parallelism: Option<PbTableParallelism>,
150 ) -> Result<()>;
151
152 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
153
154 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
155
156 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
157
158 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
159
160 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;
161
162 async fn get_meta_store_endpoint(&self) -> Result<String>;
163
164 async fn alter_sink_props(
165 &self,
166 sink_id: SinkId,
167 changed_props: BTreeMap<String, String>,
168 changed_secret_refs: BTreeMap<String, PbSecretRef>,
169 connector_conn_ref: Option<ConnectionId>,
170 ) -> Result<()>;
171
172 async fn alter_iceberg_table_props(
173 &self,
174 table_id: TableId,
175 sink_id: SinkId,
176 source_id: SourceId,
177 changed_props: BTreeMap<String, String>,
178 changed_secret_refs: BTreeMap<String, PbSecretRef>,
179 connector_conn_ref: Option<ConnectionId>,
180 ) -> Result<()>;
181
182 async fn alter_source_connector_props(
183 &self,
184 source_id: SourceId,
185 changed_props: BTreeMap<String, String>,
186 changed_secret_refs: BTreeMap<String, PbSecretRef>,
187 connector_conn_ref: Option<ConnectionId>,
188 ) -> Result<()>;
189
190 async fn alter_connection_connector_props(
191 &self,
192 connection_id: u32,
193 changed_props: BTreeMap<String, String>,
194 changed_secret_refs: BTreeMap<String, PbSecretRef>,
195 ) -> Result<()>;
196
197 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
198
199 async fn get_fragment_by_id(
200 &self,
201 fragment_id: FragmentId,
202 ) -> Result<Option<FragmentDistribution>>;
203
204 async fn get_fragment_vnodes(
205 &self,
206 fragment_id: FragmentId,
207 ) -> Result<Vec<(ActorId, Vec<u32>)>>;
208
209 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;
210
211 fn worker_id(&self) -> WorkerId;
212
213 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
214
215 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
216
217 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
218
219 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
220
221 fn cluster_id(&self) -> &str;
222
223 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
224}
225
226pub struct FrontendMetaClientImpl(pub MetaClient);
227
228#[async_trait::async_trait]
229impl FrontendMetaClient for FrontendMetaClientImpl {
230 async fn try_unregister(&self) {
231 self.0.try_unregister().await;
232 }
233
234 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
235 self.0.flush(database_id).await
236 }
237
238 async fn wait(&self) -> Result<()> {
239 self.0.wait().await
240 }
241
242 async fn recover(&self) -> Result<()> {
243 self.0.recover().await
244 }
245
246 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
247 self.0.cancel_creating_jobs(infos).await
248 }
249
250 async fn list_table_fragments(
251 &self,
252 job_ids: &[JobId],
253 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
254 self.0.list_table_fragments(job_ids).await
255 }
256
257 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
258 self.0.list_streaming_job_states().await
259 }
260
261 async fn list_fragment_distribution(
262 &self,
263 include_node: bool,
264 ) -> Result<Vec<FragmentDistribution>> {
265 self.0.list_fragment_distributions(include_node).await
266 }
267
268 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
269 self.0.list_creating_fragment_distribution().await
270 }
271
272 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
273 self.0.list_actor_states().await
274 }
275
276 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
277 self.0.list_actor_splits().await
278 }
279
280 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
281 self.0.list_object_dependencies().await
282 }
283
284 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
285 let manifest = self.0.get_meta_snapshot_manifest().await?;
286 Ok(manifest.snapshot_metadata)
287 }
288
289 async fn list_sink_log_store_tables(
290 &self,
291 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
292 self.0.list_sink_log_store_tables().await
293 }
294
295 async fn set_system_param(
296 &self,
297 param: String,
298 value: Option<String>,
299 ) -> Result<Option<SystemParamsReader>> {
300 self.0.set_system_param(param, value).await
301 }
302
303 async fn get_session_params(&self) -> Result<SessionConfig> {
304 let session_config: SessionConfig =
305 serde_json::from_str(&self.0.get_session_params().await?)
306 .context("failed to parse session config")?;
307 Ok(session_config)
308 }
309
310 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
311 self.0.set_session_param(param, value).await
312 }
313
314 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
315 let ddl_progress = self.0.get_ddl_progress().await?;
316 Ok(ddl_progress)
317 }
318
319 async fn get_tables(
320 &self,
321 table_ids: Vec<TableId>,
322 include_dropped_tables: bool,
323 ) -> Result<HashMap<TableId, Table>> {
324 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
325 Ok(tables)
326 }
327
328 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>> {
329 let pinned_versions = self
330 .0
331 .risectl_get_pinned_versions_summary()
332 .await?
333 .summary
334 .unwrap()
335 .pinned_versions;
336 let ret = pinned_versions
337 .into_iter()
338 .map(|v| (v.context_id, v.min_pinned_id))
339 .collect();
340 Ok(ret)
341 }
342
343 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
344 self.0.get_current_version().await
345 }
346
347 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
348 self.0
349 .risectl_get_checkpoint_hummock_version()
350 .await
351 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
352 }
353
354 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
355 self.0
357 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
358 .await
359 }
360
361 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
362 self.0.list_branched_object().await
363 }
364
365 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
366 self.0.risectl_list_compaction_group().await
367 }
368
369 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
370 self.0.list_active_write_limit().await
371 }
372
373 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
374 self.0.list_hummock_meta_config().await
375 }
376
377 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
378 self.0.list_event_log().await
379 }
380
381 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
382 self.0.list_compact_task_assignment().await
383 }
384
385 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
386 self.0.list_worker_nodes(None).await
387 }
388
389 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
390 self.0.list_compact_task_progress().await
391 }
392
393 async fn apply_throttle(
394 &self,
395 throttle_target: PbThrottleTarget,
396 throttle_type: risingwave_pb::common::PbThrottleType,
397 id: u32,
398 rate_limit: Option<u32>,
399 ) -> Result<()> {
400 self.0
401 .apply_throttle(throttle_target, throttle_type, id, rate_limit)
402 .await
403 .map(|_| ())
404 }
405
406 async fn alter_fragment_parallelism(
407 &self,
408 fragment_ids: Vec<FragmentId>,
409 parallelism: Option<PbTableParallelism>,
410 ) -> Result<()> {
411 self.0
412 .alter_fragment_parallelism(fragment_ids, parallelism)
413 .await
414 }
415
416 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
417 self.0.get_cluster_recovery_status().await
418 }
419
420 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
421 self.0.get_cluster_limits().await
422 }
423
424 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
425 self.0.list_rate_limits().await
426 }
427
428 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
429 self.0.list_cdc_progress().await
430 }
431
432 async fn get_meta_store_endpoint(&self) -> Result<String> {
433 self.0.get_meta_store_endpoint().await
434 }
435
436 async fn alter_sink_props(
437 &self,
438 sink_id: SinkId,
439 changed_props: BTreeMap<String, String>,
440 changed_secret_refs: BTreeMap<String, PbSecretRef>,
441 connector_conn_ref: Option<ConnectionId>,
442 ) -> Result<()> {
443 self.0
444 .alter_sink_props(
445 sink_id,
446 changed_props,
447 changed_secret_refs,
448 connector_conn_ref,
449 )
450 .await
451 }
452
453 async fn alter_iceberg_table_props(
454 &self,
455 table_id: TableId,
456 sink_id: SinkId,
457 source_id: SourceId,
458 changed_props: BTreeMap<String, String>,
459 changed_secret_refs: BTreeMap<String, PbSecretRef>,
460 connector_conn_ref: Option<ConnectionId>,
461 ) -> Result<()> {
462 self.0
463 .alter_iceberg_table_props(
464 table_id,
465 sink_id,
466 source_id,
467 changed_props,
468 changed_secret_refs,
469 connector_conn_ref,
470 )
471 .await
472 }
473
474 async fn alter_source_connector_props(
475 &self,
476 source_id: SourceId,
477 changed_props: BTreeMap<String, String>,
478 changed_secret_refs: BTreeMap<String, PbSecretRef>,
479 connector_conn_ref: Option<ConnectionId>,
480 ) -> Result<()> {
481 self.0
482 .alter_source_connector_props(
483 source_id,
484 changed_props,
485 changed_secret_refs,
486 connector_conn_ref,
487 )
488 .await
489 }
490
491 async fn alter_connection_connector_props(
492 &self,
493 connection_id: u32,
494 changed_props: BTreeMap<String, String>,
495 changed_secret_refs: BTreeMap<String, PbSecretRef>,
496 ) -> Result<()> {
497 self.0
498 .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
499 .await
500 }
501
502 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
503 self.0.list_hosted_iceberg_tables().await
504 }
505
506 async fn get_fragment_by_id(
507 &self,
508 fragment_id: FragmentId,
509 ) -> Result<Option<FragmentDistribution>> {
510 self.0.get_fragment_by_id(fragment_id).await
511 }
512
513 async fn get_fragment_vnodes(
514 &self,
515 fragment_id: FragmentId,
516 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
517 self.0.get_fragment_vnodes(fragment_id).await
518 }
519
520 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
521 self.0.get_actor_vnodes(actor_id).await
522 }
523
524 fn worker_id(&self) -> WorkerId {
525 self.0.worker_id()
526 }
527
528 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
529 self.0.set_sync_log_store_aligned(job_id, aligned).await
530 }
531
532 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
533 self.0.compact_iceberg_table(sink_id).await
534 }
535
536 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
537 self.0.expire_iceberg_table_snapshots(sink_id).await
538 }
539
540 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
541 self.0.refresh(request).await
542 }
543
544 fn cluster_id(&self) -> &str {
545 self.0.cluster_id()
546 }
547
548 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
549 self.0.list_unmigrated_tables().await
550 }
551
552 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
553 self.0.list_refresh_table_states().await
554 }
555}