risingwave_frontend/
meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
32use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
33use risingwave_pb::meta::list_actor_states_response::ActorState;
34use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
35use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
36use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
37use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
38use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
39use risingwave_pb::meta::{
40    EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus, RefreshRequest,
41    RefreshResponse,
42};
43use risingwave_pb::secret::PbSecretRef;
44use risingwave_rpc_client::error::Result;
45use risingwave_rpc_client::{HummockMetaClient, MetaClient};
46
47use crate::catalog::{DatabaseId, SinkId};
48
/// A narrow facade over `MetaClient` that only exposes the small set of meta rpc which the
/// frontend calls directly.
///
/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
/// `WorkerNodeManager`, etc., so the frontend rarely needs to call `MetaClient` directly.
/// Hence, instead of mocking every rpc of `MetaClient` in tests, those "direct" rpc are
/// aggregated in this trait so that the mocking can be simplified.
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
    /// Attempts to unregister from meta. No outcome is reported to the caller.
    async fn try_unregister(&self);

    /// Requests a flush for `database_id` and returns the `HummockVersionId` from the response.
    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;

    /// Issues the `wait` request to meta.
    // NOTE(review): presumably resolves once in-progress jobs have finished — confirm against
    // the meta service.
    async fn wait(&self) -> Result<()>;

    /// Issues the `recover` request to meta.
    async fn recover(&self) -> Result<()>;

    /// Requests cancellation of the given creating jobs and returns a list of `u32` ids.
    // NOTE(review): the returned ids are presumably those of the cancelled jobs — confirm.
    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

    /// Fetches fragment info for the given `table_ids`, as a map keyed by a `u32` id.
    // NOTE(review): the map key is presumably the table id — confirm.
    async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>>;

    /// Lists the states of streaming jobs.
    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;

    /// Lists fragment distributions.
    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists the "creating" fragment distributions.
    // NOTE(review): presumably restricted to jobs still being created — confirm.
    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists actor states.
    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

    /// Lists actor splits.
    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

    /// Lists object dependencies.
    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;

    /// Lists the metadata of meta snapshots.
    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;

    /// Sets the system parameter `param` to `value`, optionally returning a
    /// `SystemParamsReader`.
    // NOTE(review): `value == None` presumably resets the parameter to its default — confirm.
    async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>>;

    /// Fetches the session parameters as a `SessionConfig`.
    async fn get_session_params(&self) -> Result<SessionConfig>;

    /// Sets the session parameter `param` to `value` and returns a `String`.
    // NOTE(review): the returned string is presumably the effective value — confirm.
    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

    /// Fetches the progress of DDL operations.
    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

    /// Fetches the catalog `Table`s for `table_ids`, as a map keyed by a `u32` id.
    /// `include_dropped_table` is forwarded to meta to select whether dropped tables are
    /// included.
    async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_table: bool,
    ) -> Result<HashMap<u32, Table>>;

    /// Returns vector of (`worker_id`, `min_pinned_version_id`)
    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;

    /// Fetches the current hummock version.
    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

    /// Fetches the checkpoint hummock version.
    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;

    /// Lists hummock version deltas.
    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

    /// Lists hummock branched objects.
    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;

    /// Lists hummock compaction groups together with their info.
    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;

    /// Lists the active hummock write limits, as a map keyed by a `u64` id.
    // NOTE(review): the key is presumably the compaction group id — confirm.
    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;

    /// Lists hummock meta configs as string key/value pairs.
    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

    /// Lists event log entries.
    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
    /// Lists compact task assignments.
    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;

    /// Lists worker nodes.
    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;

    /// Lists the progress of compact tasks.
    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;

    /// Applies `rate_limit` to the object `id` of throttle target `kind`.
    // NOTE(review): `rate_limit == None` presumably removes the limit — confirm.
    async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate_limit: Option<u32>,
    ) -> Result<()>;

    /// Fetches the recovery status of the cluster.
    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

    /// Fetches the cluster limits.
    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

    /// Lists rate limit info.
    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;

    /// Fetches the meta store endpoint as a string.
    async fn get_meta_store_endpoint(&self) -> Result<String>;

    /// Alters the properties of sink `sink_id`: the changed plain properties, the changed
    /// secret references, and an optional connection reference.
    async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()>;

    /// Alters the connector properties of source `source_id`: the changed plain properties,
    /// the changed secret references, and an optional connection reference.
    async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()>;

    /// Lists hosted iceberg tables.
    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;

    /// Looks up the distribution of fragment `fragment_id`; the result is optional.
    async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>>;

    /// Returns the worker id of this client. This is the only synchronous method of the trait.
    fn worker_id(&self) -> u32;

    /// Sets the `aligned` flag of the sync log store for job `job_id`.
    async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()>;

    /// Requests compaction of the iceberg table associated with `sink_id` and returns a `u64`.
    // NOTE(review): the returned value is presumably a task id — confirm.
    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;

    /// Sends `request` to meta and returns the corresponding `RefreshResponse`.
    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
}
170
/// Implementation of `FrontendMetaClient` backed by a `MetaClient`.
///
/// This is a newtype: the wrapped client is the public field `.0`.
pub struct FrontendMetaClientImpl(pub MetaClient);
172
173#[async_trait::async_trait]
174impl FrontendMetaClient for FrontendMetaClientImpl {
175    async fn try_unregister(&self) {
176        self.0.try_unregister().await;
177    }
178
179    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
180        self.0.flush(database_id).await
181    }
182
183    async fn wait(&self) -> Result<()> {
184        self.0.wait().await
185    }
186
187    async fn recover(&self) -> Result<()> {
188        self.0.recover().await
189    }
190
191    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
192        self.0.cancel_creating_jobs(infos).await
193    }
194
195    async fn list_table_fragments(
196        &self,
197        table_ids: &[u32],
198    ) -> Result<HashMap<u32, TableFragmentInfo>> {
199        self.0.list_table_fragments(table_ids).await
200    }
201
202    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
203        self.0.list_streaming_job_states().await
204    }
205
206    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
207        self.0.list_fragment_distributions().await
208    }
209
210    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
211        self.0.list_creating_fragment_distribution().await
212    }
213
214    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
215        self.0.list_actor_states().await
216    }
217
218    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
219        self.0.list_actor_splits().await
220    }
221
222    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
223        self.0.list_object_dependencies().await
224    }
225
226    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
227        let manifest = self.0.get_meta_snapshot_manifest().await?;
228        Ok(manifest.snapshot_metadata)
229    }
230
231    async fn set_system_param(
232        &self,
233        param: String,
234        value: Option<String>,
235    ) -> Result<Option<SystemParamsReader>> {
236        self.0.set_system_param(param, value).await
237    }
238
239    async fn get_session_params(&self) -> Result<SessionConfig> {
240        let session_config: SessionConfig =
241            serde_json::from_str(&self.0.get_session_params().await?)
242                .context("failed to parse session config")?;
243        Ok(session_config)
244    }
245
246    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
247        self.0.set_session_param(param, value).await
248    }
249
250    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
251        let ddl_progress = self.0.get_ddl_progress().await?;
252        Ok(ddl_progress)
253    }
254
255    async fn get_tables(
256        &self,
257        table_ids: &[u32],
258        include_dropped_tables: bool,
259    ) -> Result<HashMap<u32, Table>> {
260        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
261        Ok(tables)
262    }
263
264    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
265        let pinned_versions = self
266            .0
267            .risectl_get_pinned_versions_summary()
268            .await?
269            .summary
270            .unwrap()
271            .pinned_versions;
272        let ret = pinned_versions
273            .into_iter()
274            .map(|v| (v.context_id, v.min_pinned_id))
275            .collect();
276        Ok(ret)
277    }
278
279    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
280        self.0.get_current_version().await
281    }
282
283    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
284        self.0
285            .risectl_get_checkpoint_hummock_version()
286            .await
287            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
288    }
289
290    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
291        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
292        self.0
293            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
294            .await
295    }
296
297    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
298        self.0.list_branched_object().await
299    }
300
301    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
302        self.0.risectl_list_compaction_group().await
303    }
304
305    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
306        self.0.list_active_write_limit().await
307    }
308
309    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
310        self.0.list_hummock_meta_config().await
311    }
312
313    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
314        self.0.list_event_log().await
315    }
316
317    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
318        self.0.list_compact_task_assignment().await
319    }
320
321    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
322        self.0.list_worker_nodes(None).await
323    }
324
325    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
326        self.0.list_compact_task_progress().await
327    }
328
329    async fn apply_throttle(
330        &self,
331        kind: PbThrottleTarget,
332        id: u32,
333        rate_limit: Option<u32>,
334    ) -> Result<()> {
335        self.0
336            .apply_throttle(kind, id, rate_limit)
337            .await
338            .map(|_| ())
339    }
340
341    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
342        self.0.get_cluster_recovery_status().await
343    }
344
345    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
346        self.0.get_cluster_limits().await
347    }
348
349    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
350        self.0.list_rate_limits().await
351    }
352
353    async fn get_meta_store_endpoint(&self) -> Result<String> {
354        self.0.get_meta_store_endpoint().await
355    }
356
357    async fn alter_sink_props(
358        &self,
359        sink_id: u32,
360        changed_props: BTreeMap<String, String>,
361        changed_secret_refs: BTreeMap<String, PbSecretRef>,
362        connector_conn_ref: Option<u32>,
363    ) -> Result<()> {
364        self.0
365            .alter_sink_props(
366                sink_id,
367                changed_props,
368                changed_secret_refs,
369                connector_conn_ref,
370            )
371            .await
372    }
373
374    async fn alter_source_connector_props(
375        &self,
376        source_id: u32,
377        changed_props: BTreeMap<String, String>,
378        changed_secret_refs: BTreeMap<String, PbSecretRef>,
379        connector_conn_ref: Option<u32>,
380    ) -> Result<()> {
381        self.0
382            .alter_source_connector_props(
383                source_id,
384                changed_props,
385                changed_secret_refs,
386                connector_conn_ref,
387            )
388            .await
389    }
390
391    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
392        self.0.list_hosted_iceberg_tables().await
393    }
394
395    async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>> {
396        self.0.get_fragment_by_id(fragment_id).await
397    }
398
399    fn worker_id(&self) -> u32 {
400        self.0.worker_id()
401    }
402
403    async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
404        self.0.set_sync_log_store_aligned(job_id, aligned).await
405    }
406
407    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
408        self.0.compact_iceberg_table(sink_id).await
409    }
410
411    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
412        self.0.refresh(request).await
413    }
414}