risingwave_frontend/
meta_client.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
24use risingwave_pb::backup_service::MetaSnapshotMetadata;
25use risingwave_pb::catalog::Table;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
31};
32use risingwave_pb::id::ActorId;
33use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
34use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
35use risingwave_pb::meta::list_actor_states_response::ActorState;
36use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
37use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
38use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
39use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
40use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
41use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
42use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
43use risingwave_pb::meta::{
44    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
45    RefreshRequest, RefreshResponse, list_sink_log_store_tables_response,
46};
47use risingwave_pb::secret::PbSecretRef;
48use risingwave_rpc_client::error::Result;
49use risingwave_rpc_client::{HummockMetaClient, MetaClient};
50
51use crate::catalog::{DatabaseId, FragmentId, SinkId};
52
/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
/// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly.
/// Hence, instead of mocking all rpc of `MetaClient` in tests, we aggregate those "direct" rpc
/// in this trait so that the mocking can be simplified.
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
    /// Best-effort unregistration of this frontend worker from the meta service.
    /// Infallible by signature: errors are swallowed by the underlying client.
    async fn try_unregister(&self);

    /// Flushes the given database and returns the resulting hummock version id.
    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;

    /// Triggers an ad-hoc cluster recovery.
    async fn recover(&self) -> Result<()>;

    /// Cancels the specified creating jobs; returns the ids of jobs actually canceled.
    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

    /// Fetches fragment info for the given streaming job ids.
    async fn list_table_fragments(
        &self,
        table_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>>;

    /// Lists the states of all streaming jobs.
    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;

    /// Lists fragment distributions; `include_node` controls whether the
    /// fragment's node body is included in the response.
    async fn list_fragment_distribution(
        &self,
        include_node: bool,
    ) -> Result<Vec<FragmentDistribution>>;

    /// Lists fragment distributions of jobs that are still being created.
    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists the states of all actors.
    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

    /// Lists source split assignments per actor.
    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

    /// Lists metadata of available meta backup snapshots.
    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;

    /// Lists internal log-store tables used by sinks.
    async fn list_sink_log_store_tables(
        &self,
    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>>;

    /// Sets a system parameter; `None` value resets it to default.
    /// Returns the new system-params snapshot if the meta server provides one.
    async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>>;

    /// Fetches the cluster-wide default session parameters.
    async fn get_session_params(&self) -> Result<SessionConfig>;

    /// Sets a cluster-wide default session parameter; `None` value resets it.
    /// Returns the new value rendered as a string.
    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

    /// Reports progress of ongoing DDL (creating streaming jobs).
    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

    /// Fetches table catalogs by id; `include_dropped_table` also looks up
    /// tables that have been dropped but not yet cleaned.
    async fn get_tables(
        &self,
        table_ids: Vec<TableId>,
        include_dropped_table: bool,
    ) -> Result<HashMap<TableId, Table>>;

    /// Returns vector of (`worker_id`, `min_pinned_version_id`)
    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>>;

    /// Fetches the current (uncommitted-inclusive) hummock version.
    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

    /// Fetches the latest checkpointed hummock version.
    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;

    /// Lists hummock version deltas.
    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

    /// Lists SST objects shared (branched) across compaction groups.
    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;

    /// Lists configs of all hummock compaction groups.
    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;

    /// Lists write limits currently in effect, keyed by compaction group.
    async fn list_hummock_active_write_limits(
        &self,
    ) -> Result<HashMap<CompactionGroupId, WriteLimit>>;

    /// Lists hummock-related config entries stored on meta, as key/value strings.
    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

    /// Lists cluster event logs.
    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
    /// Lists current compact task assignments.
    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;

    /// Lists all worker nodes of the cluster (no type filter).
    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;

    /// Lists progress of running compact tasks.
    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;

    /// Applies a rate limit (`rate_limit`, `None` to remove) of the given
    /// `throttle_type` to the target entity identified by `id`.
    async fn apply_throttle(
        &self,
        throttle_target: PbThrottleTarget,
        throttle_type: risingwave_pb::common::PbThrottleType,
        id: u32,
        rate_limit: Option<u32>,
    ) -> Result<()>;

    /// Alters the parallelism of the given fragments.
    async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()>;

    /// Queries whether the cluster is recovering and at which stage.
    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

    /// Fetches resource limits currently hit or approached by the cluster.
    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

    /// Lists all configured rate limits.
    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;

    /// Reports CDC backfill progress keyed by job id.
    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;

    /// Lists the refresh states of refreshable tables.
    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;

    /// Lists the status of iceberg compaction tasks.
    async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>>;

    /// Returns the endpoint of the backing meta store.
    async fn get_meta_store_endpoint(&self) -> Result<String>;

    /// Alters connector properties / secret refs of a sink.
    async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Alters connector properties of an iceberg table engine table, which
    /// involves its table, sink and source parts together.
    async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Alters connector properties / secret refs of a source.
    async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Alters connector properties / secret refs of a connection.
    async fn alter_connection_connector_props(
        &self,
        connection_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> Result<()>;

    /// Lists iceberg tables hosted by the built-in iceberg catalog.
    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;

    /// Fetches the distribution of a single fragment, `None` if not found.
    async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>>;

    /// Returns the vnodes owned by each actor of the given fragment.
    async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>>;

    /// Returns the vnodes owned by the given actor.
    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;

    /// The worker id of this frontend node (sync: known locally after registration).
    fn worker_id(&self) -> WorkerId;

    /// Marks the sync log store of a job as (un)aligned.
    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;

    /// Triggers iceberg compaction for the table behind the given sink;
    /// returns a task identifier. (NOTE(review): u64 presumed to be a task id — confirm with meta service.)
    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;

    /// Expires old snapshots of the iceberg table behind the given sink.
    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;

    /// Triggers a refresh of a refreshable table.
    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;

    /// The cluster id (sync: known locally after registration).
    fn cluster_id(&self) -> &str;

    /// Lists tables not yet migrated, mapped to a status/reason string.
    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
}
225
/// Production implementation of [`FrontendMetaClient`] that forwards every call
/// to the wrapped [`MetaClient`].
pub struct FrontendMetaClientImpl(pub MetaClient);
227
228#[async_trait::async_trait]
229impl FrontendMetaClient for FrontendMetaClientImpl {
230    async fn try_unregister(&self) {
231        self.0.try_unregister().await;
232    }
233
234    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
235        self.0.flush(database_id).await
236    }
237
238    async fn recover(&self) -> Result<()> {
239        self.0.recover().await
240    }
241
242    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
243        self.0.cancel_creating_jobs(infos).await
244    }
245
246    async fn list_table_fragments(
247        &self,
248        job_ids: &[JobId],
249    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
250        self.0.list_table_fragments(job_ids).await
251    }
252
253    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
254        self.0.list_streaming_job_states().await
255    }
256
257    async fn list_fragment_distribution(
258        &self,
259        include_node: bool,
260    ) -> Result<Vec<FragmentDistribution>> {
261        self.0.list_fragment_distributions(include_node).await
262    }
263
264    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
265        self.0.list_creating_fragment_distribution().await
266    }
267
268    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
269        self.0.list_actor_states().await
270    }
271
272    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
273        self.0.list_actor_splits().await
274    }
275
276    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
277        let manifest = self.0.get_meta_snapshot_manifest().await?;
278        Ok(manifest.snapshot_metadata)
279    }
280
281    async fn list_sink_log_store_tables(
282        &self,
283    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
284        self.0.list_sink_log_store_tables().await
285    }
286
287    async fn set_system_param(
288        &self,
289        param: String,
290        value: Option<String>,
291    ) -> Result<Option<SystemParamsReader>> {
292        self.0.set_system_param(param, value).await
293    }
294
295    async fn get_session_params(&self) -> Result<SessionConfig> {
296        let session_config: SessionConfig =
297            serde_json::from_str(&self.0.get_session_params().await?)
298                .context("failed to parse session config")?;
299        Ok(session_config)
300    }
301
302    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
303        self.0.set_session_param(param, value).await
304    }
305
306    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
307        let ddl_progress = self.0.get_ddl_progress().await?;
308        Ok(ddl_progress)
309    }
310
311    async fn get_tables(
312        &self,
313        table_ids: Vec<TableId>,
314        include_dropped_tables: bool,
315    ) -> Result<HashMap<TableId, Table>> {
316        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
317        Ok(tables)
318    }
319
320    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>> {
321        let pinned_versions = self
322            .0
323            .risectl_get_pinned_versions_summary()
324            .await?
325            .summary
326            .unwrap()
327            .pinned_versions;
328        let ret = pinned_versions
329            .into_iter()
330            .map(|v| (v.context_id, v.min_pinned_id))
331            .collect();
332        Ok(ret)
333    }
334
335    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
336        self.0.get_current_version().await
337    }
338
339    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
340        self.0
341            .risectl_get_checkpoint_hummock_version()
342            .await
343            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
344    }
345
346    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
347        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
348        self.0
349            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
350            .await
351    }
352
353    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
354        self.0.list_branched_object().await
355    }
356
357    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
358        self.0.risectl_list_compaction_group().await
359    }
360
361    async fn list_hummock_active_write_limits(
362        &self,
363    ) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
364        self.0.list_active_write_limit().await
365    }
366
367    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
368        self.0.list_hummock_meta_config().await
369    }
370
371    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
372        self.0.list_event_log().await
373    }
374
375    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
376        self.0.list_compact_task_assignment().await
377    }
378
379    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
380        self.0.list_worker_nodes(None).await
381    }
382
383    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
384        self.0.list_compact_task_progress().await
385    }
386
387    async fn apply_throttle(
388        &self,
389        throttle_target: PbThrottleTarget,
390        throttle_type: risingwave_pb::common::PbThrottleType,
391        id: u32,
392        rate_limit: Option<u32>,
393    ) -> Result<()> {
394        self.0
395            .apply_throttle(throttle_target, throttle_type, id, rate_limit)
396            .await
397            .map(|_| ())
398    }
399
400    async fn alter_fragment_parallelism(
401        &self,
402        fragment_ids: Vec<FragmentId>,
403        parallelism: Option<PbTableParallelism>,
404    ) -> Result<()> {
405        self.0
406            .alter_fragment_parallelism(fragment_ids, parallelism)
407            .await
408    }
409
410    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
411        self.0.get_cluster_recovery_status().await
412    }
413
414    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
415        self.0.get_cluster_limits().await
416    }
417
418    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
419        self.0.list_rate_limits().await
420    }
421
422    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
423        self.0.list_cdc_progress().await
424    }
425
426    async fn get_meta_store_endpoint(&self) -> Result<String> {
427        self.0.get_meta_store_endpoint().await
428    }
429
430    async fn alter_sink_props(
431        &self,
432        sink_id: SinkId,
433        changed_props: BTreeMap<String, String>,
434        changed_secret_refs: BTreeMap<String, PbSecretRef>,
435        connector_conn_ref: Option<ConnectionId>,
436    ) -> Result<()> {
437        self.0
438            .alter_sink_props(
439                sink_id,
440                changed_props,
441                changed_secret_refs,
442                connector_conn_ref,
443            )
444            .await
445    }
446
447    async fn alter_iceberg_table_props(
448        &self,
449        table_id: TableId,
450        sink_id: SinkId,
451        source_id: SourceId,
452        changed_props: BTreeMap<String, String>,
453        changed_secret_refs: BTreeMap<String, PbSecretRef>,
454        connector_conn_ref: Option<ConnectionId>,
455    ) -> Result<()> {
456        self.0
457            .alter_iceberg_table_props(
458                table_id,
459                sink_id,
460                source_id,
461                changed_props,
462                changed_secret_refs,
463                connector_conn_ref,
464            )
465            .await
466    }
467
468    async fn alter_source_connector_props(
469        &self,
470        source_id: SourceId,
471        changed_props: BTreeMap<String, String>,
472        changed_secret_refs: BTreeMap<String, PbSecretRef>,
473        connector_conn_ref: Option<ConnectionId>,
474    ) -> Result<()> {
475        self.0
476            .alter_source_connector_props(
477                source_id,
478                changed_props,
479                changed_secret_refs,
480                connector_conn_ref,
481            )
482            .await
483    }
484
485    async fn alter_connection_connector_props(
486        &self,
487        connection_id: u32,
488        changed_props: BTreeMap<String, String>,
489        changed_secret_refs: BTreeMap<String, PbSecretRef>,
490    ) -> Result<()> {
491        self.0
492            .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
493            .await
494    }
495
496    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
497        self.0.list_hosted_iceberg_tables().await
498    }
499
500    async fn get_fragment_by_id(
501        &self,
502        fragment_id: FragmentId,
503    ) -> Result<Option<FragmentDistribution>> {
504        self.0.get_fragment_by_id(fragment_id).await
505    }
506
507    async fn get_fragment_vnodes(
508        &self,
509        fragment_id: FragmentId,
510    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
511        self.0.get_fragment_vnodes(fragment_id).await
512    }
513
514    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
515        self.0.get_actor_vnodes(actor_id).await
516    }
517
518    fn worker_id(&self) -> WorkerId {
519        self.0.worker_id()
520    }
521
522    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
523        self.0.set_sync_log_store_aligned(job_id, aligned).await
524    }
525
526    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
527        self.0.compact_iceberg_table(sink_id).await
528    }
529
530    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
531        self.0.expire_iceberg_table_snapshots(sink_id).await
532    }
533
534    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
535        self.0.refresh(request).await
536    }
537
538    fn cluster_id(&self) -> &str {
539        self.0.cluster_id()
540    }
541
542    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
543        self.0.list_unmigrated_tables().await
544    }
545
546    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
547        self.0.list_refresh_table_states().await
548    }
549
550    async fn list_iceberg_compaction_status(&self) -> Result<Vec<IcebergCompactionStatus>> {
551        self.0.list_iceberg_compaction_status().await
552    }
553}