use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

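/// Client to the meta service. Cloning the instance is lightweight: all fields
/// are either `Copy` or reference-counted handles.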
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

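    /// Subscribe to a notification stream from the meta service.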
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

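        // Retry the subscription, with the total retry time bounded by the
        // configured max heartbeat interval.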
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id, cascade };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

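    /// Register the current node to the cluster and obtain its worker id,
    /// exiting the process if registration ultimately fails.
    ///
    /// A minimal usage sketch (the worker type, address, and default-constructed
    /// property/config here are illustrative, not taken from a real call site):
    ///
    /// ```ignore
    /// let (meta_client, system_params) = MetaClient::register_new(
    ///     addr_strategy,
    ///     WorkerType::Frontend,
    ///     &host_addr,
    ///     Property::default(),
    ///     Arc::new(MetaConfig::default()),
    /// )
    /// .await;
    /// ```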
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
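        // Retry initialization with backoff; only transport-level failures are
        // retried, while errors from the meta server implementation abort at once.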
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

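        // Install a process-wide panic hook (at most once) that reports the panic
        // to the meta service as an event log before running the default hook.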
        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

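    /// Activate the current node in the cluster, confirming it is ready to serve.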
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

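    /// Send a heartbeat signal to the meta service.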
    pub async fn send_heartbeat(&self, node_id: WorkerId) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
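            // The meta service no longer recognizes this worker. Unless we are
            // already shutting down gracefully, treat this as fatal.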
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            resource_type: Some(PbStreamingJobResourceType {
                resource_type: Some(resource_type),
            }),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        let request = AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        };

        self.inner.alter_streaming_job_config(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
        let request = ResetSourceRequest { source_id };
        let resp = self.inner.reset_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

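    /// Creates a new user, returning the new version of the user catalog.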
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: UserId,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        granted_by: UserId,
        revoke_by: UserId,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<UserId>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: UserId,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

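    /// Unregister the current worker from the cluster and mark this client as
    /// shutting down, so a later "unknown worker" heartbeat status is ignored.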
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

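    /// Unregister with best effort, logging success or failure instead of
    /// returning an error.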
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[WorkerId],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

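    /// Starts a background heartbeat worker that ticks at least every
    /// `min_interval`, returning its join handle and a oneshot shutdown sender.
    ///
    /// A minimal usage sketch (the shutdown wiring is illustrative):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// // ... on shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```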
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn risectl_resume_backfill(
        &self,
        request: RisectlResumeBackfillRequest,
    ) -> Result<()> {
        self.inner.risectl_resume_backfill(request).await?;
        Ok(())
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(resp.hummock_version_id)
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        job_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: job_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(
        &self,
        include_node: bool,
    ) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {
                include_node: Some(include_node),
            })
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
                include_node: Some(true),
            })
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
        let resp = self
            .inner
            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
            .await?;
        Ok(resp
            .actor_vnodes
            .into_iter()
            .map(|actor| (actor.actor_id, actor.vnode_indices))
            .collect())
    }

    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
        let resp = self
            .inner
            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
            .await?;
        Ok(resp.vnode_indices)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        throttle_target: PbThrottleTarget,
        throttle_type: risingwave_pb::common::PbThrottleType,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            throttle_target: throttle_target as i32,
            throttle_type: throttle_type as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id,
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id,
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: table_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::IcebergTable as i32,
            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
                sink_id,
                source_id,
            })),
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_connection_connector_props(
        &self,
        connection_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: connection_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref: None,
            object_type: AlterConnectorPropsObject::Connection as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[TableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: Vec<TableId>,
        include_dropped_tables: bool,
    ) -> Result<HashMap<TableId, Table>> {
        let req = GetTablesRequest {
            table_ids,
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or_default(),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }

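    /// Under madsim simulation, reporting panic events is a no-op.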
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }

    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        let event = event_log::EventWorkerNodePanic {
            worker_id: self.worker_id,
            worker_type: self.worker_type.into(),
            host_addr: Some(self.host_addr.to_protobuf()),
            panic_info: format!("{panic_info}"),
        };
        let grpc_meta_client = self.inner.clone();
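        // This may be called from a panic hook, outside any Tokio runtime, so spawn
        // a dedicated thread with its own single-threaded runtime and block on it.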
        let _ = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let req = AddEventLogRequest {
                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
            };
            rt.block_on(async {
                let _ = tokio::time::timeout(
                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
                    grpc_meta_client.add_event_log(req),
                )
                .await;
            });
        })
        .join();
    }

    pub async fn add_sink_fail_event(
        &self,
        sink_id: SinkId,
        sink_name: String,
        connector: String,
        error: String,
    ) -> Result<()> {
        let event = event_log::EventSinkFail {
            sink_id,
            sink_name,
            connector,
            error,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::SinkFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn add_cdc_auto_schema_change_fail_event(
        &self,
        source_id: SourceId,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) -> Result<()> {
        let event = event_log::EventAutoSchemaChangeFail {
            table_id: source_id.as_cdc_table_id(),
            table_name,
            cdc_table_id,
            upstream_ddl,
            fail_info,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        let req = CancelCompactTaskRequest {
            task_id,
            task_status: task_status as _,
        };
        let resp = self.inner.cancel_compact_task(req).await?;
        Ok(resp.ret)
    }

    pub async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Result<PbHummockVersion> {
        let req = GetVersionByEpochRequest { epoch, table_id };
        let resp = self.inner.get_version_by_epoch(req).await?;
        Ok(resp.version.unwrap())
    }

    pub async fn get_cluster_limits(
        &self,
    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
        let req = GetClusterLimitsRequest {};
        let resp = self.inner.get_cluster_limits(req).await?;
        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
    }

    pub async fn merge_compaction_group(
        &self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) -> Result<()> {
        let req = MergeCompactionGroupRequest {
            left_group_id,
            right_group_id,
        };
        self.inner.merge_compaction_group(req).await?;
        Ok(())
    }

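    /// List all rate limits currently applied in the cluster.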
    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
        let request = ListRateLimitsRequest {};
        let resp = self.inner.list_rate_limits(request).await?;
        Ok(resp.rate_limits)
    }

    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
        let request = ListCdcProgressRequest {};
        let resp = self.inner.list_cdc_progress(request).await?;
        Ok(resp.cdc_progress)
    }

    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
        let request = ListRefreshTableStatesRequest {};
        let resp = self.inner.list_refresh_table_states(request).await?;
        Ok(resp.states)
    }

    pub async fn list_sink_log_store_tables(
        &self,
    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
        let request = ListSinkLogStoreTablesRequest {};
        let resp = self.inner.list_sink_log_store_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIcebergTableRequest {
            table_info: Some(table_job_info),
            sink_info: Some(sink_job_info),
            iceberg_source: Some(iceberg_source),
            if_not_exists,
        };

        let resp = self.inner.create_iceberg_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
        let request = ListIcebergTablesRequest {};
        let resp = self.inner.list_iceberg_tables(request).await?;
        Ok(resp.iceberg_tables)
    }

1901 pub async fn get_cluster_stack_trace(
1903 &self,
1904 actor_traces_format: ActorTracesFormat,
1905 ) -> Result<StackTraceResponse> {
1906 let request = StackTraceRequest {
1907 actor_traces_format: actor_traces_format as i32,
1908 };
1909 let resp = self.inner.stack_trace(request).await?;
1910 Ok(resp)
1911 }
1912
1913 pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
1914 let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1915 self.inner.set_sync_log_store_aligned(request).await?;
1916 Ok(())
1917 }
1918
1919 pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
1920 self.inner.refresh(request).await
1921 }
1922
1923 pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
1924 let request = ListUnmigratedTablesRequest {};
1925 let resp = self.inner.list_unmigrated_tables(request).await?;
1926 Ok(resp
1927 .tables
1928 .into_iter()
1929 .map(|table| (table.table_id, table.table_name))
1930 .collect())
1931 }
1932}
1933
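// `MetaClient` doubles as a `HummockMetaClient`: the Hummock-related calls below
// are forwarded to the meta service over the same gRPC channel.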
#[async_trait]
impl HummockMetaClient for MetaClient {
    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
        let req = UnpinVersionBeforeRequest {
            context_id: self.worker_id(),
            unpin_version_before,
        };
        self.inner.unpin_version_before(req).await?;
        Ok(())
    }

    async fn get_current_version(&self) -> Result<HummockVersion> {
        let req = GetCurrentVersionRequest::default();
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .get_current_version(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
        let resp = self
            .inner
            .get_new_object_ids(GetNewObjectIdsRequest { number })
            .await?;
        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
    }

    async fn commit_epoch_with_change_log(
        &self,
        _epoch: HummockEpoch,
        _sync_result: SyncResult,
        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
    ) -> Result<()> {
        panic!("Only meta service can commit_epoch in production.")
    }

    async fn trigger_manual_compaction(
        &self,
        compaction_group_id: CompactionGroupId,
        table_id: JobId,
        level: u32,
        sst_ids: Vec<HummockSstableId>,
    ) -> Result<()> {
        let req = TriggerManualCompactionRequest {
            compaction_group_id,
            table_id,
            level,
            sst_ids,
            ..Default::default()
        };

        self.inner.trigger_manual_compaction(req).await?;
        Ok(())
    }

    async fn trigger_full_gc(
        &self,
        sst_retention_time_sec: u64,
        prefix: Option<String>,
    ) -> Result<()> {
        self.inner
            .trigger_full_gc(TriggerFullGcRequest {
                sst_retention_time_sec,
                prefix,
            })
            .await?;
        Ok(())
    }

    async fn subscribe_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeCompactionEventRequest>,
        BoxStream<'static, CompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeCompactionEventRequest>();
        request_sender
            .send(SubscribeCompactionEventRequest {
                event: Some(subscribe_compaction_event_request::Event::Register(
                    Register {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe compaction event")?;

        let stream = self
            .inner
            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }

    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Result<PbHummockVersion> {
        self.get_version_by_epoch(epoch, table_id).await
    }

    async fn subscribe_iceberg_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
        BoxStream<'static, IcebergCompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
        request_sender
            .send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
                    IcebergRegister {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe iceberg compaction event")?;

        let stream = self
            .inner
            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }
}


#[async_trait]
impl TelemetryInfoFetcher for MetaClient {
    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
        let resp = self
            .get_telemetry_info()
            .await
            .map_err(|e| e.to_report_string())?;
        let tracking_id = resp.get_tracking_id().ok();
        Ok(tracking_id.map(|id| id.to_owned()))
    }
}

pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;

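/// One gRPC client per meta service, all sharing a single `Channel`. The core is
/// rebuilt as a whole whenever the client reconnects to a (new) meta leader.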
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}

impl GrpcMetaClientCore {
    pub(crate) fn new(channel: Channel) -> Self {
        let cluster_client = ClusterServiceClient::new(channel.clone());
        let meta_member_client = MetaMemberClient::new(channel.clone());
        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
        let ddl_client =
            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let hummock_client =
            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let notification_client =
            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let stream_client =
            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let user_client = UserServiceClient::new(channel.clone());
        let scale_client =
            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let backup_client = BackupServiceClient::new(channel.clone());
        let telemetry_client =
            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let system_params_client = SystemParamsServiceClient::new(channel.clone());
        let session_params_client = SessionParamServiceClient::new(channel.clone());
        let serving_client = ServingServiceClient::new(channel.clone());
        let cloud_client = CloudServiceClient::new(channel.clone());
        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
            .max_decoding_message_size(usize::MAX);
        let event_log_client = EventLogServiceClient::new(channel.clone());
        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
        let hosted_iceberg_catalog_service_client =
            HostedIcebergCatalogServiceClient::new(channel.clone());
        let monitor_client = MonitorServiceClient::new(channel);

        GrpcMetaClientCore {
            cluster_client,
            meta_member_client,
            heartbeat_client,
            ddl_client,
            hummock_client,
            notification_client,
            stream_client,
            user_client,
            scale_client,
            backup_client,
            telemetry_client,
            system_params_client,
            session_params_client,
            serving_client,
            cloud_client,
            sink_coordinate_client,
            event_log_client,
            cluster_limit_client,
            hosted_iceberg_catalog_service_client,
            monitor_client,
        }
    }
}

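/// Client to the meta server over gRPC. Cloneable; all clones share the same
/// `core`, which the background member monitor replaces in place when the meta
/// leader moves, so existing handles pick up the new connection on their next call.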
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    core: Arc<RwLock<GrpcMetaClientCore>>,
}

type MetaMemberClient = MetaMemberServiceClient<Channel>;

struct MetaMemberGroup {
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}

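/// State owned by the background member monitor: either a single load-balanced
/// member client, or a cached group of per-address clients to poll for the leader.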
struct MetaMemberManagement {
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    members: Either<MetaMemberClient, MetaMemberGroup>,
    current_leader: http::Uri,
    meta_config: Arc<MetaConfig>,
}

impl MetaMemberManagement {
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

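    /// Fetches the current member list, caches clients for newly joined members,
    /// and, if a new leader is discovered, reconnects and swaps the client core.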
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
    // HTTP/2 keep-alive settings applied to every endpoint in `connect_to_endpoint`.
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    // Base and cap (in milliseconds) of the exponential backoff used by
    // `retry_strategy_to_bound`.
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: Arc<MetaConfig>,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        // Periodic refresh only makes sense when we know the full member list;
        // with a load-balanced address we refresh on demand only.
        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

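    /// Asks the member monitor to refresh immediately and awaits the result of
    /// that refresh.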
    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

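    /// Connects to the meta server(s) named by `strategy`, spawns the member
    /// monitor, and forces one leader refresh before returning, so the client is
    /// ready to use on success.
    ///
    /// A minimal usage sketch (the address and the default config here are
    /// illustrative, not prescribed values):
    ///
    /// ```ignore
    /// let strategy = MetaAddressStrategy::List(vec!["http://127.0.0.1:5690".parse()?]);
    /// let client = GrpcMetaClient::new(&strategy, Arc::new(MetaConfig::default())).await?;
    /// ```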
    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                let mut members = LruCache::new(20.try_into().unwrap());
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

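    /// Tries the given addresses in order and returns the first channel that
    /// connects, together with the address it connected to.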
    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connect to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying again",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

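    /// Builds a jittered exponential-backoff iterator (starting at
    /// `INIT_RETRY_BASE_INTERVAL_MS`, capped at `INIT_RETRY_MAX_INTERVAL_MS`) whose
    /// total delay stays within `high_bound`. With `exceed`, the final delay may
    /// push the total past the bound instead of being cut off before it.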
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

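// Enumerates every meta RPC as `{ client_field, method, Request, Response }`
// tuples. `meta_rpc_client_method_impl` (applied at the bottom of this file)
// expands each tuple into the corresponding async wrapper method on
// `GrpcMetaClient`.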
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
            { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

impl GrpcMetaClient {
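    /// On RPC failures that look like a stale connection (`Unknown`,
    /// `Unimplemented`, or `Unavailable`), nudges the member monitor to
    /// re-discover the leader; if a refresh is already in flight, this is a no-op.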
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("matching tonic code {}", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
            }
        }
    }
}

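// Expand `for_all_meta_rpc!` into the actual RPC wrapper methods on `GrpcMetaClient`.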
impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}