1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
24use risingwave_pb::backup_service::MetaSnapshotMetadata;
25use risingwave_pb::catalog::Table;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
31};
32use risingwave_pb::id::ActorId;
33use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
34use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
35use risingwave_pb::meta::list_actor_states_response::ActorState;
36use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
37use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
38use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
39use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
40use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
41use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
42use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
43use risingwave_pb::meta::{
44 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
45 RefreshRequest, RefreshResponse,
46};
47use risingwave_pb::secret::PbSecretRef;
48use risingwave_rpc_client::error::Result;
49use risingwave_rpc_client::{HummockMetaClient, MetaClient};
50
51use crate::catalog::{DatabaseId, FragmentId, SinkId};
52
/// Frontend-side abstraction over the meta service client.
///
/// Every method mirrors an RPC on [`MetaClient`] (see the delegating
/// implementation on [`FrontendMetaClientImpl`]); the trait exists so the
/// frontend can be tested / composed against an interface rather than the
/// concrete RPC client.
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
    /// Best-effort unregistration of this frontend worker; infallible by
    /// signature (errors, if any, are swallowed by the implementation).
    async fn try_unregister(&self);

    /// Flushes the given database and returns the resulting Hummock version id.
    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;

    /// Waits on the meta service (barrier/epoch wait; semantics defined by `MetaClient::wait`).
    async fn wait(&self) -> Result<()>;

    /// Triggers a cluster recovery via the meta service.
    async fn recover(&self) -> Result<()>;

    /// Cancels the specified creating jobs; returns the ids of the canceled jobs.
    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

    /// Fetches fragment info for the given jobs, keyed by job id.
    async fn list_table_fragments(
        &self,
        table_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>>;

    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;

    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Like `list_fragment_distribution`, but restricted to fragments of jobs
    /// that are still being created.
    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;

    /// Lists metadata of all meta backup snapshots (extracted from the
    /// snapshot manifest).
    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;

    /// Sets a system parameter; `None` value semantics (e.g. reset) follow the
    /// meta service. Returns the updated params reader when provided.
    async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>>;

    /// Fetches the cluster-default session parameters (JSON-encoded on the
    /// wire, parsed into [`SessionConfig`]).
    async fn get_session_params(&self) -> Result<SessionConfig>;

    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

    /// Fetches table catalogs by id; `include_dropped_table` also returns
    /// tables that have been dropped (per the meta service's bookkeeping).
    async fn get_tables(
        &self,
        table_ids: Vec<TableId>,
        include_dropped_table: bool,
    ) -> Result<HashMap<TableId, Table>>;

    /// Returns `(worker_id, min_pinned_version_id)` pairs from the pinned
    /// versions summary.
    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>>;

    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;

    /// Lists Hummock version deltas; the implementation fetches from version 0
    /// with no count/epoch limit.
    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;

    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;

    /// Active write limits keyed by compaction group id.
    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;

    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;

    /// Lists all worker nodes regardless of type.
    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;

    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;

    /// Applies a rate limit (`None` removes it) to the target identified by
    /// `kind` and `id`.
    async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate_limit: Option<u32>,
    ) -> Result<()>;

    /// Alters parallelism of the given fragments; `None` semantics follow the
    /// meta service (e.g. reset to default).
    async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()>;

    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;

    /// CDC backfill progress keyed by job id.
    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;

    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;

    async fn get_meta_store_endpoint(&self) -> Result<String>;

    /// Alters connector properties / secret refs of a sink; `connector_conn_ref`
    /// optionally re-binds the connection object.
    async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Alters properties of an iceberg table engine table, which spans a table,
    /// its sink, and its source object.
    async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;

    /// Returns `None` when the fragment does not exist.
    async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>>;

    /// Returns `(actor_id, vnode_ids)` pairs for the fragment.
    async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>>;

    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;

    /// Id of this frontend worker (locally known; no RPC).
    fn worker_id(&self) -> WorkerId;

    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;

    /// Triggers compaction of an iceberg table; the returned `u64` is the
    /// value produced by the meta service (task/handle id — see `MetaClient`).
    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;

    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;

    /// Triggers a refresh of a refreshable table.
    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;

    /// Cluster id string (locally known; no RPC).
    fn cluster_id(&self) -> &str;

    /// Tables not yet migrated, keyed by table id with a descriptive string value.
    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
}
210
211pub struct FrontendMetaClientImpl(pub MetaClient);
212
213#[async_trait::async_trait]
214impl FrontendMetaClient for FrontendMetaClientImpl {
215 async fn try_unregister(&self) {
216 self.0.try_unregister().await;
217 }
218
219 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
220 self.0.flush(database_id).await
221 }
222
223 async fn wait(&self) -> Result<()> {
224 self.0.wait().await
225 }
226
227 async fn recover(&self) -> Result<()> {
228 self.0.recover().await
229 }
230
231 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
232 self.0.cancel_creating_jobs(infos).await
233 }
234
235 async fn list_table_fragments(
236 &self,
237 job_ids: &[JobId],
238 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
239 self.0.list_table_fragments(job_ids).await
240 }
241
242 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
243 self.0.list_streaming_job_states().await
244 }
245
246 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
247 self.0.list_fragment_distributions().await
248 }
249
250 async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
251 self.0.list_creating_fragment_distribution().await
252 }
253
254 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
255 self.0.list_actor_states().await
256 }
257
258 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
259 self.0.list_actor_splits().await
260 }
261
262 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
263 self.0.list_object_dependencies().await
264 }
265
266 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
267 let manifest = self.0.get_meta_snapshot_manifest().await?;
268 Ok(manifest.snapshot_metadata)
269 }
270
271 async fn set_system_param(
272 &self,
273 param: String,
274 value: Option<String>,
275 ) -> Result<Option<SystemParamsReader>> {
276 self.0.set_system_param(param, value).await
277 }
278
279 async fn get_session_params(&self) -> Result<SessionConfig> {
280 let session_config: SessionConfig =
281 serde_json::from_str(&self.0.get_session_params().await?)
282 .context("failed to parse session config")?;
283 Ok(session_config)
284 }
285
286 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
287 self.0.set_session_param(param, value).await
288 }
289
290 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
291 let ddl_progress = self.0.get_ddl_progress().await?;
292 Ok(ddl_progress)
293 }
294
295 async fn get_tables(
296 &self,
297 table_ids: Vec<TableId>,
298 include_dropped_tables: bool,
299 ) -> Result<HashMap<TableId, Table>> {
300 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
301 Ok(tables)
302 }
303
304 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>> {
305 let pinned_versions = self
306 .0
307 .risectl_get_pinned_versions_summary()
308 .await?
309 .summary
310 .unwrap()
311 .pinned_versions;
312 let ret = pinned_versions
313 .into_iter()
314 .map(|v| (v.context_id, v.min_pinned_id))
315 .collect();
316 Ok(ret)
317 }
318
319 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
320 self.0.get_current_version().await
321 }
322
323 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
324 self.0
325 .risectl_get_checkpoint_hummock_version()
326 .await
327 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
328 }
329
330 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
331 self.0
333 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
334 .await
335 }
336
337 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
338 self.0.list_branched_object().await
339 }
340
341 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
342 self.0.risectl_list_compaction_group().await
343 }
344
345 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
346 self.0.list_active_write_limit().await
347 }
348
349 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
350 self.0.list_hummock_meta_config().await
351 }
352
353 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
354 self.0.list_event_log().await
355 }
356
357 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
358 self.0.list_compact_task_assignment().await
359 }
360
361 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
362 self.0.list_worker_nodes(None).await
363 }
364
365 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
366 self.0.list_compact_task_progress().await
367 }
368
369 async fn apply_throttle(
370 &self,
371 kind: PbThrottleTarget,
372 id: u32,
373 rate_limit: Option<u32>,
374 ) -> Result<()> {
375 self.0
376 .apply_throttle(kind, id, rate_limit)
377 .await
378 .map(|_| ())
379 }
380
381 async fn alter_fragment_parallelism(
382 &self,
383 fragment_ids: Vec<FragmentId>,
384 parallelism: Option<PbTableParallelism>,
385 ) -> Result<()> {
386 self.0
387 .alter_fragment_parallelism(fragment_ids, parallelism)
388 .await
389 }
390
391 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
392 self.0.get_cluster_recovery_status().await
393 }
394
395 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
396 self.0.get_cluster_limits().await
397 }
398
399 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
400 self.0.list_rate_limits().await
401 }
402
403 async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
404 self.0.list_cdc_progress().await
405 }
406
407 async fn get_meta_store_endpoint(&self) -> Result<String> {
408 self.0.get_meta_store_endpoint().await
409 }
410
411 async fn alter_sink_props(
412 &self,
413 sink_id: SinkId,
414 changed_props: BTreeMap<String, String>,
415 changed_secret_refs: BTreeMap<String, PbSecretRef>,
416 connector_conn_ref: Option<ConnectionId>,
417 ) -> Result<()> {
418 self.0
419 .alter_sink_props(
420 sink_id,
421 changed_props,
422 changed_secret_refs,
423 connector_conn_ref,
424 )
425 .await
426 }
427
428 async fn alter_iceberg_table_props(
429 &self,
430 table_id: TableId,
431 sink_id: SinkId,
432 source_id: SourceId,
433 changed_props: BTreeMap<String, String>,
434 changed_secret_refs: BTreeMap<String, PbSecretRef>,
435 connector_conn_ref: Option<ConnectionId>,
436 ) -> Result<()> {
437 self.0
438 .alter_iceberg_table_props(
439 table_id,
440 sink_id,
441 source_id,
442 changed_props,
443 changed_secret_refs,
444 connector_conn_ref,
445 )
446 .await
447 }
448
449 async fn alter_source_connector_props(
450 &self,
451 source_id: SourceId,
452 changed_props: BTreeMap<String, String>,
453 changed_secret_refs: BTreeMap<String, PbSecretRef>,
454 connector_conn_ref: Option<ConnectionId>,
455 ) -> Result<()> {
456 self.0
457 .alter_source_connector_props(
458 source_id,
459 changed_props,
460 changed_secret_refs,
461 connector_conn_ref,
462 )
463 .await
464 }
465
466 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
467 self.0.list_hosted_iceberg_tables().await
468 }
469
470 async fn get_fragment_by_id(
471 &self,
472 fragment_id: FragmentId,
473 ) -> Result<Option<FragmentDistribution>> {
474 self.0.get_fragment_by_id(fragment_id).await
475 }
476
477 async fn get_fragment_vnodes(
478 &self,
479 fragment_id: FragmentId,
480 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
481 self.0.get_fragment_vnodes(fragment_id).await
482 }
483
484 async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
485 self.0.get_actor_vnodes(actor_id).await
486 }
487
488 fn worker_id(&self) -> WorkerId {
489 self.0.worker_id()
490 }
491
492 async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
493 self.0.set_sync_log_store_aligned(job_id, aligned).await
494 }
495
496 async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
497 self.0.compact_iceberg_table(sink_id).await
498 }
499
500 async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
501 self.0.expire_iceberg_table_snapshots(sink_id).await
502 }
503
504 async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
505 self.0.refresh(request).await
506 }
507
508 fn cluster_id(&self) -> &str {
509 self.0.cluster_id()
510 }
511
512 async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
513 self.0.list_unmigrated_tables().await
514 }
515
516 async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
517 self.0.list_refresh_table_states().await
518 }
519}