1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
24use risingwave_pb::backup_service::MetaSnapshotMetadata;
25use risingwave_pb::catalog::Table;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
31};
32use risingwave_pb::id::ActorId;
33use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
34use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
35use risingwave_pb::meta::list_actor_states_response::ActorState;
36use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
37use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
38use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
39use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
40use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
41use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
42use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
43use risingwave_pb::meta::{
44 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
45 RefreshRequest, RefreshResponse,
46};
47use risingwave_pb::secret::PbSecretRef;
48use risingwave_rpc_client::error::Result;
49use risingwave_rpc_client::{HummockMetaClient, MetaClient};
50
51use crate::catalog::{DatabaseId, FragmentId, SinkId};
52
/// Meta-service calls grouped behind a single trait.
///
/// [`FrontendMetaClientImpl`] in this file implements the trait by forwarding each call to
/// [`MetaClient`]. Apart from [`try_unregister`](Self::try_unregister),
/// [`worker_id`](Self::worker_id) and [`cluster_id`](Self::cluster_id), every method returns the
/// RPC client's [`Result`].
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
    /// Has no return value, so an implementation cannot report a failure to the caller.
    async fn try_unregister(&self);

    /// Takes the target database id and yields a [`HummockVersionId`] on success.
    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;

    async fn wait(&self) -> Result<()>;

    async fn recover(&self) -> Result<()>;

    /// NOTE(review): the returned `u32`s are presumably the ids of the canceled jobs — confirm.
    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

    /// Returns a map keyed by [`JobId`]. Note that the parameter is named `table_ids` although
    /// it is typed as a slice of [`JobId`].
    async fn list_table_fragments(
        &self,
        table_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>>;

    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;

    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;

    /// [`FrontendMetaClientImpl`] answers this with the `snapshot_metadata` field of the meta
    /// snapshot manifest.
    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;

    /// NOTE(review): the meaning of `value == None` and of a `None` result is not visible in
    /// this file — confirm against the meta service before relying on it.
    async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>>;

    /// [`FrontendMetaClientImpl`] obtains a JSON string from the meta client and deserializes
    /// it into a [`SessionConfig`].
    async fn get_session_params(&self) -> Result<SessionConfig>;

    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

    /// Returns the requested tables keyed by [`TableId`].
    async fn get_tables(
        &self,
        table_ids: Vec<TableId>,
        include_dropped_table: bool,
    ) -> Result<HashMap<TableId, Table>>;

    /// Each tuple pairs a [`WorkerId`] with a `u64`; [`FrontendMetaClientImpl`] fills them from
    /// the `context_id` and `min_pinned_id` of every pinned-version entry, respectively.
    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>>;

    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;

    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;

    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;

    /// NOTE(review): the `u64` key is presumably a compaction group id — confirm.
    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;

    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

    async fn list_event_log(&self) -> Result<Vec<EventLog>>;

    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;

    /// [`FrontendMetaClientImpl`] lists worker nodes without passing a filter (`None`).
    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;

    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;

    /// `id` identifies the object selected by `throttle_target`.
    ///
    /// NOTE(review): `rate_limit == None` presumably removes the limit — confirm.
    async fn apply_throttle(
        &self,
        throttle_target: PbThrottleTarget,
        throttle_type: risingwave_pb::common::PbThrottleType,
        id: u32,
        rate_limit: Option<u32>,
    ) -> Result<()>;

    async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()>;

    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;

    /// Returns CDC progress entries keyed by [`JobId`].
    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;

    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;

    async fn get_meta_store_endpoint(&self) -> Result<String>;

    /// Takes the changed plain properties and the changed secret references as two separate
    /// maps, plus an optional connection reference.
    async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Same property arguments as [`alter_sink_props`](Self::alter_sink_props), but addressed by
    /// a table id, a sink id and a source id together.
    async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Unlike the other `alter_*_props` methods, the connection is identified by a raw `u32`
    /// and there is no `connector_conn_ref` argument.
    async fn alter_connection_connector_props(
        &self,
        connection_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> Result<()>;

    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;

    /// Yields `Ok(None)` rather than an error when the implementation has no distribution to
    /// return for `fragment_id`.
    async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>>;

    /// Returns one `(actor id, vnodes)` entry per actor, with vnodes given as `u32` values.
    async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>>;

    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;

    /// Synchronous accessor; does not return a `Result`.
    fn worker_id(&self) -> WorkerId;

    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;

    /// NOTE(review): the returned `u64` is presumably an id of the triggered compaction task —
    /// confirm.
    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;

    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;

    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;

    /// Synchronous accessor; the returned string borrows from `self`.
    fn cluster_id(&self) -> &str;

    /// NOTE(review): the `String` value is presumably the table name — confirm.
    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
}
218
/// Newtype around [`MetaClient`]; its [`FrontendMetaClient`] implementation in this file
/// forwards each call to the wrapped client, which is exposed as the public field `0`.
pub struct FrontendMetaClientImpl(pub MetaClient);
220
221#[async_trait::async_trait]
222impl FrontendMetaClient for FrontendMetaClientImpl {
223 async fn try_unregister(&self) {
224 self.0.try_unregister().await;
225 }
226
227 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
228 self.0.flush(database_id).await
229 }
230
231 async fn wait(&self) -> Result<()> {
232 self.0.wait().await
233 }
234
235 async fn recover(&self) -> Result<()> {
236 self.0.recover().await
237 }
238
239 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
240 self.0.cancel_creating_jobs(infos).await
241 }
242
243 async fn list_table_fragments(
244 &self,
245 job_ids: &[JobId],
246 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
247 self.0.list_table_fragments(job_ids).await
248 }
249
250 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
251 self.0.list_streaming_job_states().await
252 }
253
254 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
255 self.0.list_fragment_distributions().await
256 }
257
258 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
259 self.0.list_creating_fragment_distribution().await
260 }
261
262 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
263 self.0.list_actor_states().await
264 }
265
266 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
267 self.0.list_actor_splits().await
268 }
269
270 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
271 self.0.list_object_dependencies().await
272 }
273
274 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
275 let manifest = self.0.get_meta_snapshot_manifest().await?;
276 Ok(manifest.snapshot_metadata)
277 }
278
279 async fn set_system_param(
280 &self,
281 param: String,
282 value: Option<String>,
283 ) -> Result<Option<SystemParamsReader>> {
284 self.0.set_system_param(param, value).await
285 }
286
287 async fn get_session_params(&self) -> Result<SessionConfig> {
288 let session_config: SessionConfig =
289 serde_json::from_str(&self.0.get_session_params().await?)
290 .context("failed to parse session config")?;
291 Ok(session_config)
292 }
293
294 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
295 self.0.set_session_param(param, value).await
296 }
297
298 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
299 let ddl_progress = self.0.get_ddl_progress().await?;
300 Ok(ddl_progress)
301 }
302
303 async fn get_tables(
304 &self,
305 table_ids: Vec<TableId>,
306 include_dropped_tables: bool,
307 ) -> Result<HashMap<TableId, Table>> {
308 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
309 Ok(tables)
310 }
311
312 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>> {
313 let pinned_versions = self
314 .0
315 .risectl_get_pinned_versions_summary()
316 .await?
317 .summary
318 .unwrap()
319 .pinned_versions;
320 let ret = pinned_versions
321 .into_iter()
322 .map(|v| (v.context_id, v.min_pinned_id))
323 .collect();
324 Ok(ret)
325 }
326
327 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
328 self.0.get_current_version().await
329 }
330
331 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
332 self.0
333 .risectl_get_checkpoint_hummock_version()
334 .await
335 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
336 }
337
338 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
339 self.0
341 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
342 .await
343 }
344
345 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
346 self.0.list_branched_object().await
347 }
348
349 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
350 self.0.risectl_list_compaction_group().await
351 }
352
353 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
354 self.0.list_active_write_limit().await
355 }
356
357 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
358 self.0.list_hummock_meta_config().await
359 }
360
361 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
362 self.0.list_event_log().await
363 }
364
365 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
366 self.0.list_compact_task_assignment().await
367 }
368
369 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
370 self.0.list_worker_nodes(None).await
371 }
372
373 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
374 self.0.list_compact_task_progress().await
375 }
376
377 async fn apply_throttle(
378 &self,
379 throttle_target: PbThrottleTarget,
380 throttle_type: risingwave_pb::common::PbThrottleType,
381 id: u32,
382 rate_limit: Option<u32>,
383 ) -> Result<()> {
384 self.0
385 .apply_throttle(throttle_target, throttle_type, id, rate_limit)
386 .await
387 .map(|_| ())
388 }
389
390 async fn alter_fragment_parallelism(
391 &self,
392 fragment_ids: Vec<FragmentId>,
393 parallelism: Option<PbTableParallelism>,
394 ) -> Result<()> {
395 self.0
396 .alter_fragment_parallelism(fragment_ids, parallelism)
397 .await
398 }
399
400 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
401 self.0.get_cluster_recovery_status().await
402 }
403
404 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
405 self.0.get_cluster_limits().await
406 }
407
408 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
409 self.0.list_rate_limits().await
410 }
411
412 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
413 self.0.list_cdc_progress().await
414 }
415
416 async fn get_meta_store_endpoint(&self) -> Result<String> {
417 self.0.get_meta_store_endpoint().await
418 }
419
420 async fn alter_sink_props(
421 &self,
422 sink_id: SinkId,
423 changed_props: BTreeMap<String, String>,
424 changed_secret_refs: BTreeMap<String, PbSecretRef>,
425 connector_conn_ref: Option<ConnectionId>,
426 ) -> Result<()> {
427 self.0
428 .alter_sink_props(
429 sink_id,
430 changed_props,
431 changed_secret_refs,
432 connector_conn_ref,
433 )
434 .await
435 }
436
437 async fn alter_iceberg_table_props(
438 &self,
439 table_id: TableId,
440 sink_id: SinkId,
441 source_id: SourceId,
442 changed_props: BTreeMap<String, String>,
443 changed_secret_refs: BTreeMap<String, PbSecretRef>,
444 connector_conn_ref: Option<ConnectionId>,
445 ) -> Result<()> {
446 self.0
447 .alter_iceberg_table_props(
448 table_id,
449 sink_id,
450 source_id,
451 changed_props,
452 changed_secret_refs,
453 connector_conn_ref,
454 )
455 .await
456 }
457
458 async fn alter_source_connector_props(
459 &self,
460 source_id: SourceId,
461 changed_props: BTreeMap<String, String>,
462 changed_secret_refs: BTreeMap<String, PbSecretRef>,
463 connector_conn_ref: Option<ConnectionId>,
464 ) -> Result<()> {
465 self.0
466 .alter_source_connector_props(
467 source_id,
468 changed_props,
469 changed_secret_refs,
470 connector_conn_ref,
471 )
472 .await
473 }
474
475 async fn alter_connection_connector_props(
476 &self,
477 connection_id: u32,
478 changed_props: BTreeMap<String, String>,
479 changed_secret_refs: BTreeMap<String, PbSecretRef>,
480 ) -> Result<()> {
481 self.0
482 .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
483 .await
484 }
485
486 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
487 self.0.list_hosted_iceberg_tables().await
488 }
489
490 async fn get_fragment_by_id(
491 &self,
492 fragment_id: FragmentId,
493 ) -> Result<Option<FragmentDistribution>> {
494 self.0.get_fragment_by_id(fragment_id).await
495 }
496
497 async fn get_fragment_vnodes(
498 &self,
499 fragment_id: FragmentId,
500 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
501 self.0.get_fragment_vnodes(fragment_id).await
502 }
503
504 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
505 self.0.get_actor_vnodes(actor_id).await
506 }
507
508 fn worker_id(&self) -> WorkerId {
509 self.0.worker_id()
510 }
511
512 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
513 self.0.set_sync_log_store_aligned(job_id, aligned).await
514 }
515
516 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
517 self.0.compact_iceberg_table(sink_id).await
518 }
519
520 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
521 self.0.expire_iceberg_table_snapshots(sink_id).await
522 }
523
524 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
525 self.0.refresh(request).await
526 }
527
528 fn cluster_id(&self) -> &str {
529 self.0.cluster_id()
530 }
531
532 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
533 self.0.list_unmigrated_tables().await
534 }
535
536 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
537 self.0.list_refresh_table_states().await
538 }
539}