risingwave_frontend/
meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
32use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
33use risingwave_pb::meta::list_actor_states_response::ActorState;
34use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
35use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
36use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
37use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
38use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
39use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
40use risingwave_pb::meta::{
41    EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus, RefreshRequest,
42    RefreshResponse,
43};
44use risingwave_pb::secret::PbSecretRef;
45use risingwave_rpc_client::error::Result;
46use risingwave_rpc_client::{HummockMetaClient, MetaClient};
47
48use crate::catalog::{DatabaseId, SinkId};
49
/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
/// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly.
/// Hence, instead of mocking every rpc of `MetaClient` in tests, we aggregate those
/// "direct" rpcs in this trait so that the mocking can be simplified.
///
/// Every method except `worker_id` is `async`; the fallible ones return
/// `risingwave_rpc_client::error::Result`.
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
    /// Attempts to unregister from meta; nothing is reported back to the caller.
    async fn try_unregister(&self);

    /// Requests a flush for the database `database_id`; a [`HummockVersionId`] is returned
    /// on success.
    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;

    /// Issues the `wait` rpc to meta.
    ///
    /// NOTE(review): what exactly is awaited is not visible from this trait — see the
    /// underlying `MetaClient`.
    async fn wait(&self) -> Result<()>;

    /// Issues the `recover` rpc to meta.
    async fn recover(&self) -> Result<()>;

    /// Requests cancellation of the given creating `jobs`.
    ///
    /// Returns a list of `u32` ids (presumably the jobs that were cancelled — confirm
    /// against meta).
    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

    /// Fetches fragment info for `table_ids`, as a map keyed by `u32` (presumably the
    /// table id — confirm).
    async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>>;

    /// Lists the streaming job states known to meta.
    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;

    /// Lists fragment distributions.
    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists fragment distributions of creating jobs (going by the method name; the
    /// filtering itself happens on the meta side).
    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists actor states.
    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

    /// Lists actor splits.
    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

    /// Lists object dependencies.
    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;

    /// Lists the metadata of meta snapshots.
    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;

    /// Sets the system parameter `param` to `value`.
    ///
    /// NOTE(review): `value == None` presumably resets the parameter, and the optional
    /// return value presumably carries the updated parameters — confirm against meta.
    async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>>;

    /// Fetches the session parameters as a [`SessionConfig`].
    async fn get_session_params(&self) -> Result<SessionConfig>;

    /// Sets the session parameter `param` to `value` and returns a `String` from meta
    /// (presumably the effective value — confirm).
    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

    /// Fetches the progress of DDL jobs.
    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

    /// Fetches the catalog `Table`s for `table_ids`, keyed by `u32` (presumably the table
    /// id). `include_dropped_table` is forwarded to meta.
    async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_table: bool,
    ) -> Result<HashMap<u32, Table>>;

    /// Returns vector of (`worker_id`, `min_pinned_version_id`)
    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;

    /// Fetches the current [`HummockVersion`].
    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

    /// Fetches the checkpoint [`HummockVersion`].
    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;

    /// Lists hummock version deltas.
    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

    /// Lists branched objects.
    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;

    /// Lists compaction groups together with their configs.
    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;

    /// Lists active write limits, keyed by `u64` (presumably the compaction group id —
    /// confirm).
    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;

    /// Lists hummock meta configs as string key/value pairs.
    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

    /// Lists event logs.
    async fn list_event_log(&self) -> Result<Vec<EventLog>>;

    /// Lists compact task assignments.
    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;

    /// Lists worker nodes.
    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;

    /// Lists the progress of compact tasks.
    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;

    /// Applies `rate_limit` to the object `id` of the given throttle target `kind`.
    ///
    /// NOTE(review): `rate_limit == None` presumably removes the limit — confirm.
    async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate_limit: Option<u32>,
    ) -> Result<()>;

    /// Fetches the cluster's [`RecoveryStatus`].
    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

    /// Fetches the cluster limits.
    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

    /// Lists rate limits.
    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;

    /// Lists CDC progress, keyed by `u32` (presumably a job id — confirm).
    async fn list_cdc_progress(&self) -> Result<HashMap<u32, PbCdcProgress>>;

    /// Fetches the meta store endpoint as a string.
    async fn get_meta_store_endpoint(&self) -> Result<String>;

    /// Alters the properties of sink `sink_id` with the changed plain properties, changed
    /// secret references, and an optional connection reference.
    async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()>;

    /// Alters the properties of an iceberg table identified by `table_id`, `sink_id` and
    /// `source_id`, with the changed plain properties, changed secret references, and an
    /// optional connection reference.
    async fn alter_iceberg_table_props(
        &self,
        table_id: u32,
        sink_id: u32,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()>;

    /// Alters the connector properties of source `source_id` with the changed plain
    /// properties, changed secret references, and an optional connection reference.
    async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()>;

    /// Lists hosted iceberg tables.
    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;

    /// Looks up the distribution of fragment `fragment_id`; the `Option` in the return type
    /// allows for "no such fragment".
    async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>>;

    /// Returns the worker id of this client. The only synchronous method of the trait.
    fn worker_id(&self) -> u32;

    /// Sets the sync-log-store `aligned` flag for job `job_id`.
    async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()>;

    /// Requests compaction of the iceberg table behind `sink_id`; returns a `u64`
    /// (presumably a task id — confirm).
    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;

    /// Requests expiration of snapshots of the iceberg table behind `sink_id`.
    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;

    /// Sends a [`RefreshRequest`] to meta and returns its [`RefreshResponse`].
    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
}
185
/// The [`FrontendMetaClient`] implementation backed by a `MetaClient`, which is held as
/// the public tuple field `0`.
pub struct FrontendMetaClientImpl(pub MetaClient);
187
188#[async_trait::async_trait]
189impl FrontendMetaClient for FrontendMetaClientImpl {
190    async fn try_unregister(&self) {
191        self.0.try_unregister().await;
192    }
193
194    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
195        self.0.flush(database_id).await
196    }
197
198    async fn wait(&self) -> Result<()> {
199        self.0.wait().await
200    }
201
202    async fn recover(&self) -> Result<()> {
203        self.0.recover().await
204    }
205
206    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
207        self.0.cancel_creating_jobs(infos).await
208    }
209
210    async fn list_table_fragments(
211        &self,
212        table_ids: &[u32],
213    ) -> Result<HashMap<u32, TableFragmentInfo>> {
214        self.0.list_table_fragments(table_ids).await
215    }
216
217    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
218        self.0.list_streaming_job_states().await
219    }
220
221    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
222        self.0.list_fragment_distributions().await
223    }
224
225    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
226        self.0.list_creating_fragment_distribution().await
227    }
228
229    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
230        self.0.list_actor_states().await
231    }
232
233    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
234        self.0.list_actor_splits().await
235    }
236
237    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
238        self.0.list_object_dependencies().await
239    }
240
241    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
242        let manifest = self.0.get_meta_snapshot_manifest().await?;
243        Ok(manifest.snapshot_metadata)
244    }
245
246    async fn set_system_param(
247        &self,
248        param: String,
249        value: Option<String>,
250    ) -> Result<Option<SystemParamsReader>> {
251        self.0.set_system_param(param, value).await
252    }
253
254    async fn get_session_params(&self) -> Result<SessionConfig> {
255        let session_config: SessionConfig =
256            serde_json::from_str(&self.0.get_session_params().await?)
257                .context("failed to parse session config")?;
258        Ok(session_config)
259    }
260
261    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
262        self.0.set_session_param(param, value).await
263    }
264
265    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
266        let ddl_progress = self.0.get_ddl_progress().await?;
267        Ok(ddl_progress)
268    }
269
270    async fn get_tables(
271        &self,
272        table_ids: &[u32],
273        include_dropped_tables: bool,
274    ) -> Result<HashMap<u32, Table>> {
275        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
276        Ok(tables)
277    }
278
279    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
280        let pinned_versions = self
281            .0
282            .risectl_get_pinned_versions_summary()
283            .await?
284            .summary
285            .unwrap()
286            .pinned_versions;
287        let ret = pinned_versions
288            .into_iter()
289            .map(|v| (v.context_id, v.min_pinned_id))
290            .collect();
291        Ok(ret)
292    }
293
294    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
295        self.0.get_current_version().await
296    }
297
298    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
299        self.0
300            .risectl_get_checkpoint_hummock_version()
301            .await
302            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
303    }
304
305    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
306        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
307        self.0
308            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
309            .await
310    }
311
312    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
313        self.0.list_branched_object().await
314    }
315
316    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
317        self.0.risectl_list_compaction_group().await
318    }
319
320    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
321        self.0.list_active_write_limit().await
322    }
323
324    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
325        self.0.list_hummock_meta_config().await
326    }
327
328    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
329        self.0.list_event_log().await
330    }
331
332    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
333        self.0.list_compact_task_assignment().await
334    }
335
336    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
337        self.0.list_worker_nodes(None).await
338    }
339
340    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
341        self.0.list_compact_task_progress().await
342    }
343
344    async fn apply_throttle(
345        &self,
346        kind: PbThrottleTarget,
347        id: u32,
348        rate_limit: Option<u32>,
349    ) -> Result<()> {
350        self.0
351            .apply_throttle(kind, id, rate_limit)
352            .await
353            .map(|_| ())
354    }
355
356    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
357        self.0.get_cluster_recovery_status().await
358    }
359
360    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
361        self.0.get_cluster_limits().await
362    }
363
364    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
365        self.0.list_rate_limits().await
366    }
367
368    async fn list_cdc_progress(&self) -> Result<HashMap<u32, PbCdcProgress>> {
369        self.0.list_cdc_progress().await
370    }
371
372    async fn get_meta_store_endpoint(&self) -> Result<String> {
373        self.0.get_meta_store_endpoint().await
374    }
375
376    async fn alter_sink_props(
377        &self,
378        sink_id: u32,
379        changed_props: BTreeMap<String, String>,
380        changed_secret_refs: BTreeMap<String, PbSecretRef>,
381        connector_conn_ref: Option<u32>,
382    ) -> Result<()> {
383        self.0
384            .alter_sink_props(
385                sink_id,
386                changed_props,
387                changed_secret_refs,
388                connector_conn_ref,
389            )
390            .await
391    }
392
393    async fn alter_iceberg_table_props(
394        &self,
395        table_id: u32,
396        sink_id: u32,
397        source_id: u32,
398        changed_props: BTreeMap<String, String>,
399        changed_secret_refs: BTreeMap<String, PbSecretRef>,
400        connector_conn_ref: Option<u32>,
401    ) -> Result<()> {
402        self.0
403            .alter_iceberg_table_props(
404                table_id,
405                sink_id,
406                source_id,
407                changed_props,
408                changed_secret_refs,
409                connector_conn_ref,
410            )
411            .await
412    }
413
414    async fn alter_source_connector_props(
415        &self,
416        source_id: u32,
417        changed_props: BTreeMap<String, String>,
418        changed_secret_refs: BTreeMap<String, PbSecretRef>,
419        connector_conn_ref: Option<u32>,
420    ) -> Result<()> {
421        self.0
422            .alter_source_connector_props(
423                source_id,
424                changed_props,
425                changed_secret_refs,
426                connector_conn_ref,
427            )
428            .await
429    }
430
431    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
432        self.0.list_hosted_iceberg_tables().await
433    }
434
435    async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>> {
436        self.0.get_fragment_by_id(fragment_id).await
437    }
438
439    fn worker_id(&self) -> u32 {
440        self.0.worker_id()
441    }
442
443    async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
444        self.0.set_sync_log_store_aligned(job_id, aligned).await
445    }
446
447    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
448        self.0.compact_iceberg_table(sink_id).await
449    }
450
451    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
452        self.0.expire_iceberg_table_snapshots(sink_id).await
453    }
454
455    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
456        self.0.refresh(request).await
457    }
458}