1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
24use risingwave_pb::backup_service::MetaSnapshotMetadata;
25use risingwave_pb::catalog::Table;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
31};
32use risingwave_pb::id::ActorId;
33use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
34use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
35use risingwave_pb::meta::list_actor_states_response::ActorState;
36use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
37use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
38use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
39use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
40use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
41use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
42use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
43use risingwave_pb::meta::{
44 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
45 RefreshRequest, RefreshResponse,
46};
47use risingwave_pb::secret::PbSecretRef;
48use risingwave_rpc_client::error::Result;
49use risingwave_rpc_client::{HummockMetaClient, MetaClient};
50
51use crate::catalog::{DatabaseId, FragmentId, SinkId};
52
53#[async_trait::async_trait]
59pub trait FrontendMetaClient: Send + Sync {
60 async fn try_unregister(&self);
61
62 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
63
64 async fn wait(&self) -> Result<()>;
65
66 async fn recover(&self) -> Result<()>;
67
68 async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
69
70 async fn list_table_fragments(
71 &self,
72 table_ids: &[JobId],
73 ) -> Result<HashMap<JobId, TableFragmentInfo>>;
74
75 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
76
77 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
78
79 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
80
81 async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
82
83 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
84
85 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
86
87 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
88
89 async fn set_system_param(
90 &self,
91 param: String,
92 value: Option<String>,
93 ) -> Result<Option<SystemParamsReader>>;
94
95 async fn get_session_params(&self) -> Result<SessionConfig>;
96
97 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
98
99 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
100
101 async fn get_tables(
102 &self,
103 table_ids: Vec<TableId>,
104 include_dropped_table: bool,
105 ) -> Result<HashMap<TableId, Table>>;
106
107 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>>;
109
110 async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
111
112 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
113
114 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
115
116 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
117
118 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
119
120 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
121
122 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
123
124 async fn list_event_log(&self) -> Result<Vec<EventLog>>;
125 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
126
127 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
128
129 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
130
131 async fn apply_throttle(
132 &self,
133 kind: PbThrottleTarget,
134 id: u32,
135 rate_limit: Option<u32>,
136 ) -> Result<()>;
137
138 async fn alter_fragment_parallelism(
139 &self,
140 fragment_ids: Vec<FragmentId>,
141 parallelism: Option<PbTableParallelism>,
142 ) -> Result<()>;
143
144 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
145
146 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
147
148 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
149
150 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
151
152 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;
153
154 async fn get_meta_store_endpoint(&self) -> Result<String>;
155
156 async fn alter_sink_props(
157 &self,
158 sink_id: SinkId,
159 changed_props: BTreeMap<String, String>,
160 changed_secret_refs: BTreeMap<String, PbSecretRef>,
161 connector_conn_ref: Option<ConnectionId>,
162 ) -> Result<()>;
163
164 async fn alter_iceberg_table_props(
165 &self,
166 table_id: TableId,
167 sink_id: SinkId,
168 source_id: SourceId,
169 changed_props: BTreeMap<String, String>,
170 changed_secret_refs: BTreeMap<String, PbSecretRef>,
171 connector_conn_ref: Option<ConnectionId>,
172 ) -> Result<()>;
173
174 async fn alter_source_connector_props(
175 &self,
176 source_id: SourceId,
177 changed_props: BTreeMap<String, String>,
178 changed_secret_refs: BTreeMap<String, PbSecretRef>,
179 connector_conn_ref: Option<ConnectionId>,
180 ) -> Result<()>;
181
182 async fn alter_connection_connector_props(
183 &self,
184 connection_id: u32,
185 changed_props: BTreeMap<String, String>,
186 changed_secret_refs: BTreeMap<String, PbSecretRef>,
187 ) -> Result<()>;
188
189 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
190
191 async fn get_fragment_by_id(
192 &self,
193 fragment_id: FragmentId,
194 ) -> Result<Option<FragmentDistribution>>;
195
196 async fn get_fragment_vnodes(
197 &self,
198 fragment_id: FragmentId,
199 ) -> Result<Vec<(ActorId, Vec<u32>)>>;
200
201 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;
202
203 fn worker_id(&self) -> WorkerId;
204
205 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
206
207 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
208
209 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
210
211 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
212
213 fn cluster_id(&self) -> &str;
214
215 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
216}
217
218pub struct FrontendMetaClientImpl(pub MetaClient);
219
220#[async_trait::async_trait]
221impl FrontendMetaClient for FrontendMetaClientImpl {
222 async fn try_unregister(&self) {
223 self.0.try_unregister().await;
224 }
225
226 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
227 self.0.flush(database_id).await
228 }
229
230 async fn wait(&self) -> Result<()> {
231 self.0.wait().await
232 }
233
234 async fn recover(&self) -> Result<()> {
235 self.0.recover().await
236 }
237
238 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
239 self.0.cancel_creating_jobs(infos).await
240 }
241
242 async fn list_table_fragments(
243 &self,
244 job_ids: &[JobId],
245 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
246 self.0.list_table_fragments(job_ids).await
247 }
248
249 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
250 self.0.list_streaming_job_states().await
251 }
252
253 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
254 self.0.list_fragment_distributions().await
255 }
256
257 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
258 self.0.list_creating_fragment_distribution().await
259 }
260
261 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
262 self.0.list_actor_states().await
263 }
264
265 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
266 self.0.list_actor_splits().await
267 }
268
269 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
270 self.0.list_object_dependencies().await
271 }
272
273 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
274 let manifest = self.0.get_meta_snapshot_manifest().await?;
275 Ok(manifest.snapshot_metadata)
276 }
277
278 async fn set_system_param(
279 &self,
280 param: String,
281 value: Option<String>,
282 ) -> Result<Option<SystemParamsReader>> {
283 self.0.set_system_param(param, value).await
284 }
285
286 async fn get_session_params(&self) -> Result<SessionConfig> {
287 let session_config: SessionConfig =
288 serde_json::from_str(&self.0.get_session_params().await?)
289 .context("failed to parse session config")?;
290 Ok(session_config)
291 }
292
293 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
294 self.0.set_session_param(param, value).await
295 }
296
297 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
298 let ddl_progress = self.0.get_ddl_progress().await?;
299 Ok(ddl_progress)
300 }
301
302 async fn get_tables(
303 &self,
304 table_ids: Vec<TableId>,
305 include_dropped_tables: bool,
306 ) -> Result<HashMap<TableId, Table>> {
307 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
308 Ok(tables)
309 }
310
311 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>> {
312 let pinned_versions = self
313 .0
314 .risectl_get_pinned_versions_summary()
315 .await?
316 .summary
317 .unwrap()
318 .pinned_versions;
319 let ret = pinned_versions
320 .into_iter()
321 .map(|v| (v.context_id, v.min_pinned_id))
322 .collect();
323 Ok(ret)
324 }
325
326 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
327 self.0.get_current_version().await
328 }
329
330 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
331 self.0
332 .risectl_get_checkpoint_hummock_version()
333 .await
334 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
335 }
336
337 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
338 self.0
340 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
341 .await
342 }
343
344 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
345 self.0.list_branched_object().await
346 }
347
348 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
349 self.0.risectl_list_compaction_group().await
350 }
351
352 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
353 self.0.list_active_write_limit().await
354 }
355
356 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
357 self.0.list_hummock_meta_config().await
358 }
359
360 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
361 self.0.list_event_log().await
362 }
363
364 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
365 self.0.list_compact_task_assignment().await
366 }
367
368 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
369 self.0.list_worker_nodes(None).await
370 }
371
372 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
373 self.0.list_compact_task_progress().await
374 }
375
376 async fn apply_throttle(
377 &self,
378 kind: PbThrottleTarget,
379 id: u32,
380 rate_limit: Option<u32>,
381 ) -> Result<()> {
382 self.0
383 .apply_throttle(kind, id, rate_limit)
384 .await
385 .map(|_| ())
386 }
387
388 async fn alter_fragment_parallelism(
389 &self,
390 fragment_ids: Vec<FragmentId>,
391 parallelism: Option<PbTableParallelism>,
392 ) -> Result<()> {
393 self.0
394 .alter_fragment_parallelism(fragment_ids, parallelism)
395 .await
396 }
397
398 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
399 self.0.get_cluster_recovery_status().await
400 }
401
402 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
403 self.0.get_cluster_limits().await
404 }
405
406 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
407 self.0.list_rate_limits().await
408 }
409
410 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
411 self.0.list_cdc_progress().await
412 }
413
414 async fn get_meta_store_endpoint(&self) -> Result<String> {
415 self.0.get_meta_store_endpoint().await
416 }
417
418 async fn alter_sink_props(
419 &self,
420 sink_id: SinkId,
421 changed_props: BTreeMap<String, String>,
422 changed_secret_refs: BTreeMap<String, PbSecretRef>,
423 connector_conn_ref: Option<ConnectionId>,
424 ) -> Result<()> {
425 self.0
426 .alter_sink_props(
427 sink_id,
428 changed_props,
429 changed_secret_refs,
430 connector_conn_ref,
431 )
432 .await
433 }
434
435 async fn alter_iceberg_table_props(
436 &self,
437 table_id: TableId,
438 sink_id: SinkId,
439 source_id: SourceId,
440 changed_props: BTreeMap<String, String>,
441 changed_secret_refs: BTreeMap<String, PbSecretRef>,
442 connector_conn_ref: Option<ConnectionId>,
443 ) -> Result<()> {
444 self.0
445 .alter_iceberg_table_props(
446 table_id,
447 sink_id,
448 source_id,
449 changed_props,
450 changed_secret_refs,
451 connector_conn_ref,
452 )
453 .await
454 }
455
456 async fn alter_source_connector_props(
457 &self,
458 source_id: SourceId,
459 changed_props: BTreeMap<String, String>,
460 changed_secret_refs: BTreeMap<String, PbSecretRef>,
461 connector_conn_ref: Option<ConnectionId>,
462 ) -> Result<()> {
463 self.0
464 .alter_source_connector_props(
465 source_id,
466 changed_props,
467 changed_secret_refs,
468 connector_conn_ref,
469 )
470 .await
471 }
472
473 async fn alter_connection_connector_props(
474 &self,
475 connection_id: u32,
476 changed_props: BTreeMap<String, String>,
477 changed_secret_refs: BTreeMap<String, PbSecretRef>,
478 ) -> Result<()> {
479 self.0
480 .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
481 .await
482 }
483
484 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
485 self.0.list_hosted_iceberg_tables().await
486 }
487
488 async fn get_fragment_by_id(
489 &self,
490 fragment_id: FragmentId,
491 ) -> Result<Option<FragmentDistribution>> {
492 self.0.get_fragment_by_id(fragment_id).await
493 }
494
495 async fn get_fragment_vnodes(
496 &self,
497 fragment_id: FragmentId,
498 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
499 self.0.get_fragment_vnodes(fragment_id).await
500 }
501
502 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
503 self.0.get_actor_vnodes(actor_id).await
504 }
505
506 fn worker_id(&self) -> WorkerId {
507 self.0.worker_id()
508 }
509
510 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
511 self.0.set_sync_log_store_aligned(job_id, aligned).await
512 }
513
514 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
515 self.0.compact_iceberg_table(sink_id).await
516 }
517
518 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
519 self.0.expire_iceberg_table_snapshots(sink_id).await
520 }
521
522 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
523 self.0.refresh(request).await
524 }
525
526 fn cluster_id(&self) -> &str {
527 self.0.cluster_id()
528 }
529
530 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
531 self.0.list_unmigrated_tables().await
532 }
533
534 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
535 self.0.list_refresh_table_states().await
536 }
537}