risingwave_frontend/
meta_client.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId};
19use risingwave_common::session_config::SessionConfig;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::util::cluster_limit::ClusterLimit;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
24use risingwave_pb::backup_service::MetaSnapshotMetadata;
25use risingwave_pb::catalog::Table;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
31};
32use risingwave_pb::id::ActorId;
33use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
34use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
35use risingwave_pb::meta::list_actor_states_response::ActorState;
36use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
37use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
38use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
39use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
40use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
41use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
42use risingwave_pb::meta::{
43    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
44    RefreshRequest, RefreshResponse, list_sink_log_store_tables_response,
45};
46use risingwave_pb::secret::PbSecretRef;
47use risingwave_rpc_client::error::Result;
48use risingwave_rpc_client::{HummockMetaClient, MetaClient};
49
50use crate::catalog::{DatabaseId, FragmentId, SinkId};
51
/// A wrapper around the `MetaClient` that exposes only a small set of meta RPCs.
///
/// Most RPCs to meta are delegated to other dedicated structs like `CatalogWriter`,
/// `WorkerNodeManager`, etc., so the frontend rarely needs to call `MetaClient` directly.
/// Hence, instead of mocking every RPC of `MetaClient` in tests, we aggregate those "direct"
/// RPCs in this trait so that mocking can be simplified.
///
/// Every fallible method returns `risingwave_rpc_client::error::Result`.
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
    /// Attempts to unregister (presumably this frontend node from meta — TODO confirm).
    /// No outcome is reported to the caller.
    async fn try_unregister(&self);

    /// Requests a flush for `database_id`; yields a `HummockVersionId` on success.
    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;

    /// Requests a recovery.
    async fn recover(&self) -> Result<()>;

    /// Cancels the given creating jobs.
    ///
    /// Returns a list of `u32`s — presumably the ids of the cancelled jobs (TODO confirm).
    async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

    /// Fetches fragment info for the given jobs, keyed by `JobId`.
    async fn list_table_fragments(
        &self,
        table_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>>;

    /// Lists the states of streaming jobs.
    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;

    /// Lists fragment distributions.
    ///
    /// NOTE(review): `include_node` presumably controls whether node details are included in
    /// each entry — confirm against the meta service.
    async fn list_fragment_distribution(
        &self,
        include_node: bool,
    ) -> Result<Vec<FragmentDistribution>>;

    /// Lists fragment distributions of creating jobs (as the name suggests — TODO confirm).
    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;

    /// Lists actor states.
    async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

    /// Lists actor splits.
    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

    /// Lists the metadata of meta snapshots.
    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;

    /// Lists sink log store tables.
    async fn list_sink_log_store_tables(
        &self,
    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>>;

    /// Sets the system parameter `param` to `value`.
    ///
    /// NOTE(review): a `None` value presumably resets the parameter, and the optional
    /// `SystemParamsReader` presumably carries the updated parameters — confirm.
    async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>>;

    /// Fetches the session parameters as a `SessionConfig`.
    async fn get_session_params(&self) -> Result<SessionConfig>;

    /// Sets the session parameter `param` to `value` and returns a `String`
    /// (presumably the resulting value; `None` presumably resets — TODO confirm).
    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

    /// Fetches the progress of DDL operations.
    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

    /// Fetches the catalog `Table`s for `table_ids`, keyed by `TableId`.
    ///
    /// NOTE(review): `include_dropped_table` presumably makes dropped tables part of the
    /// result — confirm.
    async fn get_tables(
        &self,
        table_ids: Vec<TableId>,
        include_dropped_table: bool,
    ) -> Result<HashMap<TableId, Table>>;

    /// Returns a vector of (`worker_id`, `min_pinned_version_id`) pairs.
    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>>;

    /// Fetches the current `HummockVersion`.
    async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

    /// Fetches the checkpoint `HummockVersion`.
    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;

    /// Lists Hummock version deltas.
    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

    /// Lists branched objects.
    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;

    /// Lists Hummock compaction groups as `CompactionGroupInfo` entries.
    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;

    /// Lists the active write limits, keyed by `CompactionGroupId`.
    async fn list_hummock_active_write_limits(
        &self,
    ) -> Result<HashMap<CompactionGroupId, WriteLimit>>;

    /// Lists Hummock meta configs as string key/value pairs.
    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

    /// Lists event logs.
    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
    /// Lists compact task assignments.
    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;

    /// Lists worker nodes.
    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;

    /// Lists the progress of compact tasks.
    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;

    /// Applies a throttle of kind `throttle_type` to the object `id` of kind `throttle_target`.
    ///
    /// NOTE(review): a `None` `rate_limit` presumably removes the limit — confirm.
    async fn apply_throttle(
        &self,
        throttle_target: PbThrottleTarget,
        throttle_type: risingwave_pb::common::PbThrottleType,
        id: u32,
        rate_limit: Option<u32>,
    ) -> Result<()>;

    /// Alters the parallelism of the given fragments.
    ///
    /// NOTE(review): the meaning of a `None` `parallelism` is not visible here — confirm.
    async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()>;

    /// Fetches the cluster's `RecoveryStatus`.
    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

    /// Fetches the cluster limits.
    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

    /// Lists rate limits.
    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;

    /// Lists CDC progress, keyed by `JobId`.
    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>>;

    /// Lists the states of refresh tables.
    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>>;

    /// Fetches the meta store endpoint as a string.
    async fn get_meta_store_endpoint(&self) -> Result<String>;

    /// Alters the properties of the sink `sink_id`, passing the changed plain properties,
    /// the changed secret references and an optional connection reference.
    async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Alters the properties of an iceberg table identified by its `table_id`, `sink_id`
    /// and `source_id`, passing the changed plain properties, the changed secret references
    /// and an optional connection reference.
    async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Alters the connector properties of the source `source_id`, passing the changed plain
    /// properties, the changed secret references and an optional connection reference.
    async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()>;

    /// Alters the connector properties of the connection `connection_id`, passing the
    /// changed plain properties and the changed secret references.
    async fn alter_connection_connector_props(
        &self,
        connection_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> Result<()>;

    /// Lists hosted iceberg tables.
    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;

    /// Looks up a fragment by id; the `Option` is presumably `None` when no such fragment
    /// exists (TODO confirm).
    async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>>;

    /// Fetches, for each actor of `fragment_id`, the actor id paired with a list of `u32`s
    /// (presumably the vnode indices owned by that actor — TODO confirm).
    async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>>;

    /// Fetches a list of `u32`s for `actor_id` (presumably its vnode indices — TODO confirm).
    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>>;

    /// Returns a `WorkerId` (presumably the id of this frontend worker — TODO confirm).
    fn worker_id(&self) -> WorkerId;

    /// Sets the `aligned` flag of the sync log store for `job_id`.
    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()>;

    /// Requests compaction of the iceberg table behind `sink_id`.
    ///
    /// NOTE(review): the meaning of the returned `u64` is not visible here — confirm.
    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;

    /// Requests snapshot expiration for the iceberg table behind `sink_id`.
    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()>;

    /// Sends a `RefreshRequest` and returns the `RefreshResponse`.
    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;

    /// Returns the cluster id as a string slice.
    fn cluster_id(&self) -> &str;

    /// Lists unmigrated tables as a map from `TableId` to a `String`
    /// (presumably the table name — TODO confirm).
    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>>;
}
222
/// Newtype over [`MetaClient`] that implements [`FrontendMetaClient`] by forwarding each
/// call to the wrapped client, which is exposed as the public field `0`.
pub struct FrontendMetaClientImpl(pub MetaClient);
224
225#[async_trait::async_trait]
226impl FrontendMetaClient for FrontendMetaClientImpl {
227    async fn try_unregister(&self) {
228        self.0.try_unregister().await;
229    }
230
231    async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
232        self.0.flush(database_id).await
233    }
234
235    async fn recover(&self) -> Result<()> {
236        self.0.recover().await
237    }
238
239    async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
240        self.0.cancel_creating_jobs(infos).await
241    }
242
243    async fn list_table_fragments(
244        &self,
245        job_ids: &[JobId],
246    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
247        self.0.list_table_fragments(job_ids).await
248    }
249
250    async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
251        self.0.list_streaming_job_states().await
252    }
253
254    async fn list_fragment_distribution(
255        &self,
256        include_node: bool,
257    ) -> Result<Vec<FragmentDistribution>> {
258        self.0.list_fragment_distributions(include_node).await
259    }
260
261    async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
262        self.0.list_creating_fragment_distribution().await
263    }
264
265    async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
266        self.0.list_actor_states().await
267    }
268
269    async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
270        self.0.list_actor_splits().await
271    }
272
273    async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
274        let manifest = self.0.get_meta_snapshot_manifest().await?;
275        Ok(manifest.snapshot_metadata)
276    }
277
278    async fn list_sink_log_store_tables(
279        &self,
280    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
281        self.0.list_sink_log_store_tables().await
282    }
283
284    async fn set_system_param(
285        &self,
286        param: String,
287        value: Option<String>,
288    ) -> Result<Option<SystemParamsReader>> {
289        self.0.set_system_param(param, value).await
290    }
291
292    async fn get_session_params(&self) -> Result<SessionConfig> {
293        let session_config: SessionConfig =
294            serde_json::from_str(&self.0.get_session_params().await?)
295                .context("failed to parse session config")?;
296        Ok(session_config)
297    }
298
299    async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
300        self.0.set_session_param(param, value).await
301    }
302
303    async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
304        let ddl_progress = self.0.get_ddl_progress().await?;
305        Ok(ddl_progress)
306    }
307
308    async fn get_tables(
309        &self,
310        table_ids: Vec<TableId>,
311        include_dropped_tables: bool,
312    ) -> Result<HashMap<TableId, Table>> {
313        let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
314        Ok(tables)
315    }
316
317    async fn list_hummock_pinned_versions(&self) -> Result<Vec<(WorkerId, HummockVersionId)>> {
318        let pinned_versions = self
319            .0
320            .risectl_get_pinned_versions_summary()
321            .await?
322            .summary
323            .unwrap()
324            .pinned_versions;
325        let ret = pinned_versions
326            .into_iter()
327            .map(|v| (v.context_id, v.min_pinned_id))
328            .collect();
329        Ok(ret)
330    }
331
332    async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
333        self.0.get_current_version().await
334    }
335
336    async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
337        self.0
338            .risectl_get_checkpoint_hummock_version()
339            .await
340            .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
341    }
342
343    async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
344        // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
345        self.0
346            .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
347            .await
348    }
349
350    async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
351        self.0.list_branched_object().await
352    }
353
354    async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
355        self.0.risectl_list_compaction_group().await
356    }
357
358    async fn list_hummock_active_write_limits(
359        &self,
360    ) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
361        self.0.list_active_write_limit().await
362    }
363
364    async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
365        self.0.list_hummock_meta_config().await
366    }
367
368    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
369        self.0.list_event_log().await
370    }
371
372    async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
373        self.0.list_compact_task_assignment().await
374    }
375
376    async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
377        self.0.list_worker_nodes(None).await
378    }
379
380    async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
381        self.0.list_compact_task_progress().await
382    }
383
384    async fn apply_throttle(
385        &self,
386        throttle_target: PbThrottleTarget,
387        throttle_type: risingwave_pb::common::PbThrottleType,
388        id: u32,
389        rate_limit: Option<u32>,
390    ) -> Result<()> {
391        self.0
392            .apply_throttle(throttle_target, throttle_type, id, rate_limit)
393            .await
394            .map(|_| ())
395    }
396
397    async fn alter_fragment_parallelism(
398        &self,
399        fragment_ids: Vec<FragmentId>,
400        parallelism: Option<PbTableParallelism>,
401    ) -> Result<()> {
402        self.0
403            .alter_fragment_parallelism(fragment_ids, parallelism)
404            .await
405    }
406
407    async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
408        self.0.get_cluster_recovery_status().await
409    }
410
411    async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
412        self.0.get_cluster_limits().await
413    }
414
415    async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
416        self.0.list_rate_limits().await
417    }
418
419    async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
420        self.0.list_cdc_progress().await
421    }
422
423    async fn get_meta_store_endpoint(&self) -> Result<String> {
424        self.0.get_meta_store_endpoint().await
425    }
426
427    async fn alter_sink_props(
428        &self,
429        sink_id: SinkId,
430        changed_props: BTreeMap<String, String>,
431        changed_secret_refs: BTreeMap<String, PbSecretRef>,
432        connector_conn_ref: Option<ConnectionId>,
433    ) -> Result<()> {
434        self.0
435            .alter_sink_props(
436                sink_id,
437                changed_props,
438                changed_secret_refs,
439                connector_conn_ref,
440            )
441            .await
442    }
443
444    async fn alter_iceberg_table_props(
445        &self,
446        table_id: TableId,
447        sink_id: SinkId,
448        source_id: SourceId,
449        changed_props: BTreeMap<String, String>,
450        changed_secret_refs: BTreeMap<String, PbSecretRef>,
451        connector_conn_ref: Option<ConnectionId>,
452    ) -> Result<()> {
453        self.0
454            .alter_iceberg_table_props(
455                table_id,
456                sink_id,
457                source_id,
458                changed_props,
459                changed_secret_refs,
460                connector_conn_ref,
461            )
462            .await
463    }
464
465    async fn alter_source_connector_props(
466        &self,
467        source_id: SourceId,
468        changed_props: BTreeMap<String, String>,
469        changed_secret_refs: BTreeMap<String, PbSecretRef>,
470        connector_conn_ref: Option<ConnectionId>,
471    ) -> Result<()> {
472        self.0
473            .alter_source_connector_props(
474                source_id,
475                changed_props,
476                changed_secret_refs,
477                connector_conn_ref,
478            )
479            .await
480    }
481
482    async fn alter_connection_connector_props(
483        &self,
484        connection_id: u32,
485        changed_props: BTreeMap<String, String>,
486        changed_secret_refs: BTreeMap<String, PbSecretRef>,
487    ) -> Result<()> {
488        self.0
489            .alter_connection_connector_props(connection_id, changed_props, changed_secret_refs)
490            .await
491    }
492
493    async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
494        self.0.list_hosted_iceberg_tables().await
495    }
496
497    async fn get_fragment_by_id(
498        &self,
499        fragment_id: FragmentId,
500    ) -> Result<Option<FragmentDistribution>> {
501        self.0.get_fragment_by_id(fragment_id).await
502    }
503
504    async fn get_fragment_vnodes(
505        &self,
506        fragment_id: FragmentId,
507    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
508        self.0.get_fragment_vnodes(fragment_id).await
509    }
510
511    async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
512        self.0.get_actor_vnodes(actor_id).await
513    }
514
515    fn worker_id(&self) -> WorkerId {
516        self.0.worker_id()
517    }
518
519    async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
520        self.0.set_sync_log_store_aligned(job_id, aligned).await
521    }
522
523    async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
524        self.0.compact_iceberg_table(sink_id).await
525    }
526
527    async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
528        self.0.expire_iceberg_table_snapshots(sink_id).await
529    }
530
531    async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
532        self.0.refresh(request).await
533    }
534
535    fn cluster_id(&self) -> &str {
536        self.0.cluster_id()
537    }
538
539    async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
540        self.0.list_unmigrated_tables().await
541    }
542
543    async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
544        self.0.list_refresh_table_states().await
545    }
546}