risingwave_frontend/
meta_client.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::change_log::TableChangeLogs;
23use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
24use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
25use risingwave_pb::backup_service::{BackupJobStatus, MetaSnapshotMetadata};
26use risingwave_pb::catalog::Table;
27use risingwave_pb::common::WorkerNode;
28use risingwave_pb::ddl_service::DdlProgress;
29use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig as PbMutableConfig;
30use risingwave_pb::hummock::write_limits::WriteLimit;
31use risingwave_pb::hummock::{
32    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
33};
34use risingwave_pb::id::ActorId;
35use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
36use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
37use risingwave_pb::meta::list_actor_states_response::ActorState;
38use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
39use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
40use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
41use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
42use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
43use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
44use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
45use risingwave_pb::meta::{
46    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
47    RefreshRequest, RefreshResponse, list_sink_log_store_tables_response,
48};
49use risingwave_pb::secret::PbSecretRef;
50use risingwave_rpc_client::error::Result;
51use risingwave_rpc_client::{HummockMetaClient, MetaClient};
52
53use crate::catalog::{DatabaseId, FragmentId, SinkId};
54
55/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
56/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
57/// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly.
58/// Hence instead of to mock all rpc of `MetaClient` in tests, we aggregate those "direct" rpc
59/// in this trait so that the mocking can be simplified.
60#[async_trait::async_trait]
61pub trait FrontendMetaClient: Send + Sync {
62    async fn try_unregister(&self);
63
64    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
65
66    async fn backup_meta(&self, remarks: Option<String>) -> Result<u64>;
67    async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)>;
68    async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()>;
69
70    async fn recover(&self) -> Result<()>;
71
72    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
73
74    async fn list_table_fragments(
75        &self,
76        table_ids: &[JobId],
77    ) -> Result<HashMap<JobId, TableFragmentInfo>>;
78
79    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
80
81    async fn list_fragment_distribution(
82        &self,
83        include_node: bool,
84    ) -> Result<Vec<FragmentDistribution>>;
85
86    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
87
88    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
89
90    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
91
92    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
93
94    async fn list_sink_log_store_tables(
95        &self,
96    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>>;
97
98    async fn set_system_param(
99        &self,
100        param: String,
101        value: Option<String>,
102    ) -> Result<Option<SystemParamsReader>>;
103
104    async fn get_session_params(&self) -> Result<SessionConfig>;
105
106    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
107
108    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
109
110    async fn get_tables(
111        &self,
112        table_ids: Vec<TableId>,
113        include_dropped_table: bool,
114    ) -> Result<HashMap<TableId, Table>>;
115
116    /// Returns vector of (`worker_id`, `min_pinned_version_id`)
117    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>>;
118
119    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
120
121    async fn get_hummock_table_change_log(
122        &self,
123        start_epoch_inclusive: Option<u64>,
124        end_epoch_inclusive: Option<u64>,
125        table_ids: Option<HashSet<TableId>>,
126        exclude_empty: bool,
127        limit: Option<u32>,
128    ) -> Result<TableChangeLogs>;
129
130    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
131
132    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
133
134    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
135
136    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
137
138    async fn list_hummock_active_write_limits(
139        &self,
140    ) -> Result<HashMap<CompactionGroupId, WriteLimit>>;
141
142    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
143
144    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
145    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
146
147    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
148
149    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
150
151    async fn apply_throttle(
152        &self,
153        throttle_target: PbThrottleTarget,
154        throttle_type: risingwave_pb::common::PbThrottleType,
155        id: u32,
156        rate_limit: Option<u32>,
157    ) -> Result<()>;
158
159    async fn alter_fragment_parallelism(
160        &self,
161        fragment_ids: Vec<FragmentId>,
162        parallelism: Option<PbTableParallelism>,
163    ) -> Result<()>;
164
165    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
166
167    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
168
169    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
170
171    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;
172
173    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;
174
175    async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>>;
176
177    async fn get_meta_store_endpoint(&self) -> Result<String>;
178
179    async fn alter_sink_props(
180        &self,
181        sink_id: SinkId,
182        changed_props: BTreeMap<String, String>,
183        changed_secret_refs: BTreeMap<String, PbSecretRef>,
184        connector_conn_ref: Option<ConnectionId>,
185    ) -> Result<()>;
186
187    async fn alter_iceberg_table_props(
188        &self,
189        table_id: TableId,
190        sink_id: SinkId,
191        source_id: SourceId,
192        changed_props: BTreeMap<String, String>,
193        changed_secret_refs: BTreeMap<String, PbSecretRef>,
194        connector_conn_ref: Option<ConnectionId>,
195    ) -> Result<()>;
196
197    async fn alter_source_connector_props(
198        &self,
199        source_id: SourceId,
200        changed_props: BTreeMap<String, String>,
201        changed_secret_refs: BTreeMap<String, PbSecretRef>,
202        connector_conn_ref: Option<ConnectionId>,
203    ) -> Result<()>;
204
205    async fn alter_connection_connector_props(
206        &self,
207        connection_id: u32,
208        changed_props: BTreeMap<String, String>,
209        changed_secret_refs: BTreeMap<String, PbSecretRef>,
210    ) -> Result<()>;
211
212    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
213
214    async fn get_fragment_by_id(
215        &self,
216        fragment_id: FragmentId,
217    ) -> Result<Option<FragmentDistribution>>;
218
219    async fn get_fragment_vnodes(
220        &self,
221        fragment_id: FragmentId,
222    ) -> Result<Vec<(ActorId, Vec<u32>)>>;
223
224    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;
225
226    fn worker_id(&self) -> WorkerId;
227
228    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;
229
230    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
231
232    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;
233
234    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
235
236    fn cluster_id(&self) -> &str;
237
238    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
239
240    async fn update_compaction_config(
241        &self,
242        compaction_group_ids: Vec<CompactionGroupId>,
243        configs: Vec<PbMutableConfig>,
244    ) -> Result<()>;
245}
246
247pub struct FrontendMetaClientImpl(pub MetaClient);
248
249#[async_trait::async_trait]
250impl FrontendMetaClient for FrontendMetaClientImpl {
251    async fn try_unregister(&self) {
252        self.0.try_unregister().await;
253    }
254
255    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
256        self.0.flush(database_id).await
257    }
258
259    async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
260        self.0.backup_meta(remarks).await
261    }
262
263    async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
264        self.0.get_backup_job_status(job_id).await
265    }
266
267    async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
268        self.0.delete_meta_snapshot(snapshot_ids).await
269    }
270
271    async fn recover(&self) -> Result<()> {
272        self.0.recover().await
273    }
274
275    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
276        self.0.cancel_creating_jobs(infos).await
277    }
278
279    async fn list_table_fragments(
280        &self,
281        job_ids: &[JobId],
282    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
283        self.0.list_table_fragments(job_ids).await
284    }
285
286    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
287        self.0.list_streaming_job_states().await
288    }
289
290    async fn list_fragment_distribution(
291        &self,
292        include_node: bool,
293    ) -> Result<Vec<FragmentDistribution>> {
294        self.0.list_fragment_distributions(include_node).await
295    }
296
297    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
298        self.0.list_creating_fragment_distribution().await
299    }
300
301    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
302        self.0.list_actor_states().await
303    }
304
305    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
306        self.0.list_actor_splits().await
307    }
308
309    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
310        let manifest = self.0.get_meta_snapshot_manifest().await?;
311        Ok(manifest.snapshot_metadata)
312    }
313
314    async fn list_sink_log_store_tables(
315        &self,
316    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
317        self.0.list_sink_log_store_tables().await
318    }
319
320    async fn set_system_param(
321        &self,
322        param: String,
323        value: Option<String>,
324    ) -> Result<Option<SystemParamsReader>> {
325        self.0.set_system_param(param, value).await
326    }
327
328    async fn get_session_params(&self) -> Result<SessionConfig> {
329        let session_config: SessionConfig =
330            serde_json::from_str(&self.0.get_session_params().await?)
331                .context("failed to parse session config")?;
332        Ok(session_config)
333    }
334
335    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
336        self.0.set_session_param(param, value).await
337    }
338
339    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
340        let ddl_progress = self.0.get_ddl_progress().await?;
341        Ok(ddl_progress)
342    }
343
344    async fn get_tables(
345        &self,
346        table_ids: Vec<TableId>,
347        include_dropped_tables: bool,
348    ) -> Result<HashMap<TableId, Table>> {
349        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
350        Ok(tables)
351    }
352
353    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>> {
354        let pinned_versions = self
355            .0
356            .risectl_get_pinned_versions_summary()
357            .await?
358            .summary
359            .unwrap()
360            .pinned_versions;
361        let ret = pinned_versions
362            .into_iter()
363            .map(|v| (v.context_id, v.min_pinned_id))
364            .collect();
365        Ok(ret)
366    }
367
368    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
369        self.0.get_current_version().await
370    }
371
372    async fn get_hummock_table_change_log(
373        &self,
374        start_epoch_inclusive: Option<u64>,
375        end_epoch_inclusive: Option<u64>,
376        table_ids: Option<HashSet<TableId>>,
377        exclude_empty: bool,
378        limit: Option<u32>,
379    ) -> Result<TableChangeLogs> {
380        self.0
381            .get_table_change_logs(
382                true,
383                start_epoch_inclusive,
384                end_epoch_inclusive,
385                table_ids,
386                exclude_empty,
387                limit,
388            )
389            .await
390    }
391
392    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
393        self.0
394            .risectl_get_checkpoint_hummock_version()
395            .await
396            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
397    }
398
399    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
400        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
401        self.0
402            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
403            .await
404    }
405
406    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
407        self.0.list_branched_object().await
408    }
409
410    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
411        self.0.risectl_list_compaction_group().await
412    }
413
414    async fn list_hummock_active_write_limits(
415        &self,
416    ) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
417        self.0.list_active_write_limit().await
418    }
419
420    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
421        self.0.list_hummock_meta_config().await
422    }
423
424    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
425        self.0.list_event_log().await
426    }
427
428    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
429        self.0.list_compact_task_assignment().await
430    }
431
432    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
433        self.0.list_worker_nodes(None).await
434    }
435
436    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
437        self.0.list_compact_task_progress().await
438    }
439
440    async fn apply_throttle(
441        &self,
442        throttle_target: PbThrottleTarget,
443        throttle_type: risingwave_pb::common::PbThrottleType,
444        id: u32,
445        rate_limit: Option<u32>,
446    ) -> Result<()> {
447        self.0
448            .apply_throttle(throttle_target, throttle_type, id, rate_limit)
449            .await
450            .map(|_| ())
451    }
452
453    async fn alter_fragment_parallelism(
454        &self,
455        fragment_ids: Vec<FragmentId>,
456        parallelism: Option<PbTableParallelism>,
457    ) -> Result<()> {
458        self.0
459            .alter_fragment_parallelism(fragment_ids, parallelism)
460            .await
461    }
462
463    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
464        self.0.get_cluster_recovery_status().await
465    }
466
467    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
468        self.0.get_cluster_limits().await
469    }
470
471    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
472        self.0.list_rate_limits().await
473    }
474
475    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
476        self.0.list_cdc_progress().await
477    }
478
479    async fn get_meta_store_endpoint(&self) -> Result<String> {
480        self.0.get_meta_store_endpoint().await
481    }
482
483    async fn alter_sink_props(
484        &self,
485        sink_id: SinkId,
486        changed_props: BTreeMap<String, String>,
487        changed_secret_refs: BTreeMap<String, PbSecretRef>,
488        connector_conn_ref: Option<ConnectionId>,
489    ) -> Result<()> {
490        self.0
491            .alter_sink_props(
492                sink_id,
493                changed_props,
494                changed_secret_refs,
495                connector_conn_ref,
496            )
497            .await
498    }
499
500    async fn alter_iceberg_table_props(
501        &self,
502        table_id: TableId,
503        sink_id: SinkId,
504        source_id: SourceId,
505        changed_props: BTreeMap<String, String>,
506        changed_secret_refs: BTreeMap<String, PbSecretRef>,
507        connector_conn_ref: Option<ConnectionId>,
508    ) -> Result<()> {
509        self.0
510            .alter_iceberg_table_props(
511                table_id,
512                sink_id,
513                source_id,
514                changed_props,
515                changed_secret_refs,
516                connector_conn_ref,
517            )
518            .await
519    }
520
521    async fn alter_source_connector_props(
522        &self,
523        source_id: SourceId,
524        changed_props: BTreeMap<String, String>,
525        changed_secret_refs: BTreeMap<String, PbSecretRef>,
526        connector_conn_ref: Option<ConnectionId>,
527    ) -> Result<()> {
528        self.0
529            .alter_source_connector_props(
530                source_id,
531                changed_props,
532                changed_secret_refs,
533                connector_conn_ref,
534            )
535            .await
536    }
537
538    async fn alter_connection_connector_props(
539        &self,
540        connection_id: u32,
541        changed_props: BTreeMap<String, String>,
542        changed_secret_refs: BTreeMap<String, PbSecretRef>,
543    ) -> Result<()> {
544        self.0
545            .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
546            .await
547    }
548
549    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
550        self.0.list_hosted_iceberg_tables().await
551    }
552
553    async fn get_fragment_by_id(
554        &self,
555        fragment_id: FragmentId,
556    ) -> Result<Option<FragmentDistribution>> {
557        self.0.get_fragment_by_id(fragment_id).await
558    }
559
560    async fn get_fragment_vnodes(
561        &self,
562        fragment_id: FragmentId,
563    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
564        self.0.get_fragment_vnodes(fragment_id).await
565    }
566
567    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
568        self.0.get_actor_vnodes(actor_id).await
569    }
570
571    fn worker_id(&self) -> WorkerId {
572        self.0.worker_id()
573    }
574
575    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
576        self.0.set_sync_log_store_aligned(job_id, aligned).await
577    }
578
579    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
580        self.0.compact_iceberg_table(sink_id).await
581    }
582
583    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
584        self.0.expire_iceberg_table_snapshots(sink_id).await
585    }
586
587    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
588        self.0.refresh(request).await
589    }
590
591    fn cluster_id(&self) -> &str {
592        self.0.cluster_id()
593    }
594
595    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
596        self.0.list_unmigrated_tables().await
597    }
598
599    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
600        self.0.list_refresh_table_states().await
601    }
602
603    async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>> {
604        self.0.list_iceberg_compaction_status().await
605    }
606
607    async fn update_compaction_config(
608        &self,
609        compaction_group_ids: Vec<CompactionGroupId>,
610        configs: Vec<PbMutableConfig>,
611    ) -> Result<()> {
612        self.0
613            .risectl_update_compaction_config(&compaction_group_ids, &configs)
614            .await
615    }
616}