risingwave_frontend/
meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::id::JobId;
32use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
33use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
34use risingwave_pb::meta::list_actor_states_response::ActorState;
35use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
36use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
37use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
38use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
39use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
40use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
41use risingwave_pb::meta::{
42    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
43    RefreshRequest, RefreshResponse,
44};
45use risingwave_pb::secret::PbSecretRef;
46use risingwave_rpc_client::error::Result;
47use risingwave_rpc_client::{HummockMetaClient, MetaClient};
48
49use crate::catalog::{DatabaseId, SinkId, TableId};
50
51/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
52/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
53/// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly.
54/// Hence instead of to mock all rpc of `MetaClient` in tests, we aggregate those "direct" rpc
55/// in this trait so that the mocking can be simplified.
56#[async_trait::async_trait]
57pub trait FrontendMetaClient: Send + Sync {
58    async fn try_unregister(&self);
59
60    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
61
62    async fn wait(&self) -> Result<()>;
63
64    async fn recover(&self) -> Result<()>;
65
66    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
67
68    async fn list_table_fragments(
69        &self,
70        table_ids: &[JobId],
71    ) -> Result<HashMap<JobId, TableFragmentInfo>>;
72
73    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
74
75    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
76
77    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
78
79    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
80
81    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
82
83    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
84
85    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
86
87    async fn set_system_param(
88        &self,
89        param: String,
90        value: Option<String>,
91    ) -> Result<Option<SystemParamsReader>>;
92
93    async fn get_session_params(&self) -> Result<SessionConfig>;
94
95    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
96
97    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
98
99    async fn get_tables(
100        &self,
101        table_ids: Vec<TableId>,
102        include_dropped_table: bool,
103    ) -> Result<HashMap<TableId, Table>>;
104
105    /// Returns vector of (`worker_id`, `min_pinned_version_id`)
106    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;
107
108    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
109
110    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
111
112    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
113
114    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
115
116    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
117
118    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
119
120    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
121
122    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
123    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
124
125    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
126
127    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
128
129    async fn apply_throttle(
130        &self,
131        kind: PbThrottleTarget,
132        id: u32,
133        rate_limit: Option<u32>,
134    ) -> Result<()>;
135
136    async fn alter_fragment_parallelism(
137        &self,
138        fragment_ids: Vec<u32>,
139        parallelism: Option<PbTableParallelism>,
140    ) -> Result<()>;
141
142    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
143
144    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
145
146    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
147
148    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
149
150    async fn get_meta_store_endpoint(&self) -> Result<String>;
151
152    async fn alter_sink_props(
153        &self,
154        sink_id: u32,
155        changed_props: BTreeMap<String, String>,
156        changed_secret_refs: BTreeMap<String, PbSecretRef>,
157        connector_conn_ref: Option<u32>,
158    ) -> Result<()>;
159
160    async fn alter_iceberg_table_props(
161        &self,
162        table_id: u32,
163        sink_id: u32,
164        source_id: u32,
165        changed_props: BTreeMap<String, String>,
166        changed_secret_refs: BTreeMap<String, PbSecretRef>,
167        connector_conn_ref: Option<u32>,
168    ) -> Result<()>;
169
170    async fn alter_source_connector_props(
171        &self,
172        source_id: u32,
173        changed_props: BTreeMap<String, String>,
174        changed_secret_refs: BTreeMap<String, PbSecretRef>,
175        connector_conn_ref: Option<u32>,
176    ) -> Result<()>;
177
178    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
179
180    async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>>;
181
182    async fn get_fragment_vnodes(&self, fragment_id: u32) -> Result<Vec<(u32, Vec<u32>)>>;
183
184    async fn get_actor_vnodes(&self, actor_id: u32) -> Result<Vec<u32>>;
185
186    fn worker_id(&self) -> u32;
187
188    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
189
190    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
191
192    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
193
194    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
195
196    fn cluster_id(&self) -> &str;
197
198    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
199}
200
201pub struct FrontendMetaClientImpl(pub MetaClient);
202
203#[async_trait::async_trait]
204impl FrontendMetaClient for FrontendMetaClientImpl {
205    async fn try_unregister(&self) {
206        self.0.try_unregister().await;
207    }
208
209    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
210        self.0.flush(database_id).await
211    }
212
213    async fn wait(&self) -> Result<()> {
214        self.0.wait().await
215    }
216
217    async fn recover(&self) -> Result<()> {
218        self.0.recover().await
219    }
220
221    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
222        self.0.cancel_creating_jobs(infos).await
223    }
224
225    async fn list_table_fragments(
226        &self,
227        table_ids: &[JobId],
228    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
229        self.0.list_table_fragments(table_ids).await
230    }
231
232    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
233        self.0.list_streaming_job_states().await
234    }
235
236    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
237        self.0.list_fragment_distributions().await
238    }
239
240    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
241        self.0.list_creating_fragment_distribution().await
242    }
243
244    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
245        self.0.list_actor_states().await
246    }
247
248    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
249        self.0.list_actor_splits().await
250    }
251
252    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
253        self.0.list_object_dependencies().await
254    }
255
256    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
257        let manifest = self.0.get_meta_snapshot_manifest().await?;
258        Ok(manifest.snapshot_metadata)
259    }
260
261    async fn set_system_param(
262        &self,
263        param: String,
264        value: Option<String>,
265    ) -> Result<Option<SystemParamsReader>> {
266        self.0.set_system_param(param, value).await
267    }
268
269    async fn get_session_params(&self) -> Result<SessionConfig> {
270        let session_config: SessionConfig =
271            serde_json::from_str(&self.0.get_session_params().await?)
272                .context("failed to parse session config")?;
273        Ok(session_config)
274    }
275
276    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
277        self.0.set_session_param(param, value).await
278    }
279
280    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
281        let ddl_progress = self.0.get_ddl_progress().await?;
282        Ok(ddl_progress)
283    }
284
285    async fn get_tables(
286        &self,
287        table_ids: Vec<TableId>,
288        include_dropped_tables: bool,
289    ) -> Result<HashMap<TableId, Table>> {
290        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
291        Ok(tables)
292    }
293
294    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
295        let pinned_versions = self
296            .0
297            .risectl_get_pinned_versions_summary()
298            .await?
299            .summary
300            .unwrap()
301            .pinned_versions;
302        let ret = pinned_versions
303            .into_iter()
304            .map(|v| (v.context_id, v.min_pinned_id))
305            .collect();
306        Ok(ret)
307    }
308
309    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
310        self.0.get_current_version().await
311    }
312
313    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
314        self.0
315            .risectl_get_checkpoint_hummock_version()
316            .await
317            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
318    }
319
320    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
321        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
322        self.0
323            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
324            .await
325    }
326
327    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
328        self.0.list_branched_object().await
329    }
330
331    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
332        self.0.risectl_list_compaction_group().await
333    }
334
335    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
336        self.0.list_active_write_limit().await
337    }
338
339    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
340        self.0.list_hummock_meta_config().await
341    }
342
343    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
344        self.0.list_event_log().await
345    }
346
347    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
348        self.0.list_compact_task_assignment().await
349    }
350
351    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
352        self.0.list_worker_nodes(None).await
353    }
354
355    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
356        self.0.list_compact_task_progress().await
357    }
358
359    async fn apply_throttle(
360        &self,
361        kind: PbThrottleTarget,
362        id: u32,
363        rate_limit: Option<u32>,
364    ) -> Result<()> {
365        self.0
366            .apply_throttle(kind, id, rate_limit)
367            .await
368            .map(|_| ())
369    }
370
371    async fn alter_fragment_parallelism(
372        &self,
373        fragment_ids: Vec<u32>,
374        parallelism: Option<PbTableParallelism>,
375    ) -> Result<()> {
376        self.0
377            .alter_fragment_parallelism(fragment_ids, parallelism)
378            .await
379    }
380
381    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
382        self.0.get_cluster_recovery_status().await
383    }
384
385    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
386        self.0.get_cluster_limits().await
387    }
388
389    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
390        self.0.list_rate_limits().await
391    }
392
393    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
394        self.0.list_cdc_progress().await
395    }
396
397    async fn get_meta_store_endpoint(&self) -> Result<String> {
398        self.0.get_meta_store_endpoint().await
399    }
400
401    async fn alter_sink_props(
402        &self,
403        sink_id: u32,
404        changed_props: BTreeMap<String, String>,
405        changed_secret_refs: BTreeMap<String, PbSecretRef>,
406        connector_conn_ref: Option<u32>,
407    ) -> Result<()> {
408        self.0
409            .alter_sink_props(
410                sink_id,
411                changed_props,
412                changed_secret_refs,
413                connector_conn_ref,
414            )
415            .await
416    }
417
418    async fn alter_iceberg_table_props(
419        &self,
420        table_id: u32,
421        sink_id: u32,
422        source_id: u32,
423        changed_props: BTreeMap<String, String>,
424        changed_secret_refs: BTreeMap<String, PbSecretRef>,
425        connector_conn_ref: Option<u32>,
426    ) -> Result<()> {
427        self.0
428            .alter_iceberg_table_props(
429                table_id,
430                sink_id,
431                source_id,
432                changed_props,
433                changed_secret_refs,
434                connector_conn_ref,
435            )
436            .await
437    }
438
439    async fn alter_source_connector_props(
440        &self,
441        source_id: u32,
442        changed_props: BTreeMap<String, String>,
443        changed_secret_refs: BTreeMap<String, PbSecretRef>,
444        connector_conn_ref: Option<u32>,
445    ) -> Result<()> {
446        self.0
447            .alter_source_connector_props(
448                source_id,
449                changed_props,
450                changed_secret_refs,
451                connector_conn_ref,
452            )
453            .await
454    }
455
456    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
457        self.0.list_hosted_iceberg_tables().await
458    }
459
460    async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>> {
461        self.0.get_fragment_by_id(fragment_id).await
462    }
463
464    async fn get_fragment_vnodes(&self, fragment_id: u32) -> Result<Vec<(u32, Vec<u32>)>> {
465        self.0.get_fragment_vnodes(fragment_id).await
466    }
467
468    async fn get_actor_vnodes(&self, actor_id: u32) -> Result<Vec<u32>> {
469        self.0.get_actor_vnodes(actor_id).await
470    }
471
472    fn worker_id(&self) -> u32 {
473        self.0.worker_id()
474    }
475
476    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
477        self.0.set_sync_log_store_aligned(job_id, aligned).await
478    }
479
480    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
481        self.0.compact_iceberg_table(sink_id).await
482    }
483
484    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
485        self.0.expire_iceberg_table_snapshots(sink_id).await
486    }
487
488    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
489        self.0.refresh(request).await
490    }
491
492    fn cluster_id(&self) -> &str {
493        self.0.cluster_id()
494    }
495
496    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
497        self.0.list_unmigrated_tables().await
498    }
499}