risingwave_frontend/
meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
32use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
33use risingwave_pb::meta::list_actor_states_response::ActorState;
34use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
35use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
36use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
37use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
38use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
39use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
40use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus};
41use risingwave_pb::secret::PbSecretRef;
42use risingwave_rpc_client::error::Result;
43use risingwave_rpc_client::{HummockMetaClient, MetaClient};
44
45use crate::catalog::DatabaseId;
46
47/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
48/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
49/// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly.
50/// Hence instead of to mock all rpc of `MetaClient` in tests, we aggregate those "direct" rpc
51/// in this trait so that the mocking can be simplified.
52#[async_trait::async_trait]
53pub trait FrontendMetaClient: Send + Sync {
54    async fn try_unregister(&self);
55
56    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
57
58    async fn wait(&self) -> Result<()>;
59
60    async fn recover(&self) -> Result<()>;
61
62    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
63
64    async fn list_table_fragments(
65        &self,
66        table_ids: &[u32],
67    ) -> Result<HashMap<u32, TableFragmentInfo>>;
68
69    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
70
71    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
72
73    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
74
75    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
76
77    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
78
79    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
80
81    async fn get_system_params(&self) -> Result<SystemParamsReader>;
82
83    async fn set_system_param(
84        &self,
85        param: String,
86        value: Option<String>,
87    ) -> Result<Option<SystemParamsReader>>;
88
89    async fn get_session_params(&self) -> Result<SessionConfig>;
90
91    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
92
93    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
94
95    async fn get_tables(
96        &self,
97        table_ids: &[u32],
98        include_dropped_table: bool,
99    ) -> Result<HashMap<u32, Table>>;
100
101    /// Returns vector of (worker_id, min_pinned_version_id)
102    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;
103
104    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
105
106    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
107
108    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
109
110    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
111
112    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
113
114    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
115
116    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
117
118    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
119    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
120
121    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
122
123    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
124
125    async fn apply_throttle(
126        &self,
127        kind: PbThrottleTarget,
128        id: u32,
129        rate_limit: Option<u32>,
130    ) -> Result<()>;
131
132    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
133
134    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
135
136    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
137
138    async fn get_meta_store_endpoint(&self) -> Result<String>;
139
140    async fn alter_sink_props(
141        &self,
142        sink_id: u32,
143        changed_props: BTreeMap<String, String>,
144        changed_secret_refs: BTreeMap<String, PbSecretRef>,
145        connector_conn_ref: Option<u32>,
146    ) -> Result<()>;
147
148    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
149}
150
151pub struct FrontendMetaClientImpl(pub MetaClient);
152
153#[async_trait::async_trait]
154impl FrontendMetaClient for FrontendMetaClientImpl {
155    async fn try_unregister(&self) {
156        self.0.try_unregister().await;
157    }
158
159    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
160        self.0.flush(database_id).await
161    }
162
163    async fn wait(&self) -> Result<()> {
164        self.0.wait().await
165    }
166
167    async fn recover(&self) -> Result<()> {
168        self.0.recover().await
169    }
170
171    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
172        self.0.cancel_creating_jobs(infos).await
173    }
174
175    async fn list_table_fragments(
176        &self,
177        table_ids: &[u32],
178    ) -> Result<HashMap<u32, TableFragmentInfo>> {
179        self.0.list_table_fragments(table_ids).await
180    }
181
182    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
183        self.0.list_streaming_job_states().await
184    }
185
186    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
187        self.0.list_fragment_distributions().await
188    }
189
190    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
191        self.0.list_actor_states().await
192    }
193
194    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
195        self.0.list_actor_splits().await
196    }
197
198    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
199        self.0.list_object_dependencies().await
200    }
201
202    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
203        let manifest = self.0.get_meta_snapshot_manifest().await?;
204        Ok(manifest.snapshot_metadata)
205    }
206
207    async fn get_system_params(&self) -> Result<SystemParamsReader> {
208        self.0.get_system_params().await
209    }
210
211    async fn set_system_param(
212        &self,
213        param: String,
214        value: Option<String>,
215    ) -> Result<Option<SystemParamsReader>> {
216        self.0.set_system_param(param, value).await
217    }
218
219    async fn get_session_params(&self) -> Result<SessionConfig> {
220        let session_config: SessionConfig =
221            serde_json::from_str(&self.0.get_session_params().await?)
222                .context("failed to parse session config")?;
223        Ok(session_config)
224    }
225
226    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
227        self.0.set_session_param(param, value).await
228    }
229
230    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
231        let ddl_progress = self.0.get_ddl_progress().await?;
232        Ok(ddl_progress)
233    }
234
235    async fn get_tables(
236        &self,
237        table_ids: &[u32],
238        include_dropped_tables: bool,
239    ) -> Result<HashMap<u32, Table>> {
240        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
241        Ok(tables)
242    }
243
244    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
245        let pinned_versions = self
246            .0
247            .risectl_get_pinned_versions_summary()
248            .await?
249            .summary
250            .unwrap()
251            .pinned_versions;
252        let ret = pinned_versions
253            .into_iter()
254            .map(|v| (v.context_id, v.min_pinned_id))
255            .collect();
256        Ok(ret)
257    }
258
259    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
260        self.0.get_current_version().await
261    }
262
263    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
264        self.0
265            .risectl_get_checkpoint_hummock_version()
266            .await
267            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
268    }
269
270    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
271        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
272        self.0
273            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
274            .await
275    }
276
277    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
278        self.0.list_branched_object().await
279    }
280
281    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
282        self.0.risectl_list_compaction_group().await
283    }
284
285    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
286        self.0.list_active_write_limit().await
287    }
288
289    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
290        self.0.list_hummock_meta_config().await
291    }
292
293    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
294        self.0.list_event_log().await
295    }
296
297    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
298        self.0.list_compact_task_assignment().await
299    }
300
301    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
302        self.0.list_worker_nodes(None).await
303    }
304
305    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
306        self.0.list_compact_task_progress().await
307    }
308
309    async fn apply_throttle(
310        &self,
311        kind: PbThrottleTarget,
312        id: u32,
313        rate_limit: Option<u32>,
314    ) -> Result<()> {
315        self.0
316            .apply_throttle(kind, id, rate_limit)
317            .await
318            .map(|_| ())
319    }
320
321    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
322        self.0.get_cluster_recovery_status().await
323    }
324
325    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
326        self.0.get_cluster_limits().await
327    }
328
329    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
330        self.0.list_rate_limits().await
331    }
332
333    async fn get_meta_store_endpoint(&self) -> Result<String> {
334        self.0.get_meta_store_endpoint().await
335    }
336
337    async fn alter_sink_props(
338        &self,
339        sink_id: u32,
340        changed_props: BTreeMap<String, String>,
341        changed_secret_refs: BTreeMap<String, PbSecretRef>,
342        connector_conn_ref: Option<u32>,
343    ) -> Result<()> {
344        self.0
345            .alter_sink_props(
346                sink_id,
347                changed_props,
348                changed_secret_refs,
349                connector_conn_ref,
350            )
351            .await
352    }
353
354    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
355        self.0.list_hosted_iceberg_tables().await
356    }
357}