risingwave_frontend/
meta_client.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
24use risingwave_pb::backup_service::MetaSnapshotMetadata;
25use risingwave_pb::catalog::Table;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
31};
32use risingwave_pb::id::ActorId;
33use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
34use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
35use risingwave_pb::meta::list_actor_states_response::ActorState;
36use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
37use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
38use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
39use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
40use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
41use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
42use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
43use risingwave_pb::meta::{
44    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
45    RefreshRequest, RefreshResponse,
46};
47use risingwave_pb::secret::PbSecretRef;
48use risingwave_rpc_client::error::Result;
49use risingwave_rpc_client::{HummockMetaClient, MetaClient};
50
51use crate::catalog::{DatabaseId, FragmentId, SinkId};
52
53/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
54/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
55/// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly.
56/// Hence instead of to mock all rpc of `MetaClient` in tests, we aggregate those "direct" rpc
57/// in this trait so that the mocking can be simplified.
58#[async_trait::async_trait]
59pub trait FrontendMetaClient: Send + Sync {
60    async fn try_unregister(&self);
61
62    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
63
64    async fn wait(&self) -> Result<()>;
65
66    async fn recover(&self) -> Result<()>;
67
68    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
69
70    async fn list_table_fragments(
71        &self,
72        table_ids: &[JobId],
73    ) -> Result<HashMap<JobId, TableFragmentInfo>>;
74
75    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
76
77    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
78
79    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
80
81    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
82
83    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
84
85    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
86
87    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
88
89    async fn set_system_param(
90        &self,
91        param: String,
92        value: Option<String>,
93    ) -> Result<Option<SystemParamsReader>>;
94
95    async fn get_session_params(&self) -> Result<SessionConfig>;
96
97    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
98
99    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
100
101    async fn get_tables(
102        &self,
103        table_ids: Vec<TableId>,
104        include_dropped_table: bool,
105    ) -> Result<HashMap<TableId, Table>>;
106
107    /// Returns vector of (`worker_id`, `min_pinned_version_id`)
108    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>>;
109
110    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
111
112    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
113
114    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
115
116    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
117
118    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
119
120    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
121
122    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
123
124    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
125    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
126
127    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
128
129    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
130
131    async fn apply_throttle(
132        &self,
133        throttle_target: PbThrottleTarget,
134        throttle_type: risingwave_pb::common::PbThrottleType,
135        id: u32,
136        rate_limit: Option<u32>,
137    ) -> Result<()>;
138
139    async fn alter_fragment_parallelism(
140        &self,
141        fragment_ids: Vec<FragmentId>,
142        parallelism: Option<PbTableParallelism>,
143    ) -> Result<()>;
144
145    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
146
147    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
148
149    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
150
151    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
152
153    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;
154
155    async fn get_meta_store_endpoint(&self) -> Result<String>;
156
157    async fn alter_sink_props(
158        &self,
159        sink_id: SinkId,
160        changed_props: BTreeMap<String, String>,
161        changed_secret_refs: BTreeMap<String, PbSecretRef>,
162        connector_conn_ref: Option<ConnectionId>,
163    ) -> Result<()>;
164
165    async fn alter_iceberg_table_props(
166        &self,
167        table_id: TableId,
168        sink_id: SinkId,
169        source_id: SourceId,
170        changed_props: BTreeMap<String, String>,
171        changed_secret_refs: BTreeMap<String, PbSecretRef>,
172        connector_conn_ref: Option<ConnectionId>,
173    ) -> Result<()>;
174
175    async fn alter_source_connector_props(
176        &self,
177        source_id: SourceId,
178        changed_props: BTreeMap<String, String>,
179        changed_secret_refs: BTreeMap<String, PbSecretRef>,
180        connector_conn_ref: Option<ConnectionId>,
181    ) -> Result<()>;
182
183    async fn alter_connection_connector_props(
184        &self,
185        connection_id: u32,
186        changed_props: BTreeMap<String, String>,
187        changed_secret_refs: BTreeMap<String, PbSecretRef>,
188    ) -> Result<()>;
189
190    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
191
192    async fn get_fragment_by_id(
193        &self,
194        fragment_id: FragmentId,
195    ) -> Result<Option<FragmentDistribution>>;
196
197    async fn get_fragment_vnodes(
198        &self,
199        fragment_id: FragmentId,
200    ) -> Result<Vec<(ActorId, Vec<u32>)>>;
201
202    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;
203
204    fn worker_id(&self) -> WorkerId;
205
206    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
207
208    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
209
210    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
211
212    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
213
214    fn cluster_id(&self) -> &str;
215
216    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
217}
218
219pub struct FrontendMetaClientImpl(pub MetaClient);
220
221#[async_trait::async_trait]
222impl FrontendMetaClient for FrontendMetaClientImpl {
223    async fn try_unregister(&self) {
224        self.0.try_unregister().await;
225    }
226
227    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
228        self.0.flush(database_id).await
229    }
230
231    async fn wait(&self) -> Result<()> {
232        self.0.wait().await
233    }
234
235    async fn recover(&self) -> Result<()> {
236        self.0.recover().await
237    }
238
239    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
240        self.0.cancel_creating_jobs(infos).await
241    }
242
243    async fn list_table_fragments(
244        &self,
245        job_ids: &[JobId],
246    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
247        self.0.list_table_fragments(job_ids).await
248    }
249
250    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
251        self.0.list_streaming_job_states().await
252    }
253
254    async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
255        self.0.list_fragment_distributions().await
256    }
257
258    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
259        self.0.list_creating_fragment_distribution().await
260    }
261
262    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
263        self.0.list_actor_states().await
264    }
265
266    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
267        self.0.list_actor_splits().await
268    }
269
270    async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
271        self.0.list_object_dependencies().await
272    }
273
274    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
275        let manifest = self.0.get_meta_snapshot_manifest().await?;
276        Ok(manifest.snapshot_metadata)
277    }
278
279    async fn set_system_param(
280        &self,
281        param: String,
282        value: Option<String>,
283    ) -> Result<Option<SystemParamsReader>> {
284        self.0.set_system_param(param, value).await
285    }
286
287    async fn get_session_params(&self) -> Result<SessionConfig> {
288        let session_config: SessionConfig =
289            serde_json::from_str(&self.0.get_session_params().await?)
290                .context("failed to parse session config")?;
291        Ok(session_config)
292    }
293
294    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
295        self.0.set_session_param(param, value).await
296    }
297
298    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
299        let ddl_progress = self.0.get_ddl_progress().await?;
300        Ok(ddl_progress)
301    }
302
303    async fn get_tables(
304        &self,
305        table_ids: Vec<TableId>,
306        include_dropped_tables: bool,
307    ) -> Result<HashMap<TableId, Table>> {
308        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
309        Ok(tables)
310    }
311
312    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, u64)>> {
313        let pinned_versions = self
314            .0
315            .risectl_get_pinned_versions_summary()
316            .await?
317            .summary
318            .unwrap()
319            .pinned_versions;
320        let ret = pinned_versions
321            .into_iter()
322            .map(|v| (v.context_id, v.min_pinned_id))
323            .collect();
324        Ok(ret)
325    }
326
327    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
328        self.0.get_current_version().await
329    }
330
331    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
332        self.0
333            .risectl_get_checkpoint_hummock_version()
334            .await
335            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
336    }
337
338    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
339        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
340        self.0
341            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
342            .await
343    }
344
345    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
346        self.0.list_branched_object().await
347    }
348
349    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
350        self.0.risectl_list_compaction_group().await
351    }
352
353    async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
354        self.0.list_active_write_limit().await
355    }
356
357    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
358        self.0.list_hummock_meta_config().await
359    }
360
361    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
362        self.0.list_event_log().await
363    }
364
365    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
366        self.0.list_compact_task_assignment().await
367    }
368
369    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
370        self.0.list_worker_nodes(None).await
371    }
372
373    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
374        self.0.list_compact_task_progress().await
375    }
376
377    async fn apply_throttle(
378        &self,
379        throttle_target: PbThrottleTarget,
380        throttle_type: risingwave_pb::common::PbThrottleType,
381        id: u32,
382        rate_limit: Option<u32>,
383    ) -> Result<()> {
384        self.0
385            .apply_throttle(throttle_target, throttle_type, id, rate_limit)
386            .await
387            .map(|_| ())
388    }
389
390    async fn alter_fragment_parallelism(
391        &self,
392        fragment_ids: Vec<FragmentId>,
393        parallelism: Option<PbTableParallelism>,
394    ) -> Result<()> {
395        self.0
396            .alter_fragment_parallelism(fragment_ids, parallelism)
397            .await
398    }
399
400    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
401        self.0.get_cluster_recovery_status().await
402    }
403
404    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
405        self.0.get_cluster_limits().await
406    }
407
408    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
409        self.0.list_rate_limits().await
410    }
411
412    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
413        self.0.list_cdc_progress().await
414    }
415
416    async fn get_meta_store_endpoint(&self) -> Result<String> {
417        self.0.get_meta_store_endpoint().await
418    }
419
420    async fn alter_sink_props(
421        &self,
422        sink_id: SinkId,
423        changed_props: BTreeMap<String, String>,
424        changed_secret_refs: BTreeMap<String, PbSecretRef>,
425        connector_conn_ref: Option<ConnectionId>,
426    ) -> Result<()> {
427        self.0
428            .alter_sink_props(
429                sink_id,
430                changed_props,
431                changed_secret_refs,
432                connector_conn_ref,
433            )
434            .await
435    }
436
437    async fn alter_iceberg_table_props(
438        &self,
439        table_id: TableId,
440        sink_id: SinkId,
441        source_id: SourceId,
442        changed_props: BTreeMap<String, String>,
443        changed_secret_refs: BTreeMap<String, PbSecretRef>,
444        connector_conn_ref: Option<ConnectionId>,
445    ) -> Result<()> {
446        self.0
447            .alter_iceberg_table_props(
448                table_id,
449                sink_id,
450                source_id,
451                changed_props,
452                changed_secret_refs,
453                connector_conn_ref,
454            )
455            .await
456    }
457
458    async fn alter_source_connector_props(
459        &self,
460        source_id: SourceId,
461        changed_props: BTreeMap<String, String>,
462        changed_secret_refs: BTreeMap<String, PbSecretRef>,
463        connector_conn_ref: Option<ConnectionId>,
464    ) -> Result<()> {
465        self.0
466            .alter_source_connector_props(
467                source_id,
468                changed_props,
469                changed_secret_refs,
470                connector_conn_ref,
471            )
472            .await
473    }
474
475    async fn alter_connection_connector_props(
476        &self,
477        connection_id: u32,
478        changed_props: BTreeMap<String, String>,
479        changed_secret_refs: BTreeMap<String, PbSecretRef>,
480    ) -> Result<()> {
481        self.0
482            .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
483            .await
484    }
485
486    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
487        self.0.list_hosted_iceberg_tables().await
488    }
489
490    async fn get_fragment_by_id(
491        &self,
492        fragment_id: FragmentId,
493    ) -> Result<Option<FragmentDistribution>> {
494        self.0.get_fragment_by_id(fragment_id).await
495    }
496
497    async fn get_fragment_vnodes(
498        &self,
499        fragment_id: FragmentId,
500    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
501        self.0.get_fragment_vnodes(fragment_id).await
502    }
503
504    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
505        self.0.get_actor_vnodes(actor_id).await
506    }
507
508    fn worker_id(&self) -> WorkerId {
509        self.0.worker_id()
510    }
511
512    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
513        self.0.set_sync_log_store_aligned(job_id, aligned).await
514    }
515
516    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
517        self.0.compact_iceberg_table(sink_id).await
518    }
519
520    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
521        self.0.expire_iceberg_table_snapshots(sink_id).await
522    }
523
524    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
525        self.0.refresh(request).await
526    }
527
528    fn cluster_id(&self) -> &str {
529        self.0.cluster_id()
530    }
531
532    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
533        self.0.list_unmigrated_tables().await
534    }
535
536    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
537        self.0.list_refresh_table_states().await
538    }
539}