risingwave_frontend/
meta_client.rs1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::Context;
18use risingwave_common::session_config::SessionConfig;
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_common::util::cluster_limit::ClusterLimit;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
23use risingwave_pb::backup_service::MetaSnapshotMetadata;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::write_limits::WriteLimit;
28use risingwave_pb::hummock::{
29 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
30};
31use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
32use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
33use risingwave_pb::meta::list_actor_states_response::ActorState;
34use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
35use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
36use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
37use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
38use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
39use risingwave_pb::meta::{EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus};
40use risingwave_pb::secret::PbSecretRef;
41use risingwave_rpc_client::error::Result;
42use risingwave_rpc_client::{HummockMetaClient, MetaClient};
43
44use crate::catalog::DatabaseId;
45
46#[async_trait::async_trait]
52pub trait FrontendMetaClient: Send + Sync {
53 async fn try_unregister(&self);
54
55 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;
56
57 async fn wait(&self) -> Result<()>;
58
59 async fn recover(&self) -> Result<()>;
60
61 async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;
62
63 async fn list_table_fragments(
64 &self,
65 table_ids: &[u32],
66 ) -> Result<HashMap<u32, TableFragmentInfo>>;
67
68 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
69
70 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
71
72 async fn list_creating_stream_scan_fragment_distribution(
73 &self,
74 ) -> Result<Vec<FragmentDistribution>>;
75
76 async fn list_actor_states(&self) -> Result<Vec<ActorState>>;
77
78 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;
79
80 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;
81
82 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
83
84 async fn get_system_params(&self) -> Result<SystemParamsReader>;
85
86 async fn set_system_param(
87 &self,
88 param: String,
89 value: Option<String>,
90 ) -> Result<Option<SystemParamsReader>>;
91
92 async fn get_session_params(&self) -> Result<SessionConfig>;
93
94 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;
95
96 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
97
98 async fn get_tables(
99 &self,
100 table_ids: &[u32],
101 include_dropped_table: bool,
102 ) -> Result<HashMap<u32, Table>>;
103
104 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;
106
107 async fn get_hummock_current_version(&self) -> Result<HummockVersion>;
108
109 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
110
111 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;
112
113 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
114
115 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;
116
117 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;
118
119 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
120
121 async fn list_event_log(&self) -> Result<Vec<EventLog>>;
122 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
123
124 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
125
126 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
127
128 async fn apply_throttle(
129 &self,
130 kind: PbThrottleTarget,
131 id: u32,
132 rate_limit: Option<u32>,
133 ) -> Result<()>;
134
135 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
136
137 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
138
139 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
140
141 async fn get_meta_store_endpoint(&self) -> Result<String>;
142
143 async fn alter_sink_props(
144 &self,
145 sink_id: u32,
146 changed_props: BTreeMap<String, String>,
147 changed_secret_refs: BTreeMap<String, PbSecretRef>,
148 connector_conn_ref: Option<u32>,
149 ) -> Result<()>;
150
151 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>>;
152
153 async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>>;
154
155 fn worker_id(&self) -> u32;
156}
157
158pub struct FrontendMetaClientImpl(pub MetaClient);
159
160#[async_trait::async_trait]
161impl FrontendMetaClient for FrontendMetaClientImpl {
162 async fn try_unregister(&self) {
163 self.0.try_unregister().await;
164 }
165
166 async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
167 self.0.flush(database_id).await
168 }
169
170 async fn wait(&self) -> Result<()> {
171 self.0.wait().await
172 }
173
174 async fn recover(&self) -> Result<()> {
175 self.0.recover().await
176 }
177
178 async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
179 self.0.cancel_creating_jobs(infos).await
180 }
181
182 async fn list_table_fragments(
183 &self,
184 table_ids: &[u32],
185 ) -> Result<HashMap<u32, TableFragmentInfo>> {
186 self.0.list_table_fragments(table_ids).await
187 }
188
189 async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
190 self.0.list_streaming_job_states().await
191 }
192
193 async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
194 self.0.list_fragment_distributions().await
195 }
196
197 async fn list_creating_stream_scan_fragment_distribution(
198 &self,
199 ) -> Result<Vec<FragmentDistribution>> {
200 self.0
201 .list_creating_stream_scan_fragment_distribution()
202 .await
203 }
204
205 async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
206 self.0.list_actor_states().await
207 }
208
209 async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
210 self.0.list_actor_splits().await
211 }
212
213 async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
214 self.0.list_object_dependencies().await
215 }
216
217 async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
218 let manifest = self.0.get_meta_snapshot_manifest().await?;
219 Ok(manifest.snapshot_metadata)
220 }
221
222 async fn get_system_params(&self) -> Result<SystemParamsReader> {
223 self.0.get_system_params().await
224 }
225
226 async fn set_system_param(
227 &self,
228 param: String,
229 value: Option<String>,
230 ) -> Result<Option<SystemParamsReader>> {
231 self.0.set_system_param(param, value).await
232 }
233
234 async fn get_session_params(&self) -> Result<SessionConfig> {
235 let session_config: SessionConfig =
236 serde_json::from_str(&self.0.get_session_params().await?)
237 .context("failed to parse session config")?;
238 Ok(session_config)
239 }
240
241 async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
242 self.0.set_session_param(param, value).await
243 }
244
245 async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
246 let ddl_progress = self.0.get_ddl_progress().await?;
247 Ok(ddl_progress)
248 }
249
250 async fn get_tables(
251 &self,
252 table_ids: &[u32],
253 include_dropped_tables: bool,
254 ) -> Result<HashMap<u32, Table>> {
255 let tables = self.0.get_tables(table_ids, include_dropped_tables).await?;
256 Ok(tables)
257 }
258
259 async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
260 let pinned_versions = self
261 .0
262 .risectl_get_pinned_versions_summary()
263 .await?
264 .summary
265 .unwrap()
266 .pinned_versions;
267 let ret = pinned_versions
268 .into_iter()
269 .map(|v| (v.context_id, v.min_pinned_id))
270 .collect();
271 Ok(ret)
272 }
273
274 async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
275 self.0.get_current_version().await
276 }
277
278 async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
279 self.0
280 .risectl_get_checkpoint_hummock_version()
281 .await
282 .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
283 }
284
285 async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
286 self.0
288 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
289 .await
290 }
291
292 async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
293 self.0.list_branched_object().await
294 }
295
296 async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
297 self.0.risectl_list_compaction_group().await
298 }
299
300 async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
301 self.0.list_active_write_limit().await
302 }
303
304 async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
305 self.0.list_hummock_meta_config().await
306 }
307
308 async fn list_event_log(&self) -> Result<Vec<EventLog>> {
309 self.0.list_event_log().await
310 }
311
312 async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
313 self.0.list_compact_task_assignment().await
314 }
315
316 async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>> {
317 self.0.list_worker_nodes(None).await
318 }
319
320 async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
321 self.0.list_compact_task_progress().await
322 }
323
324 async fn apply_throttle(
325 &self,
326 kind: PbThrottleTarget,
327 id: u32,
328 rate_limit: Option<u32>,
329 ) -> Result<()> {
330 self.0
331 .apply_throttle(kind, id, rate_limit)
332 .await
333 .map(|_| ())
334 }
335
336 async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
337 self.0.get_cluster_recovery_status().await
338 }
339
340 async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
341 self.0.get_cluster_limits().await
342 }
343
344 async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
345 self.0.list_rate_limits().await
346 }
347
348 async fn get_meta_store_endpoint(&self) -> Result<String> {
349 self.0.get_meta_store_endpoint().await
350 }
351
352 async fn alter_sink_props(
353 &self,
354 sink_id: u32,
355 changed_props: BTreeMap<String, String>,
356 changed_secret_refs: BTreeMap<String, PbSecretRef>,
357 connector_conn_ref: Option<u32>,
358 ) -> Result<()> {
359 self.0
360 .alter_sink_props(
361 sink_id,
362 changed_props,
363 changed_secret_refs,
364 connector_conn_ref,
365 )
366 .await
367 }
368
369 async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
370 self.0.list_hosted_iceberg_tables().await
371 }
372
373 async fn get_fragment_by_id(&self, fragment_id: u32) -> Result<Option<FragmentDistribution>> {
374 self.0.get_fragment_by_id(fragment_id).await
375 }
376
377 fn worker_id(&self) -> u32 {
378 self.0.worker_id()
379 }
380}