use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

type ConnectionId = u32;
type DatabaseId = u32;
type SchemaId = u32;

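/// Client to the meta service. Cloning this struct is lightweight: all clones share the
/// same underlying gRPC channel.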
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: u32,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

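    /// Subscribe to notifications from the meta service for the given subscribe type,
    /// retrying within the heartbeat-interval bound until the stream is established.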
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest {
            secret_id: secret_id.into(),
        };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

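    /// Register the current node to the cluster and return the client plus the initial
    /// system parameters. Exits the process if registration ultimately fails.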
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

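        // Retry within the bound of `max_heartbeat_interval_secs`.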
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
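            // Only retry on transient errors (e.g., connection failures); errors raised
            // by the server-side implementation itself are not retried.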
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
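                // Report the panic event to the meta node with best effort before
                // falling back to the default panic hook.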
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

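    /// Activate the current node in the cluster to confirm that it is ready to serve.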
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

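    /// Send a heartbeat signal to the meta service.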
    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
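            // The worker is unknown to the meta service, i.e., it has expired. Exit the
            // process unless we are already shutting down, in which case this is expected.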
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest {
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: u32,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: u32,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(SourceId::Id),
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: u32) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: u32) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: u32,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest {
            index_id: index_id.index_id,
            cascade,
        };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id: function_id.0,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

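    /// Create a new user. Returns the new user info version.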
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

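    /// Unregister the current node from the cluster and mark this client as shutting down.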
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

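    /// Try to unregister from the meta service with best effort, logging the outcome
    /// instead of propagating errors.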
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[u32],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

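    /// Starts a heartbeat worker that periodically sends heartbeats until the returned
    /// shutdown sender fires.
    ///
    /// A minimal usage sketch (the `meta_client` here is assumed to be an
    /// already-registered client):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) =
    ///     MetaClient::start_heartbeat_loop(meta_client, Duration::from_millis(1000));
    /// // ... later, on graceful shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```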
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
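                    // Shutdown was requested.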
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
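                    // Wait for the next heartbeat interval.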
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
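                    // Bound the heartbeat RPC so a stuck request cannot block the loop.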
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: table_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: u32,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_iceberg_table_props(
        &self,
        table_id: u32,
        sink_id: u32,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: table_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::IcebergTable as i32,
            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
                sink_id: sink_id as i32,
                source_id: source_id as i32,
            })),
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[StateTableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_tables: bool,
    ) -> Result<HashMap<u32, Table>> {
        let req = GetTablesRequest {
            table_ids: table_ids.to_vec(),
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or(0),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }

    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }

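    /// Adds a panic event to the event log, blocking the current thread. If
    /// `timeout_millis` is `None`, a default of 1000ms is used.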
    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        let event = event_log::EventWorkerNodePanic {
            worker_id: self.worker_id,
            worker_type: self.worker_type.into(),
            host_addr: Some(self.host_addr.to_protobuf()),
            panic_info: format!("{panic_info}"),
        };
        let grpc_meta_client = self.inner.clone();
        let _ = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let req = AddEventLogRequest {
                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
            };
            rt.block_on(async {
                let _ = tokio::time::timeout(
                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
                    grpc_meta_client.add_event_log(req),
                )
                .await;
            });
        })
        .join();
    }

    pub async fn add_sink_fail_evet(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) -> Result<()> {
        let event = event_log::EventSinkFail {
            sink_id,
            sink_name,
            connector,
            error,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::SinkFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn add_cdc_auto_schema_change_fail_event(
        &self,
        table_id: u32,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) -> Result<()> {
        let event = event_log::EventAutoSchemaChangeFail {
            table_id,
            table_name,
            cdc_table_id,
            upstream_ddl,
            fail_info,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        let req = CancelCompactTaskRequest {
            task_id,
            task_status: task_status as _,
        };
        let resp = self.inner.cancel_compact_task(req).await?;
        Ok(resp.ret)
    }

    pub async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        let req = GetVersionByEpochRequest { epoch, table_id };
        let resp = self.inner.get_version_by_epoch(req).await?;
        Ok(resp.version.unwrap())
    }

    pub async fn get_cluster_limits(
        &self,
    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
        let req = GetClusterLimitsRequest {};
        let resp = self.inner.get_cluster_limits(req).await?;
        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
    }

    pub async fn merge_compaction_group(
        &self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) -> Result<()> {
        let req = MergeCompactionGroupRequest {
            left_group_id,
            right_group_id,
        };
        self.inner.merge_compaction_group(req).await?;
        Ok(())
    }

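    /// List the rate limits currently applied to relations in the cluster.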
    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
        let request = ListRateLimitsRequest {};
        let resp = self.inner.list_rate_limits(request).await?;
        Ok(resp.rate_limits)
    }

    pub async fn list_cdc_progress(&self) -> Result<HashMap<u32, PbCdcProgress>> {
        let request = ListCdcProgressRequest {};
        let resp = self.inner.list_cdc_progress(request).await?;
        Ok(resp.cdc_progress)
    }

    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
        let request = ListIcebergTablesRequest {};
        let resp = self.inner.list_iceberg_tables(request).await?;
        Ok(resp.iceberg_tables)
    }

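    /// Collect stack traces from all nodes in the cluster, with actor traces rendered
    /// in the given format.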
    pub async fn get_cluster_stack_trace(
        &self,
        actor_traces_format: ActorTracesFormat,
    ) -> Result<StackTraceResponse> {
        let request = StackTraceRequest {
            actor_traces_format: actor_traces_format as i32,
        };
        let resp = self.inner.stack_trace(request).await?;
        Ok(resp)
    }

    pub async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
        self.inner.set_sync_log_store_aligned(request).await?;
        Ok(())
    }

    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
        self.inner.refresh(request).await
    }
}

#[async_trait]
impl HummockMetaClient for MetaClient {
    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
        let req = UnpinVersionBeforeRequest {
            context_id: self.worker_id(),
            unpin_version_before: unpin_version_before.to_u64(),
        };
        self.inner.unpin_version_before(req).await?;
        Ok(())
    }

    async fn get_current_version(&self) -> Result<HummockVersion> {
        let req = GetCurrentVersionRequest::default();
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .get_current_version(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
        let resp = self
            .inner
            .get_new_object_ids(GetNewObjectIdsRequest { number })
            .await?;
        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
    }

    async fn commit_epoch_with_change_log(
        &self,
        _epoch: HummockEpoch,
        _sync_result: SyncResult,
        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
    ) -> Result<()> {
        panic!("Only meta service can commit_epoch in production.")
    }

    async fn trigger_manual_compaction(
        &self,
        compaction_group_id: u64,
        table_id: u32,
        level: u32,
        sst_ids: Vec<u64>,
    ) -> Result<()> {
        let req = TriggerManualCompactionRequest {
            compaction_group_id,
            table_id,
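            // If `table_id` does not exist, the manual compaction covers all SSTs in the
            // group without filtering by internal table id.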
1860 level,
1863 sst_ids,
1864 ..Default::default()
1865 };
1866
1867 self.inner.trigger_manual_compaction(req).await?;
1868 Ok(())
1869 }
1870
1871 async fn trigger_full_gc(
1872 &self,
1873 sst_retention_time_sec: u64,
1874 prefix: Option<String>,
1875 ) -> Result<()> {
1876 self.inner
1877 .trigger_full_gc(TriggerFullGcRequest {
1878 sst_retention_time_sec,
1879 prefix,
1880 })
1881 .await?;
1882 Ok(())
1883 }
1884
1885 async fn subscribe_compaction_event(
1886 &self,
1887 ) -> Result<(
1888 UnboundedSender<SubscribeCompactionEventRequest>,
1889 BoxStream<'static, CompactionEventItem>,
1890 )> {
1891 let (request_sender, request_receiver) =
1892 unbounded_channel::<SubscribeCompactionEventRequest>();
1893 request_sender
1894 .send(SubscribeCompactionEventRequest {
1895 event: Some(subscribe_compaction_event_request::Event::Register(
1896 Register {
1897 context_id: self.worker_id(),
1898 },
1899 )),
1900 create_at: SystemTime::now()
1901 .duration_since(SystemTime::UNIX_EPOCH)
1902 .expect("Clock may have gone backwards")
1903 .as_millis() as u64,
1904 })
1905 .context("Failed to subscribe compaction event")?;
1906
1907 let stream = self
1908 .inner
1909 .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1910 request_receiver,
1911 )))
1912 .await?;
1913
1914 Ok((request_sender, Box::pin(stream)))
1915 }
1916
    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        // Delegates to the inherent `MetaClient::get_version_by_epoch`; inherent
        // methods take precedence over trait methods, so this is not recursive.
        self.get_version_by_epoch(epoch, table_id).await
    }

    async fn subscribe_iceberg_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
        BoxStream<'static, IcebergCompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
        request_sender
            .send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
                    IcebergRegister {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe iceberg compaction event")?;

        let stream = self
            .inner
            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }
}

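// Lets the telemetry reporter fetch the cluster's tracking id through the meta client.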
#[async_trait]
impl TelemetryInfoFetcher for MetaClient {
    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
        let resp = self
            .get_telemetry_info()
            .await
            .map_err(|e| e.to_report_string())?;
        let tracking_id = resp.get_tracking_id().ok();
        Ok(tracking_id.map(|id| id.to_owned()))
    }
}

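/// Shorthand for the sink coordination gRPC client over a tonic [`Channel`].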
pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;

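/// One gRPC client per meta service, all sharing a single underlying [`Channel`].
/// The whole set is rebuilt whenever the client reconnects to a (new) meta leader.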
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}

impl GrpcMetaClientCore {
    pub(crate) fn new(channel: Channel) -> Self {
        let cluster_client = ClusterServiceClient::new(channel.clone());
        let meta_member_client = MetaMemberClient::new(channel.clone());
        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
        let ddl_client =
            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let hummock_client =
            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let notification_client =
            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let stream_client =
            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let user_client = UserServiceClient::new(channel.clone());
        let scale_client =
            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let backup_client = BackupServiceClient::new(channel.clone());
        let telemetry_client =
            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let system_params_client = SystemParamsServiceClient::new(channel.clone());
        let session_params_client = SessionParamServiceClient::new(channel.clone());
        let serving_client = ServingServiceClient::new(channel.clone());
        let cloud_client = CloudServiceClient::new(channel.clone());
        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
        let event_log_client = EventLogServiceClient::new(channel.clone());
        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
        let hosted_iceberg_catalog_service_client =
            HostedIcebergCatalogServiceClient::new(channel.clone());
        let monitor_client = MonitorServiceClient::new(channel);

        GrpcMetaClientCore {
            cluster_client,
            meta_member_client,
            heartbeat_client,
            ddl_client,
            hummock_client,
            notification_client,
            stream_client,
            user_client,
            scale_client,
            backup_client,
            telemetry_client,
            system_params_client,
            session_params_client,
            serving_client,
            cloud_client,
            sink_coordinate_client,
            event_log_client,
            cluster_limit_client,
            hosted_iceberg_catalog_service_client,
            monitor_client,
        }
    }
}

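/// Lightweight handle to the meta server. Cloning is cheap: clones share the
/// member-monitor task (via `member_monitor_event_sender`) and the `core` set
/// of service clients behind an `Arc<RwLock<_>>`.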
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    core: Arc<RwLock<GrpcMetaClientCore>>,
}

type MetaMemberClient = MetaMemberServiceClient<Channel>;

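/// LRU-bounded set of known meta members, each lazily paired with a cached client.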
struct MetaMemberGroup {
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}

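/// Background state for discovering the current meta leader and swapping the
/// shared [`GrpcMetaClientCore`] over to it when leadership changes.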
struct MetaMemberManagement {
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    members: Either<MetaMemberClient, MetaMemberGroup>,
    current_leader: http::Uri,
    meta_config: Arc<MetaConfig>,
}

impl MetaMemberManagement {
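    /// How often the member monitor polls the member list when running in
    /// `List` address mode.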
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
    // HTTP/2 keep-alive ping interval for the underlying endpoint.
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    // How long to wait for a keep-alive acknowledgement before the connection is closed.
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    // Base interval (ms) of the exponential backoff used when (re)connecting to meta.
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    // Cap (ms) on a single backoff delay.
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: Arc<MetaConfig>,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

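    /// Asks the member monitor to refresh the member list immediately and waits
    /// for the refresh result.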
    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

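    /// Connects to the meta server(s) according to `strategy`, spawns the member
    /// monitor, and eagerly resolves the current leader before returning.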
    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                let mut members = LruCache::new(20);
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connect to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying again",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

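    /// Jittered exponential backoff whose *accumulated* delay stays within
    /// `high_bound`. With `exceed` set, one final attempt may cross the bound.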
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

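// Maps each `{ client, rpc, Request, Response }` tuple below to a method on
// `GrpcMetaClient` via `meta_rpc_client_method_impl` (invoked at the bottom of
// this file). As a rough, illustrative sketch only, one entry expands to
// something like:
//
//     pub async fn flush(&self, request: FlushRequest) -> Result<FlushResponse> {
//         let mut client = self.core.read().await.stream_client.clone();
//         match client.flush(request).await {
//             Ok(resp) => Ok(resp.into_inner()),
//             Err(status) => {
//                 self.refresh_client_if_needed(status.code()).await;
//                 Err(status.into())
//             }
//         }
//     }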
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

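// On tonic codes that typically indicate a stale channel or a leader change,
// nudge the member monitor to re-resolve the leader. `try_send` keeps this
// best-effort: concurrent refresh requests collapse into the one in flight.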
impl GrpcMetaClient {
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("matching tonic code {}", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
            }
        }
    }
}

impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}