risingwave_frontend/
meta_client.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::change_log::TableChangeLogs;
23use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
24use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
25use risingwave_pb::backup_service::{BackupJobStatus, MetaSnapshotMetadata};
26use risingwave_pb::catalog::Table;
27use risingwave_pb::common::WorkerNode;
28use risingwave_pb::ddl_service::DdlProgress;
29use risingwave_pb::hummock::write_limits::WriteLimit;
30use risingwave_pb::hummock::{
31    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
32};
33use risingwave_pb::id::ActorId;
34use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
35use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
36use risingwave_pb::meta::list_actor_states_response::ActorState;
37use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
38use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
39use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
40use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
41use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
42use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
43use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
44use risingwave_pb::meta::{
45    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
46    RefreshRequest, RefreshResponse, list_sink_log_store_tables_response,
47};
48use risingwave_pb::secret::PbSecretRef;
49use risingwave_rpc_client::error::Result;
50use risingwave_rpc_client::{HummockMetaClient, MetaClient};
51
52use crate::catalog::{DatabaseId, FragmentId, SinkId};
53
54/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
55/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
56/// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly.
57/// Hence instead of to mock all rpc of `MetaClient` in tests, we aggregate those "direct" rpc
58/// in this trait so that the mocking can be simplified.
59#[async_trait::async_trait]
60pub trait FrontendMetaClient: Send + Sync {
61    async fn try_unregister(&self);
62
63    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
64
65    async fn backup_meta(&self, remarks: Option<String>) -> Result<u64>;
66    async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)>;
67    async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()>;
68
69    async fn recover(&self) -> Result<()>;
70
71    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
72
73    async fn list_table_fragments(
74        &self,
75        table_ids: &[JobId],
76    ) -> Result<HashMap<JobId, TableFragmentInfo>>;
77
78    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
79
80    async fn list_fragment_distribution(
81        &self,
82        include_node: bool,
83    ) -> Result<Vec<FragmentDistribution>>;
84
85    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
86
87    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
88
89    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
90
91    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
92
93    async fn list_sink_log_store_tables(
94        &self,
95    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>>;
96
97    async fn set_system_param(
98        &self,
99        param: String,
100        value: Option<String>,
101    ) -> Result<Option<SystemParamsReader>>;
102
103    async fn get_session_params(&self) -> Result<SessionConfig>;
104
105    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
106
107    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
108
109    async fn get_tables(
110        &self,
111        table_ids: Vec<TableId>,
112        include_dropped_table: bool,
113    ) -> Result<HashMap<TableId, Table>>;
114
115    /// Returns vector of (`worker_id`, `min_pinned_version_id`)
116    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>>;
117
118    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
119
120    async fn get_hummock_table_change_log(
121        &self,
122        start_epoch_inclusive: Option<u64>,
123        end_epoch_inclusive: Option<u64>,
124        table_ids: Option<HashSet<TableId>>,
125        exclude_empty: bool,
126        limit: Option<u32>,
127    ) -> Result<TableChangeLogs>;
128
129    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
130
131    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
132
133    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
134
135    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
136
137    async fn list_hummock_active_write_limits(
138        &self,
139    ) -> Result<HashMap<CompactionGroupId, WriteLimit>>;
140
141    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
142
143    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
144    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
145
146    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
147
148    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
149
150    async fn apply_throttle(
151        &self,
152        throttle_target: PbThrottleTarget,
153        throttle_type: risingwave_pb::common::PbThrottleType,
154        id: u32,
155        rate_limit: Option<u32>,
156    ) -> Result<()>;
157
158    async fn alter_fragment_parallelism(
159        &self,
160        fragment_ids: Vec<FragmentId>,
161        parallelism: Option<PbTableParallelism>,
162    ) -> Result<()>;
163
164    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
165
166    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
167
168    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
169
170    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
171
172    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;
173
174    async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>>;
175
176    async fn get_meta_store_endpoint(&self) -> Result<String>;
177
178    async fn alter_sink_props(
179        &self,
180        sink_id: SinkId,
181        changed_props: BTreeMap<String, String>,
182        changed_secret_refs: BTreeMap<String, PbSecretRef>,
183        connector_conn_ref: Option<ConnectionId>,
184    ) -> Result<()>;
185
186    async fn alter_iceberg_table_props(
187        &self,
188        table_id: TableId,
189        sink_id: SinkId,
190        source_id: SourceId,
191        changed_props: BTreeMap<String, String>,
192        changed_secret_refs: BTreeMap<String, PbSecretRef>,
193        connector_conn_ref: Option<ConnectionId>,
194    ) -> Result<()>;
195
196    async fn alter_source_connector_props(
197        &self,
198        source_id: SourceId,
199        changed_props: BTreeMap<String, String>,
200        changed_secret_refs: BTreeMap<String, PbSecretRef>,
201        connector_conn_ref: Option<ConnectionId>,
202    ) -> Result<()>;
203
204    async fn alter_connection_connector_props(
205        &self,
206        connection_id: u32,
207        changed_props: BTreeMap<String, String>,
208        changed_secret_refs: BTreeMap<String, PbSecretRef>,
209    ) -> Result<()>;
210
211    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
212
213    async fn get_fragment_by_id(
214        &self,
215        fragment_id: FragmentId,
216    ) -> Result<Option<FragmentDistribution>>;
217
218    async fn get_fragment_vnodes(
219        &self,
220        fragment_id: FragmentId,
221    ) -> Result<Vec<(ActorId, Vec<u32>)>>;
222
223    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;
224
225    fn worker_id(&self) -> WorkerId;
226
227    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
228
229    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
230
231    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
232
233    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
234
235    fn cluster_id(&self) -> &str;
236
237    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
238}
239
240pub struct FrontendMetaClientImpl(pub MetaClient);
241
242#[async_trait::async_trait]
243impl FrontendMetaClient for FrontendMetaClientImpl {
244    async fn try_unregister(&self) {
245        self.0.try_unregister().await;
246    }
247
248    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
249        self.0.flush(database_id).await
250    }
251
252    async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
253        self.0.backup_meta(remarks).await
254    }
255
256    async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
257        self.0.get_backup_job_status(job_id).await
258    }
259
260    async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
261        self.0.delete_meta_snapshot(snapshot_ids).await
262    }
263
264    async fn recover(&self) -> Result<()> {
265        self.0.recover().await
266    }
267
268    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
269        self.0.cancel_creating_jobs(infos).await
270    }
271
272    async fn list_table_fragments(
273        &self,
274        job_ids: &[JobId],
275    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
276        self.0.list_table_fragments(job_ids).await
277    }
278
279    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
280        self.0.list_streaming_job_states().await
281    }
282
283    async fn list_fragment_distribution(
284        &self,
285        include_node: bool,
286    ) -> Result<Vec<FragmentDistribution>> {
287        self.0.list_fragment_distributions(include_node).await
288    }
289
290    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
291        self.0.list_creating_fragment_distribution().await
292    }
293
294    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
295        self.0.list_actor_states().await
296    }
297
298    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
299        self.0.list_actor_splits().await
300    }
301
302    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
303        let manifest = self.0.get_meta_snapshot_manifest().await?;
304        Ok(manifest.snapshot_metadata)
305    }
306
307    async fn list_sink_log_store_tables(
308        &self,
309    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
310        self.0.list_sink_log_store_tables().await
311    }
312
313    async fn set_system_param(
314        &self,
315        param: String,
316        value: Option<String>,
317    ) -> Result<Option<SystemParamsReader>> {
318        self.0.set_system_param(param, value).await
319    }
320
321    async fn get_session_params(&self) -> Result<SessionConfig> {
322        let session_config: SessionConfig =
323            serde_json::from_str(&self.0.get_session_params().await?)
324                .context("failed to parse session config")?;
325        Ok(session_config)
326    }
327
328    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
329        self.0.set_session_param(param, value).await
330    }
331
332    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
333        let ddl_progress = self.0.get_ddl_progress().await?;
334        Ok(ddl_progress)
335    }
336
337    async fn get_tables(
338        &self,
339        table_ids: Vec<TableId>,
340        include_dropped_tables: bool,
341    ) -> Result<HashMap<TableId, Table>> {
342        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
343        Ok(tables)
344    }
345
346    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>> {
347        let pinned_versions = self
348            .0
349            .risectl_get_pinned_versions_summary()
350            .await?
351            .summary
352            .unwrap()
353            .pinned_versions;
354        let ret = pinned_versions
355            .into_iter()
356            .map(|v| (v.context_id, v.min_pinned_id))
357            .collect();
358        Ok(ret)
359    }
360
361    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
362        self.0.get_current_version().await
363    }
364
365    async fn get_hummock_table_change_log(
366        &self,
367        start_epoch_inclusive: Option<u64>,
368        end_epoch_inclusive: Option<u64>,
369        table_ids: Option<HashSet<TableId>>,
370        exclude_empty: bool,
371        limit: Option<u32>,
372    ) -> Result<TableChangeLogs> {
373        self.0
374            .get_table_change_logs(
375                true,
376                start_epoch_inclusive,
377                end_epoch_inclusive,
378                table_ids,
379                exclude_empty,
380                limit,
381            )
382            .await
383    }
384
385    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
386        self.0
387            .risectl_get_checkpoint_hummock_version()
388            .await
389            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
390    }
391
392    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
393        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
394        self.0
395            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
396            .await
397    }
398
399    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
400        self.0.list_branched_object().await
401    }
402
403    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
404        self.0.risectl_list_compaction_group().await
405    }
406
407    async fn list_hummock_active_write_limits(
408        &self,
409    ) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
410        self.0.list_active_write_limit().await
411    }
412
413    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
414        self.0.list_hummock_meta_config().await
415    }
416
417    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
418        self.0.list_event_log().await
419    }
420
421    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
422        self.0.list_compact_task_assignment().await
423    }
424
425    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
426        self.0.list_worker_nodes(None).await
427    }
428
429    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
430        self.0.list_compact_task_progress().await
431    }
432
433    async fn apply_throttle(
434        &self,
435        throttle_target: PbThrottleTarget,
436        throttle_type: risingwave_pb::common::PbThrottleType,
437        id: u32,
438        rate_limit: Option<u32>,
439    ) -> Result<()> {
440        self.0
441            .apply_throttle(throttle_target, throttle_type, id, rate_limit)
442            .await
443            .map(|_| ())
444    }
445
446    async fn alter_fragment_parallelism(
447        &self,
448        fragment_ids: Vec<FragmentId>,
449        parallelism: Option<PbTableParallelism>,
450    ) -> Result<()> {
451        self.0
452            .alter_fragment_parallelism(fragment_ids, parallelism)
453            .await
454    }
455
456    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
457        self.0.get_cluster_recovery_status().await
458    }
459
460    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
461        self.0.get_cluster_limits().await
462    }
463
464    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
465        self.0.list_rate_limits().await
466    }
467
468    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
469        self.0.list_cdc_progress().await
470    }
471
472    async fn get_meta_store_endpoint(&self) -> Result<String> {
473        self.0.get_meta_store_endpoint().await
474    }
475
476    async fn alter_sink_props(
477        &self,
478        sink_id: SinkId,
479        changed_props: BTreeMap<String, String>,
480        changed_secret_refs: BTreeMap<String, PbSecretRef>,
481        connector_conn_ref: Option<ConnectionId>,
482    ) -> Result<()> {
483        self.0
484            .alter_sink_props(
485                sink_id,
486                changed_props,
487                changed_secret_refs,
488                connector_conn_ref,
489            )
490            .await
491    }
492
493    async fn alter_iceberg_table_props(
494        &self,
495        table_id: TableId,
496        sink_id: SinkId,
497        source_id: SourceId,
498        changed_props: BTreeMap<String, String>,
499        changed_secret_refs: BTreeMap<String, PbSecretRef>,
500        connector_conn_ref: Option<ConnectionId>,
501    ) -> Result<()> {
502        self.0
503            .alter_iceberg_table_props(
504                table_id,
505                sink_id,
506                source_id,
507                changed_props,
508                changed_secret_refs,
509                connector_conn_ref,
510            )
511            .await
512    }
513
514    async fn alter_source_connector_props(
515        &self,
516        source_id: SourceId,
517        changed_props: BTreeMap<String, String>,
518        changed_secret_refs: BTreeMap<String, PbSecretRef>,
519        connector_conn_ref: Option<ConnectionId>,
520    ) -> Result<()> {
521        self.0
522            .alter_source_connector_props(
523                source_id,
524                changed_props,
525                changed_secret_refs,
526                connector_conn_ref,
527            )
528            .await
529    }
530
531    async fn alter_connection_connector_props(
532        &self,
533        connection_id: u32,
534        changed_props: BTreeMap<String, String>,
535        changed_secret_refs: BTreeMap<String, PbSecretRef>,
536    ) -> Result<()> {
537        self.0
538            .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
539            .await
540    }
541
542    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
543        self.0.list_hosted_iceberg_tables().await
544    }
545
546    async fn get_fragment_by_id(
547        &self,
548        fragment_id: FragmentId,
549    ) -> Result<Option<FragmentDistribution>> {
550        self.0.get_fragment_by_id(fragment_id).await
551    }
552
553    async fn get_fragment_vnodes(
554        &self,
555        fragment_id: FragmentId,
556    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
557        self.0.get_fragment_vnodes(fragment_id).await
558    }
559
560    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
561        self.0.get_actor_vnodes(actor_id).await
562    }
563
564    fn worker_id(&self) -> WorkerId {
565        self.0.worker_id()
566    }
567
568    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
569        self.0.set_sync_log_store_aligned(job_id, aligned).await
570    }
571
572    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
573        self.0.compact_iceberg_table(sink_id).await
574    }
575
576    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
577        self.0.expire_iceberg_table_snapshots(sink_id).await
578    }
579
580    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
581        self.0.refresh(request).await
582    }
583
584    fn cluster_id(&self) -> &str {
585        self.0.cluster_id()
586    }
587
588    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
589        self.0.list_unmigrated_tables().await
590    }
591
592    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
593        self.0.list_refresh_table_states().await
594    }
595
596    async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>> {
597        self.0.list_iceberg_compaction_status().await
598    }
599}