risingwave_frontend/
meta_client.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
24use risingwave_pb::backup_service::MetaSnapshotMetadata;
25use risingwave_pb::catalog::Table;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
31};
32use risingwave_pb::id::ActorId;
33use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
34use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
35use risingwave_pb::meta::list_actor_states_response::ActorState;
36use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
37use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
38use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
39use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
40use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
41use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
42use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
43use risingwave_pb::meta::{
44    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
45    RefreshRequest, RefreshResponse,
46};
47use risingwave_pb::secret::PbSecretRef;
48use risingwave_rpc_client::error::Result;
49use risingwave_rpc_client::{HummockMetaClient, MetaClient};
50
51use crate::catalog::{DatabaseId, FragmentId, SinkId};
52
53/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
54/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
55/// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly.
56/// Hence instead of to mock all rpc of `MetaClient` in tests, we aggregate those "direct" rpc
57/// in this trait so that the mocking can be simplified.
58#[async_trait::async_trait]
59pub trait FrontendMetaClient: Send + Sync {
60    async fn try_unregister(&self);
61
62    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
63
64    async fn wait(&self) -> Result<()>;
65
66    async fn recover(&self) -> Result<()>;
67
68    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
69
70    async fn list_table_fragments(
71        &self,
72        table_ids: &[JobId],
73    ) -> Result<HashMap<JobId, TableFragmentInfo>>;
74
75    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
76
77    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
78
79    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
80
81    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
82
83    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
84
85    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
86
87    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
88
89    async fn set_system_param(
90        &self,
91        param: String,
92        value: Option<String>,
93    ) -> Result<Option<SystemParamsReader>>;
94
95    async fn get_session_params(&self) -> Result<SessionConfig>;
96
97    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
98
99    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
100
101    async fn get_tables(
102        &self,
103        table_ids: Vec<TableId>,
104        include_dropped_table: bool,
105    ) -> Result<HashMap<TableId, Table>>;
106
107    /// Returns vector of (`worker_id`, `min_pinned_version_id`)
108    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>>;
109
110    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
111
112    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
113
114    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
115
116    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
117
118    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
119
120    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
121
122    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
123
124    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
125    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
126
127    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
128
129    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
130
131    async fn apply_throttle(
132        &self,
133        kind: PbThrottleTarget,
134        id: u32,
135        rate_limit: Option<u32>,
136    ) -> Result<()>;
137
138    async fn alter_fragment_parallelism(
139        &self,
140        fragment_ids: Vec<FragmentId>,
141        parallelism: Option<PbTableParallelism>,
142    ) -> Result<()>;
143
144    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
145
146    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
147
148    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
149
150    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
151
152    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;
153
154    async fn get_meta_store_endpoint(&self) -> Result<String>;
155
156    async fn alter_sink_props(
157        &self,
158        sink_id: SinkId,
159        changed_props: BTreeMap<String, String>,
160        changed_secret_refs: BTreeMap<String, PbSecretRef>,
161        connector_conn_ref: Option<ConnectionId>,
162    ) -> Result<()>;
163
164    async fn alter_iceberg_table_props(
165        &self,
166        table_id: TableId,
167        sink_id: SinkId,
168        source_id: SourceId,
169        changed_props: BTreeMap<String, String>,
170        changed_secret_refs: BTreeMap<String, PbSecretRef>,
171        connector_conn_ref: Option<ConnectionId>,
172    ) -> Result<()>;
173
174    async fn alter_source_connector_props(
175        &self,
176        source_id: SourceId,
177        changed_props: BTreeMap<String, String>,
178        changed_secret_refs: BTreeMap<String, PbSecretRef>,
179        connector_conn_ref: Option<ConnectionId>,
180    ) -> Result<()>;
181
182    async fn alter_connection_connector_props(
183        &self,
184        connection_id: u32,
185        changed_props: BTreeMap<String, String>,
186        changed_secret_refs: BTreeMap<String, PbSecretRef>,
187    ) -> Result<()>;
188
189    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
190
191    async fn get_fragment_by_id(
192        &self,
193        fragment_id: FragmentId,
194    ) -> Result<Option<FragmentDistribution>>;
195
196    async fn get_fragment_vnodes(
197        &self,
198        fragment_id: FragmentId,
199    ) -> Result<Vec<(ActorId, Vec<u32>)>>;
200
201    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;
202
203    fn worker_id(&self) -> WorkerId;
204
205    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
206
207    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
208
209    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
210
211    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
212
213    fn cluster_id(&self) -> &str;
214
215    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
216}
217
218pub struct FrontendMetaClientImpl(pub MetaClient);
219
220#[async_trait::async_trait]
221impl FrontendMetaClient for FrontendMetaClientImpl {
222    async fn try_unregister(&self) {
223        self.0.try_unregister().await;
224    }
225
226    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
227        self.0.flush(database_id).await
228    }
229
230    async fn wait(&self) -> Result<()> {
231        self.0.wait().await
232    }
233
234    async fn recover(&self) -> Result<()> {
235        self.0.recover().await
236    }
237
238    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
239        self.0.cancel_creating_jobs(infos).await
240    }
241
242    async fn list_table_fragments(
243        &self,
244        job_ids: &[JobId],
245    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
246        self.0.list_table_fragments(job_ids).await
247    }
248
249    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
250        self.0.list_streaming_job_states().await
251    }
252
253    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
254        self.0.list_fragment_distributions().await
255    }
256
257    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
258        self.0.list_creating_fragment_distribution().await
259    }
260
261    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
262        self.0.list_actor_states().await
263    }
264
265    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
266        self.0.list_actor_splits().await
267    }
268
269    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
270        self.0.list_object_dependencies().await
271    }
272
273    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
274        let manifest = self.0.get_meta_snapshot_manifest().await?;
275        Ok(manifest.snapshot_metadata)
276    }
277
278    async fn set_system_param(
279        &self,
280        param: String,
281        value: Option<String>,
282    ) -> Result<Option<SystemParamsReader>> {
283        self.0.set_system_param(param, value).await
284    }
285
286    async fn get_session_params(&self) -> Result<SessionConfig> {
287        let session_config: SessionConfig =
288            serde_json::from_str(&self.0.get_session_params().await?)
289                .context("failed to parse session config")?;
290        Ok(session_config)
291    }
292
293    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
294        self.0.set_session_param(param, value).await
295    }
296
297    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
298        let ddl_progress = self.0.get_ddl_progress().await?;
299        Ok(ddl_progress)
300    }
301
302    async fn get_tables(
303        &self,
304        table_ids: Vec<TableId>,
305        include_dropped_tables: bool,
306    ) -> Result<HashMap<TableId, Table>> {
307        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
308        Ok(tables)
309    }
310
311    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>> {
312        let pinned_versions = self
313            .0
314            .risectl_get_pinned_versions_summary()
315            .await?
316            .summary
317            .unwrap()
318            .pinned_versions;
319        let ret = pinned_versions
320            .into_iter()
321            .map(|v| (v.context_id, v.min_pinned_id))
322            .collect();
323        Ok(ret)
324    }
325
326    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
327        self.0.get_current_version().await
328    }
329
330    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
331        self.0
332            .risectl_get_checkpoint_hummock_version()
333            .await
334            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
335    }
336
337    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
338        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
339        self.0
340            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
341            .await
342    }
343
344    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
345        self.0.list_branched_object().await
346    }
347
348    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
349        self.0.risectl_list_compaction_group().await
350    }
351
352    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
353        self.0.list_active_write_limit().await
354    }
355
356    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
357        self.0.list_hummock_meta_config().await
358    }
359
360    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
361        self.0.list_event_log().await
362    }
363
364    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
365        self.0.list_compact_task_assignment().await
366    }
367
368    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
369        self.0.list_worker_nodes(None).await
370    }
371
372    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
373        self.0.list_compact_task_progress().await
374    }
375
376    async fn apply_throttle(
377        &self,
378        kind: PbThrottleTarget,
379        id: u32,
380        rate_limit: Option<u32>,
381    ) -> Result<()> {
382        self.0
383            .apply_throttle(kind, id, rate_limit)
384            .await
385            .map(|_| ())
386    }
387
388    async fn alter_fragment_parallelism(
389        &self,
390        fragment_ids: Vec<FragmentId>,
391        parallelism: Option<PbTableParallelism>,
392    ) -> Result<()> {
393        self.0
394            .alter_fragment_parallelism(fragment_ids, parallelism)
395            .await
396    }
397
398    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
399        self.0.get_cluster_recovery_status().await
400    }
401
402    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
403        self.0.get_cluster_limits().await
404    }
405
406    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
407        self.0.list_rate_limits().await
408    }
409
410    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
411        self.0.list_cdc_progress().await
412    }
413
414    async fn get_meta_store_endpoint(&self) -> Result<String> {
415        self.0.get_meta_store_endpoint().await
416    }
417
418    async fn alter_sink_props(
419        &self,
420        sink_id: SinkId,
421        changed_props: BTreeMap<String, String>,
422        changed_secret_refs: BTreeMap<String, PbSecretRef>,
423        connector_conn_ref: Option<ConnectionId>,
424    ) -> Result<()> {
425        self.0
426            .alter_sink_props(
427                sink_id,
428                changed_props,
429                changed_secret_refs,
430                connector_conn_ref,
431            )
432            .await
433    }
434
435    async fn alter_iceberg_table_props(
436        &self,
437        table_id: TableId,
438        sink_id: SinkId,
439        source_id: SourceId,
440        changed_props: BTreeMap<String, String>,
441        changed_secret_refs: BTreeMap<String, PbSecretRef>,
442        connector_conn_ref: Option<ConnectionId>,
443    ) -> Result<()> {
444        self.0
445            .alter_iceberg_table_props(
446                table_id,
447                sink_id,
448                source_id,
449                changed_props,
450                changed_secret_refs,
451                connector_conn_ref,
452            )
453            .await
454    }
455
456    async fn alter_source_connector_props(
457        &self,
458        source_id: SourceId,
459        changed_props: BTreeMap<String, String>,
460        changed_secret_refs: BTreeMap<String, PbSecretRef>,
461        connector_conn_ref: Option<ConnectionId>,
462    ) -> Result<()> {
463        self.0
464            .alter_source_connector_props(
465                source_id,
466                changed_props,
467                changed_secret_refs,
468                connector_conn_ref,
469            )
470            .await
471    }
472
473    async fn alter_connection_connector_props(
474        &self,
475        connection_id: u32,
476        changed_props: BTreeMap<String, String>,
477        changed_secret_refs: BTreeMap<String, PbSecretRef>,
478    ) -> Result<()> {
479        self.0
480            .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
481            .await
482    }
483
484    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
485        self.0.list_hosted_iceberg_tables().await
486    }
487
488    async fn get_fragment_by_id(
489        &self,
490        fragment_id: FragmentId,
491    ) -> Result<Option<FragmentDistribution>> {
492        self.0.get_fragment_by_id(fragment_id).await
493    }
494
495    async fn get_fragment_vnodes(
496        &self,
497        fragment_id: FragmentId,
498    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
499        self.0.get_fragment_vnodes(fragment_id).await
500    }
501
502    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
503        self.0.get_actor_vnodes(actor_id).await
504    }
505
506    fn worker_id(&self) -> WorkerId {
507        self.0.worker_id()
508    }
509
510    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
511        self.0.set_sync_log_store_aligned(job_id, aligned).await
512    }
513
514    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
515        self.0.compact_iceberg_table(sink_id).await
516    }
517
518    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
519        self.0.expire_iceberg_table_snapshots(sink_id).await
520    }
521
522    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
523        self.0.refresh(request).await
524    }
525
526    fn cluster_id(&self) -> &str {
527        self.0.cluster_id()
528    }
529
530    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
531        self.0.list_unmigrated_tables().await
532    }
533
534    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
535        self.0.list_refresh_table_states().await
536    }
537}