risingwave_frontend/
meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
32use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
33use risingwave_pb::meta::list_actor_states_response::ActorState;
34use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
35use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
36use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
37use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
38use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
39use risingwave_pb::meta::{EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus};
40use risingwave_pb::secret::PbSecretRef;
41use risingwave_rpc_client::error::Result;
42use risingwave_rpc_client::{HummockMetaClient, MetaClient};
43
44use crate::catalog::DatabaseId;
45
46/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
47/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
48/// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly.
49/// Hence instead of to mock all rpc of `MetaClient` in tests, we aggregate those "direct" rpc
50/// in this trait so that the mocking can be simplified.
51#[async_trait::async_trait]
52pub trait FrontendMetaClient: Send + Sync {
53    async fn try_unregister(&self);
54
55    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
56
57    async fn wait(&self) -> Result<()>;
58
59    async fn recover(&self) -> Result<()>;
60
61    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
62
63    async fn list_table_fragments(
64        &self,
65        table_ids: &[u32],
66    ) -> Result<HashMap<u32, TableFragmentInfo>>;
67
68    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
69
70    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
71
72    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
73
74    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
75
76    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
77
78    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
79
80    async fn get_system_params(&self) -> Result<SystemParamsReader>;
81
82    async fn set_system_param(
83        &self,
84        param: String,
85        value: Option<String>,
86    ) -> Result<Option<SystemParamsReader>>;
87
88    async fn get_session_params(&self) -> Result<SessionConfig>;
89
90    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
91
92    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
93
94    async fn get_tables(
95        &self,
96        table_ids: &[u32],
97        include_dropped_table: bool,
98    ) -> Result<HashMap<u32, Table>>;
99
100    /// Returns vector of (worker_id, min_pinned_version_id)
101    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;
102
103    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
104
105    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
106
107    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
108
109    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
110
111    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
112
113    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
114
115    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
116
117    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
118    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
119
120    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
121
122    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
123
124    async fn apply_throttle(
125        &self,
126        kind: PbThrottleTarget,
127        id: u32,
128        rate_limit: Option<u32>,
129    ) -> Result<()>;
130
131    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
132
133    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
134
135    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
136
137    async fn get_meta_store_endpoint(&self) -> Result<String>;
138
139    async fn alter_sink_props(
140        &self,
141        sink_id: u32,
142        changed_props: BTreeMap<String, String>,
143        changed_secret_refs: BTreeMap<String, PbSecretRef>,
144        connector_conn_ref: Option<u32>,
145    ) -> Result<()>;
146
147    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
148
149    async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>>;
150}
151
152pub struct FrontendMetaClientImpl(pub MetaClient);
153
154#[async_trait::async_trait]
155impl FrontendMetaClient for FrontendMetaClientImpl {
156    async fn try_unregister(&self) {
157        self.0.try_unregister().await;
158    }
159
160    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
161        self.0.flush(database_id).await
162    }
163
164    async fn wait(&self) -> Result<()> {
165        self.0.wait().await
166    }
167
168    async fn recover(&self) -> Result<()> {
169        self.0.recover().await
170    }
171
172    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
173        self.0.cancel_creating_jobs(infos).await
174    }
175
176    async fn list_table_fragments(
177        &self,
178        table_ids: &[u32],
179    ) -> Result<HashMap<u32, TableFragmentInfo>> {
180        self.0.list_table_fragments(table_ids).await
181    }
182
183    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
184        self.0.list_streaming_job_states().await
185    }
186
187    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
188        self.0.list_fragment_distributions().await
189    }
190
191    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
192        self.0.list_actor_states().await
193    }
194
195    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
196        self.0.list_actor_splits().await
197    }
198
199    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
200        self.0.list_object_dependencies().await
201    }
202
203    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
204        let manifest = self.0.get_meta_snapshot_manifest().await?;
205        Ok(manifest.snapshot_metadata)
206    }
207
208    async fn get_system_params(&self) -> Result<SystemParamsReader> {
209        self.0.get_system_params().await
210    }
211
212    async fn set_system_param(
213        &self,
214        param: String,
215        value: Option<String>,
216    ) -> Result<Option<SystemParamsReader>> {
217        self.0.set_system_param(param, value).await
218    }
219
220    async fn get_session_params(&self) -> Result<SessionConfig> {
221        let session_config: SessionConfig =
222            serde_json::from_str(&self.0.get_session_params().await?)
223                .context("failed to parse session config")?;
224        Ok(session_config)
225    }
226
227    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
228        self.0.set_session_param(param, value).await
229    }
230
231    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
232        let ddl_progress = self.0.get_ddl_progress().await?;
233        Ok(ddl_progress)
234    }
235
236    async fn get_tables(
237        &self,
238        table_ids: &[u32],
239        include_dropped_tables: bool,
240    ) -> Result<HashMap<u32, Table>> {
241        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
242        Ok(tables)
243    }
244
245    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
246        let pinned_versions = self
247            .0
248            .risectl_get_pinned_versions_summary()
249            .await?
250            .summary
251            .unwrap()
252            .pinned_versions;
253        let ret = pinned_versions
254            .into_iter()
255            .map(|v| (v.context_id, v.min_pinned_id))
256            .collect();
257        Ok(ret)
258    }
259
260    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
261        self.0.get_current_version().await
262    }
263
264    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
265        self.0
266            .risectl_get_checkpoint_hummock_version()
267            .await
268            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
269    }
270
271    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
272        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
273        self.0
274            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
275            .await
276    }
277
278    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
279        self.0.list_branched_object().await
280    }
281
282    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
283        self.0.risectl_list_compaction_group().await
284    }
285
286    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
287        self.0.list_active_write_limit().await
288    }
289
290    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
291        self.0.list_hummock_meta_config().await
292    }
293
294    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
295        self.0.list_event_log().await
296    }
297
298    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
299        self.0.list_compact_task_assignment().await
300    }
301
302    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
303        self.0.list_worker_nodes(None).await
304    }
305
306    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
307        self.0.list_compact_task_progress().await
308    }
309
310    async fn apply_throttle(
311        &self,
312        kind: PbThrottleTarget,
313        id: u32,
314        rate_limit: Option<u32>,
315    ) -> Result<()> {
316        self.0
317            .apply_throttle(kind, id, rate_limit)
318            .await
319            .map(|_| ())
320    }
321
322    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
323        self.0.get_cluster_recovery_status().await
324    }
325
326    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
327        self.0.get_cluster_limits().await
328    }
329
330    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
331        self.0.list_rate_limits().await
332    }
333
334    async fn get_meta_store_endpoint(&self) -> Result<String> {
335        self.0.get_meta_store_endpoint().await
336    }
337
338    async fn alter_sink_props(
339        &self,
340        sink_id: u32,
341        changed_props: BTreeMap<String, String>,
342        changed_secret_refs: BTreeMap<String, PbSecretRef>,
343        connector_conn_ref: Option<u32>,
344    ) -> Result<()> {
345        self.0
346            .alter_sink_props(
347                sink_id,
348                changed_props,
349                changed_secret_refs,
350                connector_conn_ref,
351            )
352            .await
353    }
354
355    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
356        self.0.list_hosted_iceberg_tables().await
357    }
358
359    async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>> {
360        self.0.get_fragment_by_id(fragment_id).await
361    }
362}