risingwave_rpc_client/
meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{
33    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
34};
35use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
36use risingwave_common::hash::WorkerSlotMapping;
37use risingwave_common::monitor::EndpointExt;
38use risingwave_common::system_param::reader::SystemParamsReader;
39use risingwave_common::telemetry::report::TelemetryInfoFetcher;
40use risingwave_common::util::addr::HostAddr;
41use risingwave_common::util::meta_addr::MetaAddressStrategy;
42use risingwave_common::util::resource_util::cpu::total_cpu_available;
43use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
44use risingwave_error::bail;
45use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
46use risingwave_hummock_sdk::compaction_group::StateTableId;
47use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
48use risingwave_hummock_sdk::{
49    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
50};
51use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
52use risingwave_pb::backup_service::*;
53use risingwave_pb::catalog::{
54    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
55    PbSubscription, PbTable, PbView, Table,
56};
57use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
58use risingwave_pb::cloud_service::*;
59use risingwave_pb::common::worker_node::Property;
60use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
61use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
62use risingwave_pb::ddl_service::alter_owner_request::Object;
63use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
64use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
65use risingwave_pb::ddl_service::drop_table_request::SourceId;
66use risingwave_pb::ddl_service::*;
67use risingwave_pb::hummock::compact_task::TaskStatus;
68use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
69use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
70use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
71use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
72use risingwave_pb::hummock::write_limits::WriteLimit;
73use risingwave_pb::hummock::*;
74use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
75use risingwave_pb::iceberg_compaction::{
76    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
77    subscribe_iceberg_compaction_event_request,
78};
79use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
80use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
81use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
82use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
83use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
84use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
85use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
86use risingwave_pb::meta::list_actor_states_response::ActorState;
87use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
88use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
89use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
90use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
91use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
92use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
93use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
94use risingwave_pb::meta::serving_service_client::ServingServiceClient;
95use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
96use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
97use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
98use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
99use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
100use risingwave_pb::meta::{FragmentDistribution, *};
101use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
102use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
103use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
104use risingwave_pb::secret::PbSecretRef;
105use risingwave_pb::stream_plan::StreamFragmentGraph;
106use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
107use risingwave_pb::user::update_user_request::UpdateField;
108use risingwave_pb::user::user_service_client::UserServiceClient;
109use risingwave_pb::user::*;
110use thiserror_ext::AsReport;
111use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
112use tokio::sync::oneshot::Sender;
113use tokio::sync::{RwLock, mpsc, oneshot};
114use tokio::task::JoinHandle;
115use tokio::time::{self};
116use tokio_retry::strategy::{ExponentialBackoff, jitter};
117use tokio_stream::wrappers::UnboundedReceiverStream;
118use tonic::transport::Endpoint;
119use tonic::{Code, Request, Streaming};
120
121use crate::channel::{Channel, WrappedChannelExt};
122use crate::error::{Result, RpcError};
123use crate::hummock_meta_client::{
124    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
125    IcebergCompactionEventItem,
126};
127use crate::meta_rpc_client_method_impl;
128
129type ConnectionId = u32;
130type DatabaseId = u32;
131type SchemaId = u32;
132
133/// Client to meta server. Cloning the instance is lightweight.
134#[derive(Clone, Debug)]
135pub struct MetaClient {
136    worker_id: u32,
137    worker_type: WorkerType,
138    host_addr: HostAddr,
139    inner: GrpcMetaClient,
140    meta_config: MetaConfig,
141    cluster_id: String,
142    shutting_down: Arc<AtomicBool>,
143}
144
145impl MetaClient {
146    pub fn worker_id(&self) -> u32 {
147        self.worker_id
148    }
149
150    pub fn host_addr(&self) -> &HostAddr {
151        &self.host_addr
152    }
153
154    pub fn worker_type(&self) -> WorkerType {
155        self.worker_type
156    }
157
158    pub fn cluster_id(&self) -> &str {
159        &self.cluster_id
160    }
161
162    /// Subscribe to notification from meta.
163    pub async fn subscribe(
164        &self,
165        subscribe_type: SubscribeType,
166    ) -> Result<Streaming<SubscribeResponse>> {
167        let request = SubscribeRequest {
168            subscribe_type: subscribe_type as i32,
169            host: Some(self.host_addr.to_protobuf()),
170            worker_id: self.worker_id(),
171        };
172
173        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
174            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
175            true,
176        );
177
178        tokio_retry::Retry::spawn(retry_strategy, || async {
179            let request = request.clone();
180            self.inner.subscribe(request).await
181        })
182        .await
183    }
184
185    pub async fn create_connection(
186        &self,
187        connection_name: String,
188        database_id: u32,
189        schema_id: u32,
190        owner_id: u32,
191        req: create_connection_request::Payload,
192    ) -> Result<WaitVersion> {
193        let request = CreateConnectionRequest {
194            name: connection_name,
195            database_id,
196            schema_id,
197            owner_id,
198            payload: Some(req),
199        };
200        let resp = self.inner.create_connection(request).await?;
201        Ok(resp
202            .version
203            .ok_or_else(|| anyhow!("wait version not set"))?)
204    }
205
206    pub async fn create_secret(
207        &self,
208        secret_name: String,
209        database_id: u32,
210        schema_id: u32,
211        owner_id: u32,
212        value: Vec<u8>,
213    ) -> Result<WaitVersion> {
214        let request = CreateSecretRequest {
215            name: secret_name,
216            database_id,
217            schema_id,
218            owner_id,
219            value,
220        };
221        let resp = self.inner.create_secret(request).await?;
222        Ok(resp
223            .version
224            .ok_or_else(|| anyhow!("wait version not set"))?)
225    }
226
227    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
228        let request = ListConnectionsRequest {};
229        let resp = self.inner.list_connections(request).await?;
230        Ok(resp.connections)
231    }
232
233    pub async fn drop_connection(
234        &self,
235        connection_id: ConnectionId,
236        cascade: bool,
237    ) -> Result<WaitVersion> {
238        let request = DropConnectionRequest {
239            connection_id,
240            cascade,
241        };
242        let resp = self.inner.drop_connection(request).await?;
243        Ok(resp
244            .version
245            .ok_or_else(|| anyhow!("wait version not set"))?)
246    }
247
248    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
249        let request = DropSecretRequest {
250            secret_id: secret_id.into(),
251        };
252        let resp = self.inner.drop_secret(request).await?;
253        Ok(resp
254            .version
255            .ok_or_else(|| anyhow!("wait version not set"))?)
256    }
257
258    /// Register the current node to the cluster and set the corresponding worker id.
259    ///
260    /// Retry if there's connection issue with the meta node. Exit the process if the registration fails.
261    pub async fn register_new(
262        addr_strategy: MetaAddressStrategy,
263        worker_type: WorkerType,
264        addr: &HostAddr,
265        property: Property,
266        meta_config: &MetaConfig,
267    ) -> (Self, SystemParamsReader) {
268        let ret =
269            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
270
271        match ret {
272            Ok(ret) => ret,
273            Err(err) => {
274                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
275                std::process::exit(1);
276            }
277        }
278    }
279
280    async fn register_new_inner(
281        addr_strategy: MetaAddressStrategy,
282        worker_type: WorkerType,
283        addr: &HostAddr,
284        property: Property,
285        meta_config: &MetaConfig,
286    ) -> Result<(Self, SystemParamsReader)> {
287        tracing::info!("register meta client using strategy: {}", addr_strategy);
288
289        // Retry until reaching `max_heartbeat_interval_secs`
290        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
291            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
292            true,
293        );
294
295        if property.is_unschedulable {
296            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
297        }
298        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
299            retry_strategy,
300            || async {
301                let grpc_meta_client =
302                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
303
304                let add_worker_resp = grpc_meta_client
305                    .add_worker_node(AddWorkerNodeRequest {
306                        worker_type: worker_type as i32,
307                        host: Some(addr.to_protobuf()),
308                        property: Some(property.clone()),
309                        resource: Some(risingwave_pb::common::worker_node::Resource {
310                            rw_version: RW_VERSION.to_owned(),
311                            total_memory_bytes: system_memory_available_bytes() as _,
312                            total_cpu_cores: total_cpu_available() as _,
313                        }),
314                    })
315                    .await
316                    .context("failed to add worker node")?;
317
318                let system_params_resp = grpc_meta_client
319                    .get_system_params(GetSystemParamsRequest {})
320                    .await
321                    .context("failed to get initial system params")?;
322
323                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
324            },
325            // Only retry if there's any transient connection issue.
326            // If the error is from our implementation or business, do not retry it.
327            |e: &RpcError| !e.is_from_tonic_server_impl(),
328        )
329        .await;
330
331        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
332        let worker_id = add_worker_resp
333            .node_id
334            .expect("AddWorkerNodeResponse::node_id is empty");
335
336        let meta_client = Self {
337            worker_id,
338            worker_type,
339            host_addr: addr.clone(),
340            inner: grpc_meta_client,
341            meta_config: meta_config.to_owned(),
342            cluster_id: add_worker_resp.cluster_id,
343            shutting_down: Arc::new(false.into()),
344        };
345
346        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
347        REPORT_PANIC.call_once(|| {
348            let meta_client_clone = meta_client.clone();
349            std::panic::update_hook(move |default_hook, info| {
350                // Try to report panic event to meta node.
351                meta_client_clone.try_add_panic_event_blocking(info, None);
352                default_hook(info);
353            });
354        });
355
356        Ok((meta_client, system_params_resp.params.unwrap().into()))
357    }
358
359    /// Activate the current node in cluster to confirm it's ready to serve.
360    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
361        let request = ActivateWorkerNodeRequest {
362            host: Some(addr.to_protobuf()),
363            node_id: self.worker_id,
364        };
365        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
366            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
367            true,
368        );
369        tokio_retry::Retry::spawn(retry_strategy, || async {
370            let request = request.clone();
371            self.inner.activate_worker_node(request).await
372        })
373        .await?;
374
375        Ok(())
376    }
377
378    /// Send heartbeat signal to meta service.
379    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
380        let request = HeartbeatRequest { node_id };
381        let resp = self.inner.heartbeat(request).await?;
382        if let Some(status) = resp.status
383            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
384        {
385            // Ignore the error if we're already shutting down.
386            // Otherwise, exit the process.
387            if !self.shutting_down.load(Relaxed) {
388                tracing::error!(message = status.message, "worker expired");
389                std::process::exit(1);
390            }
391        }
392        Ok(())
393    }
394
395    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
396        let request = CreateDatabaseRequest { db: Some(db) };
397        let resp = self.inner.create_database(request).await?;
398        // TODO: handle error in `resp.status` here
399        Ok(resp
400            .version
401            .ok_or_else(|| anyhow!("wait version not set"))?)
402    }
403
404    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
405        let request = CreateSchemaRequest {
406            schema: Some(schema),
407        };
408        let resp = self.inner.create_schema(request).await?;
409        // TODO: handle error in `resp.status` here
410        Ok(resp
411            .version
412            .ok_or_else(|| anyhow!("wait version not set"))?)
413    }
414
415    pub async fn create_materialized_view(
416        &self,
417        table: PbTable,
418        graph: StreamFragmentGraph,
419        dependencies: HashSet<ObjectId>,
420        specific_resource_group: Option<String>,
421        if_not_exists: bool,
422    ) -> Result<WaitVersion> {
423        let request = CreateMaterializedViewRequest {
424            materialized_view: Some(table),
425            fragment_graph: Some(graph),
426            backfill: PbBackfillType::Regular as _,
427            dependencies: dependencies.into_iter().collect(),
428            specific_resource_group,
429            if_not_exists,
430        };
431        let resp = self.inner.create_materialized_view(request).await?;
432        // TODO: handle error in `resp.status` here
433        Ok(resp
434            .version
435            .ok_or_else(|| anyhow!("wait version not set"))?)
436    }
437
438    pub async fn drop_materialized_view(
439        &self,
440        table_id: TableId,
441        cascade: bool,
442    ) -> Result<WaitVersion> {
443        let request = DropMaterializedViewRequest {
444            table_id: table_id.table_id(),
445            cascade,
446        };
447
448        let resp = self.inner.drop_materialized_view(request).await?;
449        Ok(resp
450            .version
451            .ok_or_else(|| anyhow!("wait version not set"))?)
452    }
453
454    pub async fn create_source(
455        &self,
456        source: PbSource,
457        graph: Option<StreamFragmentGraph>,
458        if_not_exists: bool,
459    ) -> Result<WaitVersion> {
460        let request = CreateSourceRequest {
461            source: Some(source),
462            fragment_graph: graph,
463            if_not_exists,
464        };
465
466        let resp = self.inner.create_source(request).await?;
467        Ok(resp
468            .version
469            .ok_or_else(|| anyhow!("wait version not set"))?)
470    }
471
472    pub async fn create_sink(
473        &self,
474        sink: PbSink,
475        graph: StreamFragmentGraph,
476        affected_table_change: Option<ReplaceJobPlan>,
477        dependencies: HashSet<ObjectId>,
478        if_not_exists: bool,
479    ) -> Result<WaitVersion> {
480        let request = CreateSinkRequest {
481            sink: Some(sink),
482            fragment_graph: Some(graph),
483            affected_table_change,
484            dependencies: dependencies.into_iter().collect(),
485            if_not_exists,
486        };
487
488        let resp = self.inner.create_sink(request).await?;
489        Ok(resp
490            .version
491            .ok_or_else(|| anyhow!("wait version not set"))?)
492    }
493
494    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
495        let request = CreateSubscriptionRequest {
496            subscription: Some(subscription),
497        };
498
499        let resp = self.inner.create_subscription(request).await?;
500        Ok(resp
501            .version
502            .ok_or_else(|| anyhow!("wait version not set"))?)
503    }
504
505    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
506        let request = CreateFunctionRequest {
507            function: Some(function),
508        };
509        let resp = self.inner.create_function(request).await?;
510        Ok(resp
511            .version
512            .ok_or_else(|| anyhow!("wait version not set"))?)
513    }
514
515    pub async fn create_table(
516        &self,
517        source: Option<PbSource>,
518        table: PbTable,
519        graph: StreamFragmentGraph,
520        job_type: PbTableJobType,
521        if_not_exists: bool,
522        dependencies: HashSet<ObjectId>,
523    ) -> Result<WaitVersion> {
524        let request = CreateTableRequest {
525            materialized_view: Some(table),
526            fragment_graph: Some(graph),
527            source,
528            job_type: job_type as _,
529            if_not_exists,
530            dependencies: dependencies.into_iter().collect(),
531        };
532        let resp = self.inner.create_table(request).await?;
533        // TODO: handle error in `resp.status` here
534        Ok(resp
535            .version
536            .ok_or_else(|| anyhow!("wait version not set"))?)
537    }
538
539    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
540        let request = CommentOnRequest {
541            comment: Some(comment),
542        };
543        let resp = self.inner.comment_on(request).await?;
544        Ok(resp
545            .version
546            .ok_or_else(|| anyhow!("wait version not set"))?)
547    }
548
549    pub async fn alter_name(
550        &self,
551        object: alter_name_request::Object,
552        name: &str,
553    ) -> Result<WaitVersion> {
554        let request = AlterNameRequest {
555            object: Some(object),
556            new_name: name.to_owned(),
557        };
558        let resp = self.inner.alter_name(request).await?;
559        Ok(resp
560            .version
561            .ok_or_else(|| anyhow!("wait version not set"))?)
562    }
563
564    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
565        let request = AlterOwnerRequest {
566            object: Some(object),
567            owner_id,
568        };
569        let resp = self.inner.alter_owner(request).await?;
570        Ok(resp
571            .version
572            .ok_or_else(|| anyhow!("wait version not set"))?)
573    }
574
575    pub async fn alter_database_param(
576        &self,
577        database_id: DatabaseId,
578        param: AlterDatabaseParam,
579    ) -> Result<WaitVersion> {
580        let request = match param {
581            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
582                let barrier_interval_ms = OptionalUint32 {
583                    value: barrier_interval_ms,
584                };
585                AlterDatabaseParamRequest {
586                    database_id,
587                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
588                        barrier_interval_ms,
589                    )),
590                }
591            }
592            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
593                let checkpoint_frequency = OptionalUint64 {
594                    value: checkpoint_frequency,
595                };
596                AlterDatabaseParamRequest {
597                    database_id,
598                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
599                        checkpoint_frequency,
600                    )),
601                }
602            }
603        };
604        let resp = self.inner.alter_database_param(request).await?;
605        Ok(resp
606            .version
607            .ok_or_else(|| anyhow!("wait version not set"))?)
608    }
609
610    pub async fn alter_set_schema(
611        &self,
612        object: alter_set_schema_request::Object,
613        new_schema_id: u32,
614    ) -> Result<WaitVersion> {
615        let request = AlterSetSchemaRequest {
616            new_schema_id,
617            object: Some(object),
618        };
619        let resp = self.inner.alter_set_schema(request).await?;
620        Ok(resp
621            .version
622            .ok_or_else(|| anyhow!("wait version not set"))?)
623    }
624
625    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
626        let request = AlterSourceRequest {
627            source: Some(source),
628        };
629        let resp = self.inner.alter_source(request).await?;
630        Ok(resp
631            .version
632            .ok_or_else(|| anyhow!("wait version not set"))?)
633    }
634
635    pub async fn alter_parallelism(
636        &self,
637        table_id: u32,
638        parallelism: PbTableParallelism,
639        deferred: bool,
640    ) -> Result<()> {
641        let request = AlterParallelismRequest {
642            table_id,
643            parallelism: Some(parallelism),
644            deferred,
645        };
646
647        self.inner.alter_parallelism(request).await?;
648        Ok(())
649    }
650
651    pub async fn alter_cdc_table_backfill_parallelism(
652        &self,
653        table_id: u32,
654        parallelism: PbTableParallelism,
655    ) -> Result<()> {
656        let request = AlterCdcTableBackfillParallelismRequest {
657            table_id,
658            parallelism: Some(parallelism),
659        };
660        self.inner
661            .alter_cdc_table_backfill_parallelism(request)
662            .await?;
663        Ok(())
664    }
665
666    pub async fn alter_resource_group(
667        &self,
668        table_id: u32,
669        resource_group: Option<String>,
670        deferred: bool,
671    ) -> Result<()> {
672        let request = AlterResourceGroupRequest {
673            table_id,
674            resource_group,
675            deferred,
676        };
677
678        self.inner.alter_resource_group(request).await?;
679        Ok(())
680    }
681
682    pub async fn alter_swap_rename(
683        &self,
684        object: alter_swap_rename_request::Object,
685    ) -> Result<WaitVersion> {
686        let request = AlterSwapRenameRequest {
687            object: Some(object),
688        };
689        let resp = self.inner.alter_swap_rename(request).await?;
690        Ok(resp
691            .version
692            .ok_or_else(|| anyhow!("wait version not set"))?)
693    }
694
695    pub async fn alter_secret(
696        &self,
697        secret_id: u32,
698        secret_name: String,
699        database_id: u32,
700        schema_id: u32,
701        owner_id: u32,
702        value: Vec<u8>,
703    ) -> Result<WaitVersion> {
704        let request = AlterSecretRequest {
705            secret_id,
706            name: secret_name,
707            database_id,
708            schema_id,
709            owner_id,
710            value,
711        };
712        let resp = self.inner.alter_secret(request).await?;
713        Ok(resp
714            .version
715            .ok_or_else(|| anyhow!("wait version not set"))?)
716    }
717
718    pub async fn replace_job(
719        &self,
720        graph: StreamFragmentGraph,
721        replace_job: ReplaceJob,
722    ) -> Result<WaitVersion> {
723        let request = ReplaceJobPlanRequest {
724            plan: Some(ReplaceJobPlan {
725                fragment_graph: Some(graph),
726                replace_job: Some(replace_job),
727            }),
728        };
729        let resp = self.inner.replace_job_plan(request).await?;
730        // TODO: handle error in `resp.status` here
731        Ok(resp
732            .version
733            .ok_or_else(|| anyhow!("wait version not set"))?)
734    }
735
736    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
737        let request = AutoSchemaChangeRequest {
738            schema_change: Some(schema_change),
739        };
740        let _ = self.inner.auto_schema_change(request).await?;
741        Ok(())
742    }
743
744    pub async fn create_view(
745        &self,
746        view: PbView,
747        dependencies: HashSet<ObjectId>,
748    ) -> Result<WaitVersion> {
749        let request = CreateViewRequest {
750            view: Some(view),
751            dependencies: dependencies.into_iter().collect(),
752        };
753        let resp = self.inner.create_view(request).await?;
754        // TODO: handle error in `resp.status` here
755        Ok(resp
756            .version
757            .ok_or_else(|| anyhow!("wait version not set"))?)
758    }
759
760    pub async fn create_index(
761        &self,
762        index: PbIndex,
763        table: PbTable,
764        graph: StreamFragmentGraph,
765        if_not_exists: bool,
766    ) -> Result<WaitVersion> {
767        let request = CreateIndexRequest {
768            index: Some(index),
769            index_table: Some(table),
770            fragment_graph: Some(graph),
771            if_not_exists,
772        };
773        let resp = self.inner.create_index(request).await?;
774        // TODO: handle error in `resp.status` here
775        Ok(resp
776            .version
777            .ok_or_else(|| anyhow!("wait version not set"))?)
778    }
779
780    pub async fn drop_table(
781        &self,
782        source_id: Option<u32>,
783        table_id: TableId,
784        cascade: bool,
785    ) -> Result<WaitVersion> {
786        let request = DropTableRequest {
787            source_id: source_id.map(SourceId::Id),
788            table_id: table_id.table_id(),
789            cascade,
790        };
791
792        let resp = self.inner.drop_table(request).await?;
793        Ok(resp
794            .version
795            .ok_or_else(|| anyhow!("wait version not set"))?)
796    }
797
798    pub async fn compact_iceberg_table(&self, sink_id: u32) -> Result<u64> {
799        let request = CompactIcebergTableRequest { sink_id };
800        let resp = self.inner.compact_iceberg_table(request).await?;
801        Ok(resp.task_id)
802    }
803
804    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
805        let request = DropViewRequest { view_id, cascade };
806        let resp = self.inner.drop_view(request).await?;
807        Ok(resp
808            .version
809            .ok_or_else(|| anyhow!("wait version not set"))?)
810    }
811
812    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
813        let request = DropSourceRequest { source_id, cascade };
814        let resp = self.inner.drop_source(request).await?;
815        Ok(resp
816            .version
817            .ok_or_else(|| anyhow!("wait version not set"))?)
818    }
819
820    pub async fn drop_sink(
821        &self,
822        sink_id: u32,
823        cascade: bool,
824        affected_table_change: Option<ReplaceJobPlan>,
825    ) -> Result<WaitVersion> {
826        let request = DropSinkRequest {
827            sink_id,
828            cascade,
829            affected_table_change,
830        };
831        let resp = self.inner.drop_sink(request).await?;
832        Ok(resp
833            .version
834            .ok_or_else(|| anyhow!("wait version not set"))?)
835    }
836
837    pub async fn drop_subscription(
838        &self,
839        subscription_id: u32,
840        cascade: bool,
841    ) -> Result<WaitVersion> {
842        let request = DropSubscriptionRequest {
843            subscription_id,
844            cascade,
845        };
846        let resp = self.inner.drop_subscription(request).await?;
847        Ok(resp
848            .version
849            .ok_or_else(|| anyhow!("wait version not set"))?)
850    }
851
852    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
853        let request = DropIndexRequest {
854            index_id: index_id.index_id,
855            cascade,
856        };
857        let resp = self.inner.drop_index(request).await?;
858        Ok(resp
859            .version
860            .ok_or_else(|| anyhow!("wait version not set"))?)
861    }
862
863    pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
864        let request = DropFunctionRequest {
865            function_id: function_id.0,
866        };
867        let resp = self.inner.drop_function(request).await?;
868        Ok(resp
869            .version
870            .ok_or_else(|| anyhow!("wait version not set"))?)
871    }
872
873    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
874        let request = DropDatabaseRequest { database_id };
875        let resp = self.inner.drop_database(request).await?;
876        Ok(resp
877            .version
878            .ok_or_else(|| anyhow!("wait version not set"))?)
879    }
880
881    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
882        let request = DropSchemaRequest { schema_id, cascade };
883        let resp = self.inner.drop_schema(request).await?;
884        Ok(resp
885            .version
886            .ok_or_else(|| anyhow!("wait version not set"))?)
887    }
888
889    // TODO: using UserInfoVersion instead as return type.
890    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
891        let request = CreateUserRequest { user: Some(user) };
892        let resp = self.inner.create_user(request).await?;
893        Ok(resp.version)
894    }
895
896    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
897        let request = DropUserRequest { user_id };
898        let resp = self.inner.drop_user(request).await?;
899        Ok(resp.version)
900    }
901
902    pub async fn update_user(
903        &self,
904        user: UserInfo,
905        update_fields: Vec<UpdateField>,
906    ) -> Result<u64> {
907        let request = UpdateUserRequest {
908            user: Some(user),
909            update_fields: update_fields
910                .into_iter()
911                .map(|field| field as i32)
912                .collect::<Vec<_>>(),
913        };
914        let resp = self.inner.update_user(request).await?;
915        Ok(resp.version)
916    }
917
918    pub async fn grant_privilege(
919        &self,
920        user_ids: Vec<u32>,
921        privileges: Vec<GrantPrivilege>,
922        with_grant_option: bool,
923        granted_by: u32,
924    ) -> Result<u64> {
925        let request = GrantPrivilegeRequest {
926            user_ids,
927            privileges,
928            with_grant_option,
929            granted_by,
930        };
931        let resp = self.inner.grant_privilege(request).await?;
932        Ok(resp.version)
933    }
934
935    pub async fn revoke_privilege(
936        &self,
937        user_ids: Vec<u32>,
938        privileges: Vec<GrantPrivilege>,
939        granted_by: u32,
940        revoke_by: u32,
941        revoke_grant_option: bool,
942        cascade: bool,
943    ) -> Result<u64> {
944        let request = RevokePrivilegeRequest {
945            user_ids,
946            privileges,
947            granted_by,
948            revoke_by,
949            revoke_grant_option,
950            cascade,
951        };
952        let resp = self.inner.revoke_privilege(request).await?;
953        Ok(resp.version)
954    }
955
956    pub async fn alter_default_privilege(
957        &self,
958        user_ids: Vec<u32>,
959        database_id: DatabaseId,
960        schema_ids: Vec<SchemaId>,
961        operation: AlterDefaultPrivilegeOperation,
962        granted_by: u32,
963    ) -> Result<()> {
964        let request = AlterDefaultPrivilegeRequest {
965            user_ids,
966            database_id,
967            schema_ids,
968            operation: Some(operation),
969            granted_by,
970        };
971        self.inner.alter_default_privilege(request).await?;
972        Ok(())
973    }
974
975    /// Unregister the current node from the cluster.
976    pub async fn unregister(&self) -> Result<()> {
977        let request = DeleteWorkerNodeRequest {
978            host: Some(self.host_addr.to_protobuf()),
979        };
980        self.inner.delete_worker_node(request).await?;
981        self.shutting_down.store(true, Relaxed);
982        Ok(())
983    }
984
985    /// Try to unregister the current worker from the cluster with best effort. Log the result.
986    pub async fn try_unregister(&self) {
987        match self.unregister().await {
988            Ok(_) => {
989                tracing::info!(
990                    worker_id = self.worker_id(),
991                    "successfully unregistered from meta service",
992                )
993            }
994            Err(e) => {
995                tracing::warn!(
996                    error = %e.as_report(),
997                    worker_id = self.worker_id(),
998                    "failed to unregister from meta service",
999                );
1000            }
1001        }
1002    }
1003
1004    pub async fn update_schedulability(
1005        &self,
1006        worker_ids: &[u32],
1007        schedulability: Schedulability,
1008    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
1009        let request = UpdateWorkerNodeSchedulabilityRequest {
1010            worker_ids: worker_ids.to_vec(),
1011            schedulability: schedulability.into(),
1012        };
1013        let resp = self
1014            .inner
1015            .update_worker_node_schedulability(request)
1016            .await?;
1017        Ok(resp)
1018    }
1019
1020    pub async fn list_worker_nodes(
1021        &self,
1022        worker_type: Option<WorkerType>,
1023    ) -> Result<Vec<WorkerNode>> {
1024        let request = ListAllNodesRequest {
1025            worker_type: worker_type.map(Into::into),
1026            include_starting_nodes: true,
1027        };
1028        let resp = self.inner.list_all_nodes(request).await?;
1029        Ok(resp.nodes)
1030    }
1031
1032    /// Starts a heartbeat worker.
1033    pub fn start_heartbeat_loop(
1034        meta_client: MetaClient,
1035        min_interval: Duration,
1036    ) -> (JoinHandle<()>, Sender<()>) {
1037        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1038        let join_handle = tokio::spawn(async move {
1039            let mut min_interval_ticker = tokio::time::interval(min_interval);
1040            loop {
1041                tokio::select! {
1042                    biased;
1043                    // Shutdown
1044                    _ = &mut shutdown_rx => {
1045                        tracing::info!("Heartbeat loop is stopped");
1046                        return;
1047                    }
1048                    // Wait for interval
1049                    _ = min_interval_ticker.tick() => {},
1050                }
1051                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
1052                match tokio::time::timeout(
1053                    // TODO: decide better min_interval for timeout
1054                    min_interval * 3,
1055                    meta_client.send_heartbeat(meta_client.worker_id()),
1056                )
1057                .await
1058                {
1059                    Ok(Ok(_)) => {}
1060                    Ok(Err(err)) => {
1061                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
1062                    }
1063                    Err(_) => {
1064                        tracing::warn!("Failed to send_heartbeat: timeout");
1065                    }
1066                }
1067            }
1068        });
1069        (join_handle, shutdown_tx)
1070    }
1071
1072    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1073        let request = RisectlListStateTablesRequest {};
1074        let resp = self.inner.risectl_list_state_tables(request).await?;
1075        Ok(resp.tables)
1076    }
1077
1078    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1079        let request = FlushRequest { database_id };
1080        let resp = self.inner.flush(request).await?;
1081        Ok(HummockVersionId::new(resp.hummock_version_id))
1082    }
1083
1084    pub async fn wait(&self) -> Result<()> {
1085        let request = WaitRequest {};
1086        self.inner.wait(request).await?;
1087        Ok(())
1088    }
1089
1090    pub async fn recover(&self) -> Result<()> {
1091        let request = RecoverRequest {};
1092        self.inner.recover(request).await?;
1093        Ok(())
1094    }
1095
1096    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1097        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1098        let resp = self.inner.cancel_creating_jobs(request).await?;
1099        Ok(resp.canceled_jobs)
1100    }
1101
1102    pub async fn list_table_fragments(
1103        &self,
1104        table_ids: &[u32],
1105    ) -> Result<HashMap<u32, TableFragmentInfo>> {
1106        let request = ListTableFragmentsRequest {
1107            table_ids: table_ids.to_vec(),
1108        };
1109        let resp = self.inner.list_table_fragments(request).await?;
1110        Ok(resp.table_fragments)
1111    }
1112
1113    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1114        let resp = self
1115            .inner
1116            .list_streaming_job_states(ListStreamingJobStatesRequest {})
1117            .await?;
1118        Ok(resp.states)
1119    }
1120
1121    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
1122        let resp = self
1123            .inner
1124            .list_fragment_distribution(ListFragmentDistributionRequest {})
1125            .await?;
1126        Ok(resp.distributions)
1127    }
1128
1129    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1130        let resp = self
1131            .inner
1132            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
1133            .await?;
1134        Ok(resp.distributions)
1135    }
1136
1137    pub async fn get_fragment_by_id(
1138        &self,
1139        fragment_id: u32,
1140    ) -> Result<Option<FragmentDistribution>> {
1141        let resp = self
1142            .inner
1143            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1144            .await?;
1145        Ok(resp.distribution)
1146    }
1147
1148    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1149        let resp = self
1150            .inner
1151            .list_actor_states(ListActorStatesRequest {})
1152            .await?;
1153        Ok(resp.states)
1154    }
1155
1156    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1157        let resp = self
1158            .inner
1159            .list_actor_splits(ListActorSplitsRequest {})
1160            .await?;
1161
1162        Ok(resp.actor_splits)
1163    }
1164
1165    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
1166        let resp = self
1167            .inner
1168            .list_object_dependencies(ListObjectDependenciesRequest {})
1169            .await?;
1170        Ok(resp.dependencies)
1171    }
1172
1173    pub async fn pause(&self) -> Result<PauseResponse> {
1174        let request = PauseRequest {};
1175        let resp = self.inner.pause(request).await?;
1176        Ok(resp)
1177    }
1178
1179    pub async fn resume(&self) -> Result<ResumeResponse> {
1180        let request = ResumeRequest {};
1181        let resp = self.inner.resume(request).await?;
1182        Ok(resp)
1183    }
1184
1185    pub async fn apply_throttle(
1186        &self,
1187        kind: PbThrottleTarget,
1188        id: u32,
1189        rate: Option<u32>,
1190    ) -> Result<ApplyThrottleResponse> {
1191        let request = ApplyThrottleRequest {
1192            kind: kind as i32,
1193            id,
1194            rate,
1195        };
1196        let resp = self.inner.apply_throttle(request).await?;
1197        Ok(resp)
1198    }
1199
1200    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1201        let resp = self
1202            .inner
1203            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1204            .await?;
1205        Ok(resp.get_status().unwrap())
1206    }
1207
1208    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1209        let request = GetClusterInfoRequest {};
1210        let resp = self.inner.get_cluster_info(request).await?;
1211        Ok(resp)
1212    }
1213
1214    pub async fn reschedule(
1215        &self,
1216        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1217        revision: u64,
1218        resolve_no_shuffle_upstream: bool,
1219    ) -> Result<(bool, u64)> {
1220        let request = RescheduleRequest {
1221            revision,
1222            resolve_no_shuffle_upstream,
1223            worker_reschedules,
1224        };
1225        let resp = self.inner.reschedule(request).await?;
1226        Ok((resp.success, resp.revision))
1227    }
1228
1229    pub async fn risectl_get_pinned_versions_summary(
1230        &self,
1231    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1232        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1233        self.inner
1234            .rise_ctl_get_pinned_versions_summary(request)
1235            .await
1236    }
1237
1238    pub async fn risectl_get_checkpoint_hummock_version(
1239        &self,
1240    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1241        let request = RiseCtlGetCheckpointVersionRequest {};
1242        self.inner.rise_ctl_get_checkpoint_version(request).await
1243    }
1244
1245    pub async fn risectl_pause_hummock_version_checkpoint(
1246        &self,
1247    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1248        let request = RiseCtlPauseVersionCheckpointRequest {};
1249        self.inner.rise_ctl_pause_version_checkpoint(request).await
1250    }
1251
1252    pub async fn risectl_resume_hummock_version_checkpoint(
1253        &self,
1254    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1255        let request = RiseCtlResumeVersionCheckpointRequest {};
1256        self.inner.rise_ctl_resume_version_checkpoint(request).await
1257    }
1258
1259    pub async fn init_metadata_for_replay(
1260        &self,
1261        tables: Vec<PbTable>,
1262        compaction_groups: Vec<CompactionGroupInfo>,
1263    ) -> Result<()> {
1264        let req = InitMetadataForReplayRequest {
1265            tables,
1266            compaction_groups,
1267        };
1268        let _resp = self.inner.init_metadata_for_replay(req).await?;
1269        Ok(())
1270    }
1271
1272    pub async fn replay_version_delta(
1273        &self,
1274        version_delta: HummockVersionDelta,
1275    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1276        let req = ReplayVersionDeltaRequest {
1277            version_delta: Some(version_delta.into()),
1278        };
1279        let resp = self.inner.replay_version_delta(req).await?;
1280        Ok((
1281            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1282            resp.modified_compaction_groups,
1283        ))
1284    }
1285
1286    pub async fn list_version_deltas(
1287        &self,
1288        start_id: HummockVersionId,
1289        num_limit: u32,
1290        committed_epoch_limit: HummockEpoch,
1291    ) -> Result<Vec<HummockVersionDelta>> {
1292        let req = ListVersionDeltasRequest {
1293            start_id: start_id.to_u64(),
1294            num_limit,
1295            committed_epoch_limit,
1296        };
1297        Ok(self
1298            .inner
1299            .list_version_deltas(req)
1300            .await?
1301            .version_deltas
1302            .unwrap()
1303            .version_deltas
1304            .iter()
1305            .map(HummockVersionDelta::from_rpc_protobuf)
1306            .collect())
1307    }
1308
1309    pub async fn trigger_compaction_deterministic(
1310        &self,
1311        version_id: HummockVersionId,
1312        compaction_groups: Vec<CompactionGroupId>,
1313    ) -> Result<()> {
1314        let req = TriggerCompactionDeterministicRequest {
1315            version_id: version_id.to_u64(),
1316            compaction_groups,
1317        };
1318        self.inner.trigger_compaction_deterministic(req).await?;
1319        Ok(())
1320    }
1321
1322    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1323        let req = DisableCommitEpochRequest {};
1324        Ok(HummockVersion::from_rpc_protobuf(
1325            &self
1326                .inner
1327                .disable_commit_epoch(req)
1328                .await?
1329                .current_version
1330                .unwrap(),
1331        ))
1332    }
1333
1334    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1335        let req = GetAssignedCompactTaskNumRequest {};
1336        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1337        Ok(resp.num_tasks as usize)
1338    }
1339
1340    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1341        let req = RiseCtlListCompactionGroupRequest {};
1342        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1343        Ok(resp.compaction_groups)
1344    }
1345
1346    pub async fn risectl_update_compaction_config(
1347        &self,
1348        compaction_groups: &[CompactionGroupId],
1349        configs: &[MutableConfig],
1350    ) -> Result<()> {
1351        let req = RiseCtlUpdateCompactionConfigRequest {
1352            compaction_group_ids: compaction_groups.to_vec(),
1353            configs: configs
1354                .iter()
1355                .map(
1356                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1357                        mutable_config: Some(c.clone()),
1358                    },
1359                )
1360                .collect(),
1361        };
1362        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1363        Ok(())
1364    }
1365
1366    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1367        let req = BackupMetaRequest { remarks };
1368        let resp = self.inner.backup_meta(req).await?;
1369        Ok(resp.job_id)
1370    }
1371
1372    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1373        let req = GetBackupJobStatusRequest { job_id };
1374        let resp = self.inner.get_backup_job_status(req).await?;
1375        Ok((resp.job_status(), resp.message))
1376    }
1377
1378    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1379        let req = DeleteMetaSnapshotRequest {
1380            snapshot_ids: snapshot_ids.to_vec(),
1381        };
1382        let _resp = self.inner.delete_meta_snapshot(req).await?;
1383        Ok(())
1384    }
1385
1386    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1387        let req = GetMetaSnapshotManifestRequest {};
1388        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1389        Ok(resp.manifest.expect("should exist"))
1390    }
1391
1392    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1393        let req = GetTelemetryInfoRequest {};
1394        let resp = self.inner.get_telemetry_info(req).await?;
1395        Ok(resp)
1396    }
1397
1398    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1399        let req = GetMetaStoreInfoRequest {};
1400        let resp = self.inner.get_meta_store_info(req).await?;
1401        Ok(resp.meta_store_endpoint)
1402    }
1403
1404    pub async fn alter_sink_props(
1405        &self,
1406        sink_id: u32,
1407        changed_props: BTreeMap<String, String>,
1408        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1409        connector_conn_ref: Option<u32>,
1410    ) -> Result<()> {
1411        let req = AlterConnectorPropsRequest {
1412            object_id: sink_id,
1413            changed_props: changed_props.into_iter().collect(),
1414            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1415            connector_conn_ref,
1416            object_type: AlterConnectorPropsObject::Sink as i32,
1417        };
1418        let _resp = self.inner.alter_connector_props(req).await?;
1419        Ok(())
1420    }
1421
1422    pub async fn alter_source_connector_props(
1423        &self,
1424        source_id: u32,
1425        changed_props: BTreeMap<String, String>,
1426        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1427        connector_conn_ref: Option<u32>,
1428    ) -> Result<()> {
1429        let req = AlterConnectorPropsRequest {
1430            object_id: source_id,
1431            changed_props: changed_props.into_iter().collect(),
1432            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1433            connector_conn_ref,
1434            object_type: AlterConnectorPropsObject::Source as i32,
1435        };
1436        let _resp = self.inner.alter_connector_props(req).await?;
1437        Ok(())
1438    }
1439
1440    pub async fn set_system_param(
1441        &self,
1442        param: String,
1443        value: Option<String>,
1444    ) -> Result<Option<SystemParamsReader>> {
1445        let req = SetSystemParamRequest { param, value };
1446        let resp = self.inner.set_system_param(req).await?;
1447        Ok(resp.params.map(SystemParamsReader::from))
1448    }
1449
1450    pub async fn get_session_params(&self) -> Result<String> {
1451        let req = GetSessionParamsRequest {};
1452        let resp = self.inner.get_session_params(req).await?;
1453        Ok(resp.params)
1454    }
1455
1456    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1457        let req = SetSessionParamRequest { param, value };
1458        let resp = self.inner.set_session_param(req).await?;
1459        Ok(resp.param)
1460    }
1461
1462    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1463        let req = GetDdlProgressRequest {};
1464        let resp = self.inner.get_ddl_progress(req).await?;
1465        Ok(resp.ddl_progress)
1466    }
1467
1468    pub async fn split_compaction_group(
1469        &self,
1470        group_id: CompactionGroupId,
1471        table_ids_to_new_group: &[StateTableId],
1472        partition_vnode_count: u32,
1473    ) -> Result<CompactionGroupId> {
1474        let req = SplitCompactionGroupRequest {
1475            group_id,
1476            table_ids: table_ids_to_new_group.to_vec(),
1477            partition_vnode_count,
1478        };
1479        let resp = self.inner.split_compaction_group(req).await?;
1480        Ok(resp.new_group_id)
1481    }
1482
1483    pub async fn get_tables(
1484        &self,
1485        table_ids: &[u32],
1486        include_dropped_tables: bool,
1487    ) -> Result<HashMap<u32, Table>> {
1488        let req = GetTablesRequest {
1489            table_ids: table_ids.to_vec(),
1490            include_dropped_tables,
1491        };
1492        let resp = self.inner.get_tables(req).await?;
1493        Ok(resp.tables)
1494    }
1495
1496    pub async fn list_serving_vnode_mappings(
1497        &self,
1498    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
1499        let req = GetServingVnodeMappingsRequest {};
1500        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1501        let mappings = resp
1502            .worker_slot_mappings
1503            .into_iter()
1504            .map(|p| {
1505                (
1506                    p.fragment_id,
1507                    (
1508                        resp.fragment_to_table
1509                            .get(&p.fragment_id)
1510                            .cloned()
1511                            .unwrap_or(0),
1512                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1513                    ),
1514                )
1515            })
1516            .collect();
1517        Ok(mappings)
1518    }
1519
1520    pub async fn risectl_list_compaction_status(
1521        &self,
1522    ) -> Result<(
1523        Vec<CompactStatus>,
1524        Vec<CompactTaskAssignment>,
1525        Vec<CompactTaskProgress>,
1526    )> {
1527        let req = RiseCtlListCompactionStatusRequest {};
1528        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1529        Ok((
1530            resp.compaction_statuses,
1531            resp.task_assignment,
1532            resp.task_progress,
1533        ))
1534    }
1535
1536    pub async fn get_compaction_score(
1537        &self,
1538        compaction_group_id: CompactionGroupId,
1539    ) -> Result<Vec<PickerInfo>> {
1540        let req = GetCompactionScoreRequest {
1541            compaction_group_id,
1542        };
1543        let resp = self.inner.get_compaction_score(req).await?;
1544        Ok(resp.scores)
1545    }
1546
1547    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1548        let req = RiseCtlRebuildTableStatsRequest {};
1549        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1550        Ok(())
1551    }
1552
1553    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1554        let req = ListBranchedObjectRequest {};
1555        let resp = self.inner.list_branched_object(req).await?;
1556        Ok(resp.branched_objects)
1557    }
1558
1559    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1560        let req = ListActiveWriteLimitRequest {};
1561        let resp = self.inner.list_active_write_limit(req).await?;
1562        Ok(resp.write_limits)
1563    }
1564
1565    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1566        let req = ListHummockMetaConfigRequest {};
1567        let resp = self.inner.list_hummock_meta_config(req).await?;
1568        Ok(resp.configs)
1569    }
1570
1571    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1572        let _resp = self
1573            .inner
1574            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1575            .await?;
1576
1577        Ok(())
1578    }
1579
1580    pub async fn rw_cloud_validate_source(
1581        &self,
1582        source_type: SourceType,
1583        source_config: HashMap<String, String>,
1584    ) -> Result<RwCloudValidateSourceResponse> {
1585        let req = RwCloudValidateSourceRequest {
1586            source_type: source_type.into(),
1587            source_config,
1588        };
1589        let resp = self.inner.rw_cloud_validate_source(req).await?;
1590        Ok(resp)
1591    }
1592
1593    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1594        self.inner.core.read().await.sink_coordinate_client.clone()
1595    }
1596
1597    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1598        let req = ListCompactTaskAssignmentRequest {};
1599        let resp = self.inner.list_compact_task_assignment(req).await?;
1600        Ok(resp.task_assignment)
1601    }
1602
1603    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1604        let req = ListEventLogRequest::default();
1605        let resp = self.inner.list_event_log(req).await?;
1606        Ok(resp.event_logs)
1607    }
1608
1609    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1610        let req = ListCompactTaskProgressRequest {};
1611        let resp = self.inner.list_compact_task_progress(req).await?;
1612        Ok(resp.task_progress)
1613    }
1614
1615    #[cfg(madsim)]
1616    pub fn try_add_panic_event_blocking(
1617        &self,
1618        _panic_info: impl Display,
1619        _timeout_millis: Option<u64>,
1620    ) {
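        // No-op under madsim: panic events are not reported in simulation builds.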
1621    }
1622
1623    /// If `timeout_millis` is `None`, a default of 1000 ms is used.
1624    #[cfg(not(madsim))]
1625    pub fn try_add_panic_event_blocking(
1626        &self,
1627        panic_info: impl Display,
1628        timeout_millis: Option<u64>,
1629    ) {
1630        let event = event_log::EventWorkerNodePanic {
1631            worker_id: self.worker_id,
1632            worker_type: self.worker_type.into(),
1633            host_addr: Some(self.host_addr.to_protobuf()),
1634            panic_info: format!("{panic_info}"),
1635        };
1636        let grpc_meta_client = self.inner.clone();
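        // Spawn a dedicated thread with its own current-thread runtime and join it, so
        // the event can be reported synchronously from non-async contexts (e.g. a panic
        // hook) without calling `block_on` inside an existing runtime.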
1637        let _ = thread::spawn(move || {
1638            let rt = tokio::runtime::Builder::new_current_thread()
1639                .enable_all()
1640                .build()
1641                .unwrap();
1642            let req = AddEventLogRequest {
1643                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1644            };
1645            rt.block_on(async {
1646                let _ = tokio::time::timeout(
1647                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1648                    grpc_meta_client.add_event_log(req),
1649                )
1650                .await;
1651            });
1652        })
1653        .join();
1654    }
1655
1656    pub async fn add_sink_fail_event(
1657        &self,
1658        sink_id: u32,
1659        sink_name: String,
1660        connector: String,
1661        error: String,
1662    ) -> Result<()> {
1663        let event = event_log::EventSinkFail {
1664            sink_id,
1665            sink_name,
1666            connector,
1667            error,
1668        };
1669        let req = AddEventLogRequest {
1670            event: Some(add_event_log_request::Event::SinkFail(event)),
1671        };
1672        self.inner.add_event_log(req).await?;
1673        Ok(())
1674    }
1675
1676    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1677        let req = CancelCompactTaskRequest {
1678            task_id,
1679            task_status: task_status as _,
1680        };
1681        let resp = self.inner.cancel_compact_task(req).await?;
1682        Ok(resp.ret)
1683    }
1684
1685    pub async fn get_version_by_epoch(
1686        &self,
1687        epoch: HummockEpoch,
1688        table_id: u32,
1689    ) -> Result<PbHummockVersion> {
1690        let req = GetVersionByEpochRequest { epoch, table_id };
1691        let resp = self.inner.get_version_by_epoch(req).await?;
1692        Ok(resp.version.unwrap())
1693    }
1694
1695    pub async fn get_cluster_limits(
1696        &self,
1697    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1698        let req = GetClusterLimitsRequest {};
1699        let resp = self.inner.get_cluster_limits(req).await?;
1700        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1701    }
1702
1703    pub async fn merge_compaction_group(
1704        &self,
1705        left_group_id: CompactionGroupId,
1706        right_group_id: CompactionGroupId,
1707    ) -> Result<()> {
1708        let req = MergeCompactionGroupRequest {
1709            left_group_id,
1710            right_group_id,
1711        };
1712        self.inner.merge_compaction_group(req).await?;
1713        Ok(())
1714    }
1715
1716    /// List all rate limits for sources and backfills.
1717    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1718        let request = ListRateLimitsRequest {};
1719        let resp = self.inner.list_rate_limits(request).await?;
1720        Ok(resp.rate_limits)
1721    }
1722
1723    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1724        let request = ListIcebergTablesRequest {};
1725        let resp = self.inner.list_iceberg_tables(request).await?;
1726        Ok(resp.iceberg_tables)
1727    }
1728
1729    /// Get the await tree of all nodes in the cluster.
1730    pub async fn get_cluster_stack_trace(
1731        &self,
1732        actor_traces_format: ActorTracesFormat,
1733    ) -> Result<StackTraceResponse> {
1734        let request = StackTraceRequest {
1735            actor_traces_format: actor_traces_format as i32,
1736        };
1737        let resp = self.inner.stack_trace(request).await?;
1738        Ok(resp)
1739    }
1740
1741    pub async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
1742        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1743        self.inner.set_sync_log_store_aligned(request).await?;
1744        Ok(())
1745    }
1746
1747    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
1748        self.inner.refresh(request).await
1749    }
1750}
1751
1752#[async_trait]
1753impl HummockMetaClient for MetaClient {
1754    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1755        let req = UnpinVersionBeforeRequest {
1756            context_id: self.worker_id(),
1757            unpin_version_before: unpin_version_before.to_u64(),
1758        };
1759        self.inner.unpin_version_before(req).await?;
1760        Ok(())
1761    }
1762
1763    async fn get_current_version(&self) -> Result<HummockVersion> {
1764        let req = GetCurrentVersionRequest::default();
1765        Ok(HummockVersion::from_rpc_protobuf(
1766            &self
1767                .inner
1768                .get_current_version(req)
1769                .await?
1770                .current_version
1771                .unwrap(),
1772        ))
1773    }
1774
1775    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1776        let resp = self
1777            .inner
1778            .get_new_object_ids(GetNewObjectIdsRequest { number })
1779            .await?;
1780        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1781    }
1782
1783    async fn commit_epoch_with_change_log(
1784        &self,
1785        _epoch: HummockEpoch,
1786        _sync_result: SyncResult,
1787        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1788    ) -> Result<()> {
1789        panic!("Only meta service can commit_epoch in production.")
1790    }
1791
1792    async fn trigger_manual_compaction(
1793        &self,
1794        compaction_group_id: u64,
1795        table_id: u32,
1796        level: u32,
1797        sst_ids: Vec<u64>,
1798    ) -> Result<()> {
1799        // TODO: support key_range parameter
1800        let req = TriggerManualCompactionRequest {
1801            compaction_group_id,
1802            table_id,
1803            // If the table_id does not exist, manual compaction will include all the SSTs
1804            // without checking the internal_table_id.
1805            level,
1806            sst_ids,
1807            ..Default::default()
1808        };
1809
1810        self.inner.trigger_manual_compaction(req).await?;
1811        Ok(())
1812    }
1813
1814    async fn trigger_full_gc(
1815        &self,
1816        sst_retention_time_sec: u64,
1817        prefix: Option<String>,
1818    ) -> Result<()> {
1819        self.inner
1820            .trigger_full_gc(TriggerFullGcRequest {
1821                sst_retention_time_sec,
1822                prefix,
1823            })
1824            .await?;
1825        Ok(())
1826    }
1827
1828    async fn subscribe_compaction_event(
1829        &self,
1830    ) -> Result<(
1831        UnboundedSender<SubscribeCompactionEventRequest>,
1832        BoxStream<'static, CompactionEventItem>,
1833    )> {
1834        let (request_sender, request_receiver) =
1835            unbounded_channel::<SubscribeCompactionEventRequest>();
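        // Queue a `Register` event first so that it becomes the initial message the meta
        // node receives once the receiver below is handed over as the request stream.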
1836        request_sender
1837            .send(SubscribeCompactionEventRequest {
1838                event: Some(subscribe_compaction_event_request::Event::Register(
1839                    Register {
1840                        context_id: self.worker_id(),
1841                    },
1842                )),
1843                create_at: SystemTime::now()
1844                    .duration_since(SystemTime::UNIX_EPOCH)
1845                    .expect("Clock may have gone backwards")
1846                    .as_millis() as u64,
1847            })
1848            .context("Failed to subscribe to the compaction event")?;
1849
1850        let stream = self
1851            .inner
1852            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1853                request_receiver,
1854            )))
1855            .await?;
1856
1857        Ok((request_sender, Box::pin(stream)))
1858    }
1859
1860    async fn get_version_by_epoch(
1861        &self,
1862        epoch: HummockEpoch,
1863        table_id: u32,
1864    ) -> Result<PbHummockVersion> {
1865        self.get_version_by_epoch(epoch, table_id).await
1866    }
1867
1868    async fn subscribe_iceberg_compaction_event(
1869        &self,
1870    ) -> Result<(
1871        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1872        BoxStream<'static, IcebergCompactionEventItem>,
1873    )> {
1874        let (request_sender, request_receiver) =
1875            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1876        request_sender
1877            .send(SubscribeIcebergCompactionEventRequest {
1878                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1879                    IcebergRegister {
1880                        context_id: self.worker_id(),
1881                    },
1882                )),
1883                create_at: SystemTime::now()
1884                    .duration_since(SystemTime::UNIX_EPOCH)
1885                    .expect("Clock may have gone backwards")
1886                    .as_millis() as u64,
1887            })
1888            .context("Failed to subscribe to the iceberg compaction event")?;
1889
1890        let stream = self
1891            .inner
1892            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
1893                request_receiver,
1894            )))
1895            .await?;
1896
1897        Ok((request_sender, Box::pin(stream)))
1898    }
1899}
1900
1901#[async_trait]
1902impl TelemetryInfoFetcher for MetaClient {
1903    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1904        let resp = self
1905            .get_telemetry_info()
1906            .await
1907            .map_err(|e| e.to_report_string())?;
1908        let tracking_id = resp.get_tracking_id().ok();
1909        Ok(tracking_id.map(|id| id.to_owned()))
1910    }
1911}
1912
1913pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1914
1915#[derive(Debug, Clone)]
1916struct GrpcMetaClientCore {
1917    cluster_client: ClusterServiceClient<Channel>,
1918    meta_member_client: MetaMemberServiceClient<Channel>,
1919    heartbeat_client: HeartbeatServiceClient<Channel>,
1920    ddl_client: DdlServiceClient<Channel>,
1921    hummock_client: HummockManagerServiceClient<Channel>,
1922    notification_client: NotificationServiceClient<Channel>,
1923    stream_client: StreamManagerServiceClient<Channel>,
1924    user_client: UserServiceClient<Channel>,
1925    scale_client: ScaleServiceClient<Channel>,
1926    backup_client: BackupServiceClient<Channel>,
1927    telemetry_client: TelemetryInfoServiceClient<Channel>,
1928    system_params_client: SystemParamsServiceClient<Channel>,
1929    session_params_client: SessionParamServiceClient<Channel>,
1930    serving_client: ServingServiceClient<Channel>,
1931    cloud_client: CloudServiceClient<Channel>,
1932    sink_coordinate_client: SinkCoordinationRpcClient,
1933    event_log_client: EventLogServiceClient<Channel>,
1934    cluster_limit_client: ClusterLimitServiceClient<Channel>,
1935    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
1936    monitor_client: MonitorServiceClient<Channel>,
1937}
1938
1939impl GrpcMetaClientCore {
1940    pub(crate) fn new(channel: Channel) -> Self {
1941        let cluster_client = ClusterServiceClient::new(channel.clone());
1942        let meta_member_client = MetaMemberClient::new(channel.clone());
1943        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
1944        let ddl_client =
1945            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1946        let hummock_client =
1947            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1948        let notification_client =
1949            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1950        let stream_client =
1951            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1952        let user_client = UserServiceClient::new(channel.clone());
1953        let scale_client =
1954            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1955        let backup_client = BackupServiceClient::new(channel.clone());
1956        let telemetry_client =
1957            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1958        let system_params_client = SystemParamsServiceClient::new(channel.clone());
1959        let session_params_client = SessionParamServiceClient::new(channel.clone());
1960        let serving_client = ServingServiceClient::new(channel.clone());
1961        let cloud_client = CloudServiceClient::new(channel.clone());
1962        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
1963        let event_log_client = EventLogServiceClient::new(channel.clone());
1964        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
1965        let hosted_iceberg_catalog_service_client =
1966            HostedIcebergCatalogServiceClient::new(channel.clone());
1967        let monitor_client = MonitorServiceClient::new(channel);
1968
1969        GrpcMetaClientCore {
1970            cluster_client,
1971            meta_member_client,
1972            heartbeat_client,
1973            ddl_client,
1974            hummock_client,
1975            notification_client,
1976            stream_client,
1977            user_client,
1978            scale_client,
1979            backup_client,
1980            telemetry_client,
1981            system_params_client,
1982            session_params_client,
1983            serving_client,
1984            cloud_client,
1985            sink_coordinate_client,
1986            event_log_client,
1987            cluster_limit_client,
1988            hosted_iceberg_catalog_service_client,
1989            monitor_client,
1990        }
1991    }
1992}
1993
1994/// Client to the meta server. Cloning the instance is lightweight.
1995///
1996/// It is a wrapper over the tonic service clients. See [`crate::meta_rpc_client_method_impl`].
1997#[derive(Debug, Clone)]
1998struct GrpcMetaClient {
1999    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2000    core: Arc<RwLock<GrpcMetaClientCore>>,
2001}
2002
2003type MetaMemberClient = MetaMemberServiceClient<Channel>;
2004
2005struct MetaMemberGroup {
2006    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2007}
2008
2009struct MetaMemberManagement {
2010    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2011    members: Either<MetaMemberClient, MetaMemberGroup>,
2012    current_leader: http::Uri,
2013    meta_config: MetaConfig,
2014}
2015
2016impl MetaMemberManagement {
2017    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2018
2019    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2020        format!("http://{}:{}", addr.host, addr.port)
2021            .parse()
2022            .unwrap()
2023    }
2024
2025    async fn recreate_core(&self, channel: Channel) {
2026        let mut core = self.core_ref.write().await;
2027        *core = GrpcMetaClientCore::new(channel);
2028    }
2029
2030    async fn refresh_members(&mut self) -> Result<()> {
2031        let leader_addr = match self.members.as_mut() {
2032            Either::Left(client) => {
2033                let resp = client
2034                    .to_owned()
2035                    .members(MembersRequest {})
2036                    .await
2037                    .map_err(RpcError::from_meta_status)?;
2038                let resp = resp.into_inner();
2039                resp.members.into_iter().find(|member| member.is_leader)
2040            }
2041            Either::Right(member_group) => {
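                // Probe the cached member addresses one by one, lazily creating a client
                // on first use, and stop at the first member that answers.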
2042                let mut fetched_members = None;
2043
2044                for (addr, client) in &mut member_group.members {
2045                    let members: Result<_> = try {
2046                        let mut client = match client {
2047                            Some(cached_client) => cached_client.to_owned(),
2048                            None => {
2049                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2050                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2051                                    .await
2052                                    .context("failed to create client")?;
2053                                let new_client: MetaMemberClient =
2054                                    MetaMemberServiceClient::new(channel);
2055                                *client = Some(new_client.clone());
2056
2057                                new_client
2058                            }
2059                        };
2060
2061                        let resp = client
2062                            .members(MembersRequest {})
2063                            .await
2064                            .context("failed to fetch members")?;
2065
2066                        resp.into_inner().members
2067                    };
2068
2069                    let fetched = members.is_ok();
2070                    fetched_members = Some(members);
2071                    if fetched {
2072                        break;
2073                    }
2074                }
2075
2076                let members = fetched_members
2077                    .context("no member available in the list")?
2078                    .context("could not refresh members")?;
2079
2080                // find new leader
2081                let mut leader = None;
2082                for member in members {
2083                    if member.is_leader {
2084                        leader = Some(member.clone());
2085                    }
2086
2087                    let addr = Self::host_address_to_uri(member.address.unwrap());
2088                    // We deliberately keep expired addresses here so they can still be probed in some extreme situations.
2089                    if !member_group.members.contains(&addr) {
2090                        tracing::info!("new meta member joined: {}", addr);
2091                        member_group.members.put(addr, None);
2092                    }
2093                }
2094
2095                leader
2096            }
2097        };
2098
2099        if let Some(leader) = leader_addr {
2100            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2101
2102            if discovered_leader != self.current_leader {
2103                tracing::info!("new meta leader {} discovered", discovered_leader);
2104
2105                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2106                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2107                    false,
2108                );
2109
2110                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2111                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2112                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2113                })
2114                .await?;
2115
2116                self.recreate_core(channel).await;
2117                self.current_leader = discovered_leader;
2118            }
2119        }
2120
2121        Ok(())
2122    }
2123}
2124
2125impl GrpcMetaClient {
2126    // See `Endpoint::http2_keep_alive_interval`
2127    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2128    // See `Endpoint::keep_alive_timeout`
2129    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2130    // Retry base interval in ms for connecting to meta server.
2131    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2132    // Max retry interval in ms for connecting to meta server.
2133    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2134
2135    fn start_meta_member_monitor(
2136        &self,
2137        init_leader_addr: http::Uri,
2138        members: Either<MetaMemberClient, MetaMemberGroup>,
2139        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2140        meta_config: MetaConfig,
2141    ) -> Result<()> {
2142        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2143        let current_leader = init_leader_addr;
2144
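        // Periodic refresh is only enabled when an explicit member list is tracked;
        // otherwise members are refreshed solely on explicit force-refresh requests.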
2145        let enable_period_tick = matches!(members, Either::Right(_));
2146
2147        let member_management = MetaMemberManagement {
2148            core_ref,
2149            members,
2150            current_leader,
2151            meta_config,
2152        };
2153
2154        let mut force_refresh_receiver = force_refresh_receiver;
2155
2156        tokio::spawn(async move {
2157            let mut member_management = member_management;
2158            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2159
2160            loop {
2161                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2162                    tokio::select! {
2163                        _ = ticker.tick() => None,
2164                        result_sender = force_refresh_receiver.recv() => {
2165                            if result_sender.is_none() {
2166                                break;
2167                            }
2168
2169                            result_sender
2170                        },
2171                    }
2172                } else {
2173                    let result_sender = force_refresh_receiver.recv().await;
2174
2175                    if result_sender.is_none() {
2176                        break;
2177                    }
2178
2179                    result_sender
2180                };
2181
2182                let tick_result = member_management.refresh_members().await;
2183                if let Err(e) = tick_result.as_ref() {
2184                    tracing::warn!(error = %e.as_report(),  "refresh meta member client failed");
2185                }
2186
2187                if let Some(sender) = event {
2188                    // ignore resp
2189                    let _resp = sender.send(tick_result);
2190                }
2191            }
2192        });
2193
2194        Ok(())
2195    }
2196
2197    async fn force_refresh_leader(&self) -> Result<()> {
2198        let (sender, receiver) = oneshot::channel();
2199
2200        self.member_monitor_event_sender
2201            .send(sender)
2202            .await
2203            .map_err(|e| anyhow!(e))?;
2204
2205        receiver.await.map_err(|e| anyhow!(e))?
2206    }
2207
2208    /// Connect to the meta server according to the given address `strategy`.
2209    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
2210        let (channel, addr) = match strategy {
2211            MetaAddressStrategy::LoadBalance(addr) => {
2212                Self::try_build_rpc_channel(vec![addr.clone()]).await
2213            }
2214            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2215        }?;
2216        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2217        let client = GrpcMetaClient {
2218            member_monitor_event_sender: force_refresh_sender,
2219            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2220        };
2221
2222        let meta_member_client = client.core.read().await.meta_member_client.clone();
2223        let members = match strategy {
2224            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2225            MetaAddressStrategy::List(addrs) => {
2226                let mut members = LruCache::new(20);
2227                for addr in addrs {
2228                    members.put(addr.clone(), None);
2229                }
2230                members.put(addr.clone(), Some(meta_member_client));
2231
2232                Either::Right(MetaMemberGroup { members })
2233            }
2234        };
2235
2236        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;
2237
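        // Resolve the current leader once before handing the client out.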
2238        client.force_refresh_leader().await?;
2239
2240        Ok(client)
2241    }
2242
2243    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2244        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2245    }
2246
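    /// Tries each address in order and returns the first successfully connected channel
    /// together with the address that was used; if every attempt fails, the last error
    /// is returned.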
2247    pub(crate) async fn try_build_rpc_channel(
2248        addrs: impl IntoIterator<Item = http::Uri>,
2249    ) -> Result<(Channel, http::Uri)> {
2250        let endpoints: Vec<_> = addrs
2251            .into_iter()
2252            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2253            .collect();
2254
2255        let mut last_error = None;
2256
2257        for (endpoint, addr) in endpoints {
2258            match Self::connect_to_endpoint(endpoint).await {
2259                Ok(channel) => {
2260                    tracing::info!("Connected to meta server {} successfully", addr);
2261                    return Ok((channel, addr));
2262                }
2263                Err(e) => {
2264                    tracing::warn!(
2265                        error = %e.as_report(),
2266                        "Failed to connect to meta server {}, trying the next address",
2267                        addr,
2268                    );
2269                    last_error = Some(e);
2270                }
2271            }
2272        }
2273
2274        if let Some(last_error) = last_error {
2275            Err(anyhow::anyhow!(last_error)
2276                .context("failed to connect to all meta servers")
2277                .into())
2278        } else {
2279            bail!("no meta server address provided")
2280        }
2281    }
2282
2283    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2284        let channel = endpoint
2285            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2286            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2287            .connect_timeout(Duration::from_secs(5))
2288            .monitored_connect("grpc-meta-client", Default::default())
2289            .await?
2290            .wrapped();
2291
2292        Ok(channel)
2293    }
2294
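    /// Builds a jittered exponential backoff whose accumulated delay stays below
    /// `high_bound`; when `exceed` is set, a single extra step past the bound is
    /// allowed. A minimal usage sketch, assuming an `endpoint` built via
    /// `addr_to_endpoint`:
    ///
    /// ```ignore
    /// let retry_strategy =
    ///     GrpcMetaClient::retry_strategy_to_bound(Duration::from_secs(10), false);
    /// let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
    ///     GrpcMetaClient::connect_to_endpoint(endpoint.clone()).await
    /// })
    /// .await?;
    /// ```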
2295    pub(crate) fn retry_strategy_to_bound(
2296        high_bound: Duration,
2297        exceed: bool,
2298    ) -> impl Iterator<Item = Duration> {
2299        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2300            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2301            .map(jitter);
2302
2303        let mut sum = Duration::default();
2304
2305        iter.take_while(move |duration| {
2306            sum += *duration;
2307
2308            if exceed {
2309                sum < high_bound + *duration
2310            } else {
2311                sum < high_bound
2312            }
2313        })
2314    }
2315}
2316
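// Maps every supported meta RPC to the service client that owns it. The table is
// consumed by `meta_rpc_client_method_impl` (defined elsewhere in this crate) to
// generate one thin async wrapper per entry. A rough, hypothetical sketch of what a
// single generated method might look like (the actual expansion lives in that macro):
//
//     pub async fn flush(&self, request: FlushRequest) -> Result<FlushResponse> {
//         let mut client = self.core.read().await.stream_client.clone();
//         client.flush(request).await.map(|resp| resp.into_inner()).map_err(Into::into)
//     }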
2317macro_rules! for_all_meta_rpc {
2318    ($macro:ident) => {
2319        $macro! {
2320             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2321            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2322            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2323            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2324            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2325            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2326            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2327            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2328            ,{ stream_client, flush, FlushRequest, FlushResponse }
2329            ,{ stream_client, pause, PauseRequest, PauseResponse }
2330            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2331            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2332            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2333            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2334            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2335            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2336            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2337            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2338            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2339            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2340            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2341            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2342            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2343            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2344            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2345            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2346            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2347            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2348            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2349            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2350            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2351            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2352            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2353            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2354            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2355            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2356            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2357            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2358            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2359            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2360            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2361            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2362            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2363            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2364            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2365            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2366            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2367            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2368            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2369            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2370            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2371            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2372            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2373            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2374            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2375            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2376            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2377            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2378            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2379            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2380            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2381            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2382            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2383            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2384            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2385            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2386            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2387            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2388            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2389            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2390            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2391            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2392            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2393            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2394            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2395            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2396            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2397            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2398            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2399            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2400            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2401            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2402            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2403            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2404            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2405            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2406            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2407            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2408            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2409            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2410            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2411            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2412            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2413            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2414            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2415            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2416            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2417            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2418            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2419            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2420            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2421            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2422            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2423            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2424            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2425            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2426            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2427            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2428            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2429            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2430            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2431            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2432            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2433            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2434            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2435            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2436            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2437            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2438            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2439            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2440            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2441            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2442            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2443            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2444            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2445        }
2446    };
2447}
2448
2449impl GrpcMetaClient {
2450    async fn refresh_client_if_needed(&self, code: Code) {
2451        if matches!(
2452            code,
2453            Code::Unknown | Code::Unimplemented | Code::Unavailable
2454        ) {
2455            tracing::debug!("matching tonic code {}", code);
2456            let (result_sender, result_receiver) = oneshot::channel();
2457            if self
2458                .member_monitor_event_sender
2459                .try_send(result_sender)
2460                .is_ok()
2461            {
2462                if let Ok(Err(e)) = result_receiver.await {
2463                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2464                }
2465            } else {
2466                tracing::debug!("skipping this refresh since another refresh is already in progress")
2467            }
2468        }
2469    }
2470}
2471
2472impl GrpcMetaClient {
2473    for_all_meta_rpc! { meta_rpc_client_method_impl }
2474}