risingwave_rpc_client/meta_client.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

type ConnectionId = u32;
type DatabaseId = u32;
type SchemaId = u32;

/// Client to meta server. Cloning the instance is lightweight.
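///
/// All RPC methods take `&self`, so a single client can be shared, or cheaply cloned, across
/// tasks and threads.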
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: u32,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: MetaConfig,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

    /// Subscribe to notifications from the meta service.
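    ///
    /// The request is retried with a bounded exponential backoff for up to
    /// `max_heartbeat_interval_secs` before giving up.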
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

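    /// Create a connection catalog object and return the catalog version to wait for.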
    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

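    /// Create a secret catalog object and return the catalog version to wait for.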
    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest {
            secret_id: secret_id.into(),
        };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    /// Register the current node with the cluster and set the corresponding worker id.
    ///
    /// Retries if there is a connection issue with the meta node. Exits the process if registration fails.
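    ///
    /// On the first successful registration in a process, a panic hook is also installed to
    /// report panic events to the meta node.
    ///
    /// # Example (illustrative sketch, not a doc-test; endpoints and config are placeholders)
    ///
    /// ```ignore
    /// // Placeholder addresses; assumes a running meta node at this endpoint.
    /// let addr_strategy: MetaAddressStrategy = "http://127.0.0.1:5690".parse()?;
    /// let host_addr: HostAddr = "127.0.0.1:5688".parse()?;
    /// let (meta_client, system_params) = MetaClient::register_new(
    ///     addr_strategy,
    ///     WorkerType::ComputeNode,
    ///     &host_addr,
    ///     Property::default(),
    ///     &MetaConfig::default(),
    /// )
    /// .await;
    /// meta_client.activate(&host_addr).await?;
    /// ```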
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's any transient connection issue.
            // If the error is from our implementation or business, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.to_owned(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    /// Activate the current node in the cluster to confirm it's ready to serve.
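    ///
    /// Like registration, activation is retried with a bounded backoff for up to
    /// `max_heartbeat_interval_secs`.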
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

    /// Send a heartbeat signal to the meta service.
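    ///
    /// If the meta service reports the worker as unknown and the client is not already shutting
    /// down, the process exits.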
    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we're already shutting down.
            // Otherwise, exit the process.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest {
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: u32,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: u32,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(SourceId::Id),
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: u32) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: u32) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: u32,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest {
            index_id: index_id.index_id,
            cascade,
        };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id: function_id.0,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    // TODO: use `UserInfoVersion` as the return type instead.
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    /// Unregister the current node from the cluster.
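    ///
    /// Also marks the client as shutting down, so that a subsequent "unknown worker" heartbeat
    /// response does not terminate the process.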
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster on a best-effort basis and log the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[u32],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

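    /// List all worker nodes, optionally filtered by worker type. Nodes that are still starting
    /// are included.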
    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

    /// Starts a heartbeat worker.
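    ///
    /// Returns the join handle of the spawned task and a oneshot sender that stops the loop when
    /// a value is sent (or when the sender is dropped).
    ///
    /// # Example (illustrative sketch, not a doc-test)
    ///
    /// ```ignore
    /// // Illustrative interval; choose per deployment.
    /// let (join_handle, shutdown_tx) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// // ... later, on shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```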
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for interval
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // TODO: decide better min_interval for timeout
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

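    /// Trigger a flush of the given database and return the resulting hummock version id.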
    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: table_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: u32,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

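    /// Set a system parameter on the meta service. Returns the updated system parameters if the
    /// meta service reports new values.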
    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[StateTableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_tables: bool,
    ) -> Result<HashMap<u32, Table>> {
        let req = GetTablesRequest {
            table_ids: table_ids.to_vec(),
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or(0),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1605        let req = ListEventLogRequest::default();
1606        let resp = self.inner.list_event_log(req).await?;
1607        Ok(resp.event_logs)
1608    }
1609
1610    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1611        let req = ListCompactTaskProgressRequest {};
1612        let resp = self.inner.list_compact_task_progress(req).await?;
1613        Ok(resp.task_progress)
1614    }
1615
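    /// No-op in madsim (deterministic simulation) builds.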
1616    #[cfg(madsim)]
1617    pub fn try_add_panic_event_blocking(
1618        &self,
1619        _panic_info: impl Display,
1620        _timeout_millis: Option<u64>,
1621    ) {
1622    }
1623
1624    /// If `timeout_millis` is `None`, a default of 1000 ms is used.
1625    #[cfg(not(madsim))]
1626    pub fn try_add_panic_event_blocking(
1627        &self,
1628        panic_info: impl Display,
1629        timeout_millis: Option<u64>,
1630    ) {
1631        let event = event_log::EventWorkerNodePanic {
1632            worker_id: self.worker_id,
1633            worker_type: self.worker_type.into(),
1634            host_addr: Some(self.host_addr.to_protobuf()),
1635            panic_info: format!("{panic_info}"),
1636        };
1637        let grpc_meta_client = self.inner.clone();
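        // Report the panic on a dedicated thread with its own single-threaded runtime and
        // block until the RPC completes or the timeout elapses, so this also works from
        // contexts without a usable async runtime.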
1638        let _ = thread::spawn(move || {
1639            let rt = tokio::runtime::Builder::new_current_thread()
1640                .enable_all()
1641                .build()
1642                .unwrap();
1643            let req = AddEventLogRequest {
1644                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1645            };
1646            rt.block_on(async {
1647                let _ = tokio::time::timeout(
1648                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1649                    grpc_meta_client.add_event_log(req),
1650                )
1651                .await;
1652            });
1653        })
1654        .join();
1655    }
1656
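    /// Report a sink failure to the meta node's event log.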
1657    pub async fn add_sink_fail_event(
1658        &self,
1659        sink_id: u32,
1660        sink_name: String,
1661        connector: String,
1662        error: String,
1663    ) -> Result<()> {
1664        let event = event_log::EventSinkFail {
1665            sink_id,
1666            sink_name,
1667            connector,
1668            error,
1669        };
1670        let req = AddEventLogRequest {
1671            event: Some(add_event_log_request::Event::SinkFail(event)),
1672        };
1673        self.inner.add_event_log(req).await?;
1674        Ok(())
1675    }
1676
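    /// Report a failed CDC auto schema change to the meta node's event log.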
1677    pub async fn add_cdc_auto_schema_change_fail_event(
1678        &self,
1679        table_id: u32,
1680        table_name: String,
1681        cdc_table_id: String,
1682        upstream_ddl: String,
1683        fail_info: String,
1684    ) -> Result<()> {
1685        let event = event_log::EventAutoSchemaChangeFail {
1686            table_id,
1687            table_name,
1688            cdc_table_id,
1689            upstream_ddl,
1690            fail_info,
1691        };
1692        let req = AddEventLogRequest {
1693            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1694        };
1695        self.inner.add_event_log(req).await?;
1696        Ok(())
1697    }
1698
1699    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1700        let req = CancelCompactTaskRequest {
1701            task_id,
1702            task_status: task_status as _,
1703        };
1704        let resp = self.inner.cancel_compact_task(req).await?;
1705        Ok(resp.ret)
1706    }
1707
1708    pub async fn get_version_by_epoch(
1709        &self,
1710        epoch: HummockEpoch,
1711        table_id: u32,
1712    ) -> Result<PbHummockVersion> {
1713        let req = GetVersionByEpochRequest { epoch, table_id };
1714        let resp = self.inner.get_version_by_epoch(req).await?;
1715        Ok(resp.version.unwrap())
1716    }
1717
1718    pub async fn get_cluster_limits(
1719        &self,
1720    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1721        let req = GetClusterLimitsRequest {};
1722        let resp = self.inner.get_cluster_limits(req).await?;
1723        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1724    }
1725
1726    pub async fn merge_compaction_group(
1727        &self,
1728        left_group_id: CompactionGroupId,
1729        right_group_id: CompactionGroupId,
1730    ) -> Result<()> {
1731        let req = MergeCompactionGroupRequest {
1732            left_group_id,
1733            right_group_id,
1734        };
1735        self.inner.merge_compaction_group(req).await?;
1736        Ok(())
1737    }
1738
1739    /// List all rate limits for sources and backfills
1740    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1741        let request = ListRateLimitsRequest {};
1742        let resp = self.inner.list_rate_limits(request).await?;
1743        Ok(resp.rate_limits)
1744    }
1745
1746    pub async fn list_cdc_progress(&self) -> Result<HashMap<u32, PbCdcProgress>> {
1747        let request = ListCdcProgressRequest {};
1748        let resp = self.inner.list_cdc_progress(request).await?;
1749        Ok(resp.cdc_progress)
1750    }
1751
1752    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1753        let request = ListIcebergTablesRequest {};
1754        let resp = self.inner.list_iceberg_tables(request).await?;
1755        Ok(resp.iceberg_tables)
1756    }
1757
1758    /// Get the await tree of all nodes in the cluster.
1759    pub async fn get_cluster_stack_trace(
1760        &self,
1761        actor_traces_format: ActorTracesFormat,
1762    ) -> Result<StackTraceResponse> {
1763        let request = StackTraceRequest {
1764            actor_traces_format: actor_traces_format as i32,
1765        };
1766        let resp = self.inner.stack_trace(request).await?;
1767        Ok(resp)
1768    }
1769
1770    pub async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
1771        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1772        self.inner.set_sync_log_store_aligned(request).await?;
1773        Ok(())
1774    }
1775
1776    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
1777        self.inner.refresh(request).await
1778    }
1779}
1780
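// Hummock-related RPCs exposed through the `HummockMetaClient` trait.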
1781#[async_trait]
1782impl HummockMetaClient for MetaClient {
1783    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1784        let req = UnpinVersionBeforeRequest {
1785            context_id: self.worker_id(),
1786            unpin_version_before: unpin_version_before.to_u64(),
1787        };
1788        self.inner.unpin_version_before(req).await?;
1789        Ok(())
1790    }
1791
1792    async fn get_current_version(&self) -> Result<HummockVersion> {
1793        let req = GetCurrentVersionRequest::default();
1794        Ok(HummockVersion::from_rpc_protobuf(
1795            &self
1796                .inner
1797                .get_current_version(req)
1798                .await?
1799                .current_version
1800                .unwrap(),
1801        ))
1802    }
1803
1804    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1805        let resp = self
1806            .inner
1807            .get_new_object_ids(GetNewObjectIdsRequest { number })
1808            .await?;
1809        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1810    }
1811
1812    async fn commit_epoch_with_change_log(
1813        &self,
1814        _epoch: HummockEpoch,
1815        _sync_result: SyncResult,
1816        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1817    ) -> Result<()> {
1818        panic!("Only meta service can commit_epoch in production.")
1819    }
1820
1821    async fn trigger_manual_compaction(
1822        &self,
1823        compaction_group_id: u64,
1824        table_id: u32,
1825        level: u32,
1826        sst_ids: Vec<u64>,
1827    ) -> Result<()> {
1828        // TODO: support key_range parameter
1829        let req = TriggerManualCompactionRequest {
1830            compaction_group_id,
1831            table_id,
1832            // If `table_id` does not exist, manual compaction will include all SSTs
1833            // without checking `internal_table_id`.
1834            level,
1835            sst_ids,
1836            ..Default::default()
1837        };
1838
1839        self.inner.trigger_manual_compaction(req).await?;
1840        Ok(())
1841    }
1842
1843    async fn trigger_full_gc(
1844        &self,
1845        sst_retention_time_sec: u64,
1846        prefix: Option<String>,
1847    ) -> Result<()> {
1848        self.inner
1849            .trigger_full_gc(TriggerFullGcRequest {
1850                sst_retention_time_sec,
1851                prefix,
1852            })
1853            .await?;
1854        Ok(())
1855    }
1856
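    /// Open a bidirectional compaction event stream: a `Register` event is sent first,
    /// and the returned sender/stream pair is then used to exchange further events with
    /// the meta node.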
1857    async fn subscribe_compaction_event(
1858        &self,
1859    ) -> Result<(
1860        UnboundedSender<SubscribeCompactionEventRequest>,
1861        BoxStream<'static, CompactionEventItem>,
1862    )> {
1863        let (request_sender, request_receiver) =
1864            unbounded_channel::<SubscribeCompactionEventRequest>();
1865        request_sender
1866            .send(SubscribeCompactionEventRequest {
1867                event: Some(subscribe_compaction_event_request::Event::Register(
1868                    Register {
1869                        context_id: self.worker_id(),
1870                    },
1871                )),
1872                create_at: SystemTime::now()
1873                    .duration_since(SystemTime::UNIX_EPOCH)
1874                    .expect("Clock may have gone backwards")
1875                    .as_millis() as u64,
1876            })
1877            .context("Failed to subscribe compaction event")?;
1878
1879        let stream = self
1880            .inner
1881            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1882                request_receiver,
1883            )))
1884            .await?;
1885
1886        Ok((request_sender, Box::pin(stream)))
1887    }
1888
1889    async fn get_version_by_epoch(
1890        &self,
1891        epoch: HummockEpoch,
1892        table_id: u32,
1893    ) -> Result<PbHummockVersion> {
1894        self.get_version_by_epoch(epoch, table_id).await
1895    }
1896
1897    async fn subscribe_iceberg_compaction_event(
1898        &self,
1899    ) -> Result<(
1900        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1901        BoxStream<'static, IcebergCompactionEventItem>,
1902    )> {
1903        let (request_sender, request_receiver) =
1904            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1905        request_sender
1906            .send(SubscribeIcebergCompactionEventRequest {
1907                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1908                    IcebergRegister {
1909                        context_id: self.worker_id(),
1910                    },
1911                )),
1912                create_at: SystemTime::now()
1913                    .duration_since(SystemTime::UNIX_EPOCH)
1914                    .expect("Clock may have gone backwards")
1915                    .as_millis() as u64,
1916            })
1917            .context("Failed to subscribe iceberg compaction event")?;
1918
1919        let stream = self
1920            .inner
1921            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
1922                request_receiver,
1923            )))
1924            .await?;
1925
1926        Ok((request_sender, Box::pin(stream)))
1927    }
1928}
1929
1930#[async_trait]
1931impl TelemetryInfoFetcher for MetaClient {
1932    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1933        let resp = self
1934            .get_telemetry_info()
1935            .await
1936            .map_err(|e| e.to_report_string())?;
1937        let tracking_id = resp.get_tracking_id().ok();
1938        Ok(tracking_id.map(|id| id.to_owned()))
1939    }
1940}
1941
1942pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1943
1944#[derive(Debug, Clone)]
1945struct GrpcMetaClientCore {
1946    cluster_client: ClusterServiceClient<Channel>,
1947    meta_member_client: MetaMemberServiceClient<Channel>,
1948    heartbeat_client: HeartbeatServiceClient<Channel>,
1949    ddl_client: DdlServiceClient<Channel>,
1950    hummock_client: HummockManagerServiceClient<Channel>,
1951    notification_client: NotificationServiceClient<Channel>,
1952    stream_client: StreamManagerServiceClient<Channel>,
1953    user_client: UserServiceClient<Channel>,
1954    scale_client: ScaleServiceClient<Channel>,
1955    backup_client: BackupServiceClient<Channel>,
1956    telemetry_client: TelemetryInfoServiceClient<Channel>,
1957    system_params_client: SystemParamsServiceClient<Channel>,
1958    session_params_client: SessionParamServiceClient<Channel>,
1959    serving_client: ServingServiceClient<Channel>,
1960    cloud_client: CloudServiceClient<Channel>,
1961    sink_coordinate_client: SinkCoordinationRpcClient,
1962    event_log_client: EventLogServiceClient<Channel>,
1963    cluster_limit_client: ClusterLimitServiceClient<Channel>,
1964    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
1965    monitor_client: MonitorServiceClient<Channel>,
1966}
1967
1968impl GrpcMetaClientCore {
1969    pub(crate) fn new(channel: Channel) -> Self {
1970        let cluster_client = ClusterServiceClient::new(channel.clone());
1971        let meta_member_client = MetaMemberClient::new(channel.clone());
1972        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
1973        let ddl_client =
1974            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1975        let hummock_client =
1976            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1977        let notification_client =
1978            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1979        let stream_client =
1980            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1981        let user_client = UserServiceClient::new(channel.clone());
1982        let scale_client =
1983            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1984        let backup_client = BackupServiceClient::new(channel.clone());
1985        let telemetry_client =
1986            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1987        let system_params_client = SystemParamsServiceClient::new(channel.clone());
1988        let session_params_client = SessionParamServiceClient::new(channel.clone());
1989        let serving_client = ServingServiceClient::new(channel.clone());
1990        let cloud_client = CloudServiceClient::new(channel.clone());
1991        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
1992        let event_log_client = EventLogServiceClient::new(channel.clone());
1993        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
1994        let hosted_iceberg_catalog_service_client =
1995            HostedIcebergCatalogServiceClient::new(channel.clone());
1996        let monitor_client = MonitorServiceClient::new(channel);
1997
1998        GrpcMetaClientCore {
1999            cluster_client,
2000            meta_member_client,
2001            heartbeat_client,
2002            ddl_client,
2003            hummock_client,
2004            notification_client,
2005            stream_client,
2006            user_client,
2007            scale_client,
2008            backup_client,
2009            telemetry_client,
2010            system_params_client,
2011            session_params_client,
2012            serving_client,
2013            cloud_client,
2014            sink_coordinate_client,
2015            event_log_client,
2016            cluster_limit_client,
2017            hosted_iceberg_catalog_service_client,
2018            monitor_client,
2019        }
2020    }
2021}
2022
2023/// Client to meta server. Cloning the instance is lightweight.
2024///
2025/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`].
2026#[derive(Debug, Clone)]
2027struct GrpcMetaClient {
2028    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2029    core: Arc<RwLock<GrpcMetaClientCore>>,
2030}
2031
2032type MetaMemberClient = MetaMemberServiceClient<Channel>;
2033
2034struct MetaMemberGroup {
2035    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2036}
2037
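/// Tracks the known meta members and the current leader, recreating the client core
/// whenever a new leader is discovered.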
2038struct MetaMemberManagement {
2039    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2040    members: Either<MetaMemberClient, MetaMemberGroup>,
2041    current_leader: http::Uri,
2042    meta_config: MetaConfig,
2043}
2044
2045impl MetaMemberManagement {
2046    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2047
2048    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2049        format!("http://{}:{}", addr.host, addr.port)
2050            .parse()
2051            .unwrap()
2052    }
2053
2054    async fn recreate_core(&self, channel: Channel) {
2055        let mut core = self.core_ref.write().await;
2056        *core = GrpcMetaClientCore::new(channel);
2057    }
2058
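    /// Fetch the member list (from the load-balanced endpoint or from each known member),
    /// then, if the leader has changed, reconnect and swap in a new client core.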
2059    async fn refresh_members(&mut self) -> Result<()> {
2060        let leader_addr = match self.members.as_mut() {
2061            Either::Left(client) => {
2062                let resp = client
2063                    .to_owned()
2064                    .members(MembersRequest {})
2065                    .await
2066                    .map_err(RpcError::from_meta_status)?;
2067                let resp = resp.into_inner();
2068                resp.members.into_iter().find(|member| member.is_leader)
2069            }
2070            Either::Right(member_group) => {
2071                let mut fetched_members = None;
2072
2073                for (addr, client) in &mut member_group.members {
2074                    let members: Result<_> = try {
2075                        let mut client = match client {
2076                            Some(cached_client) => cached_client.to_owned(),
2077                            None => {
2078                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2079                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2080                                    .await
2081                                    .context("failed to create client")?;
2082                                let new_client: MetaMemberClient =
2083                                    MetaMemberServiceClient::new(channel);
2084                                *client = Some(new_client.clone());
2085
2086                                new_client
2087                            }
2088                        };
2089
2090                        let resp = client
2091                            .members(MembersRequest {})
2092                            .await
2093                            .context("failed to fetch members")?;
2094
2095                        resp.into_inner().members
2096                    };
2097
2098                    let fetched = members.is_ok();
2099                    fetched_members = Some(members);
2100                    if fetched {
2101                        break;
2102                    }
2103                }
2104
2105                let members = fetched_members
2106                    .context("no member available in the list")?
2107                    .context("could not refresh members")?;
2108
2109                // find new leader
2110                let mut leader = None;
2111                for member in members {
2112                    if member.is_leader {
2113                        leader = Some(member.clone());
2114                    }
2115
2116                    let addr = Self::host_address_to_uri(member.address.unwrap());
2117                    // We intentionally do not evict expired addresses here, so we can still cope with some extreme situations.
2118                    if !member_group.members.contains(&addr) {
2119                        tracing::info!("new meta member joined: {}", addr);
2120                        member_group.members.put(addr, None);
2121                    }
2122                }
2123
2124                leader
2125            }
2126        };
2127
2128        if let Some(leader) = leader_addr {
2129            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2130
2131            if discovered_leader != self.current_leader {
2132                tracing::info!("new meta leader {} discovered", discovered_leader);
2133
2134                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2135                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2136                    false,
2137                );
2138
2139                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2140                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2141                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2142                })
2143                .await?;
2144
2145                self.recreate_core(channel).await;
2146                self.current_leader = discovered_leader;
2147            }
2148        }
2149
2150        Ok(())
2151    }
2152}
2153
2154impl GrpcMetaClient {
2155    // See `Endpoint::http2_keep_alive_interval`
2156    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2157    // See `Endpoint::keep_alive_timeout`
2158    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2159    // Retry base interval in ms for connecting to meta server.
2160    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2161    // Max retry interval in ms for connecting to meta server.
2162    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2163
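    /// Spawn a background task that refreshes the meta member list, periodically when a
    /// member group is used (the `List` address strategy) and on demand via the
    /// force-refresh channel.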
2164    fn start_meta_member_monitor(
2165        &self,
2166        init_leader_addr: http::Uri,
2167        members: Either<MetaMemberClient, MetaMemberGroup>,
2168        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2169        meta_config: MetaConfig,
2170    ) -> Result<()> {
2171        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2172        let current_leader = init_leader_addr;
2173
2174        let enable_period_tick = matches!(members, Either::Right(_));
2175
2176        let member_management = MetaMemberManagement {
2177            core_ref,
2178            members,
2179            current_leader,
2180            meta_config,
2181        };
2182
2183        let mut force_refresh_receiver = force_refresh_receiver;
2184
2185        tokio::spawn(async move {
2186            let mut member_management = member_management;
2187            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2188
2189            loop {
2190                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2191                    tokio::select! {
2192                        _ = ticker.tick() => None,
2193                        result_sender = force_refresh_receiver.recv() => {
2194                            if result_sender.is_none() {
2195                                break;
2196                            }
2197
2198                            result_sender
2199                        },
2200                    }
2201                } else {
2202                    let result_sender = force_refresh_receiver.recv().await;
2203
2204                    if result_sender.is_none() {
2205                        break;
2206                    }
2207
2208                    result_sender
2209                };
2210
2211                let tick_result = member_management.refresh_members().await;
2212                if let Err(e) = tick_result.as_ref() {
2213                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2214                }
2215
2216                if let Some(sender) = event {
2217                    // ignore resp
2218                    let _resp = sender.send(tick_result);
2219                }
2220            }
2221        });
2222
2223        Ok(())
2224    }
2225
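    /// Ask the member monitor to refresh the leader immediately and wait for the result.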
2226    async fn force_refresh_leader(&self) -> Result<()> {
2227        let (sender, receiver) = oneshot::channel();
2228
2229        self.member_monitor_event_sender
2230            .send(sender)
2231            .await
2232            .map_err(|e| anyhow!(e))?;
2233
2234        receiver.await.map_err(|e| anyhow!(e))?
2235    }
2236
2237    /// Connect to the meta server using the address(es) specified by `strategy`.
2238    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
2239        let (channel, addr) = match strategy {
2240            MetaAddressStrategy::LoadBalance(addr) => {
2241                Self::try_build_rpc_channel(vec![addr.clone()]).await
2242            }
2243            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2244        }?;
2245        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2246        let client = GrpcMetaClient {
2247            member_monitor_event_sender: force_refresh_sender,
2248            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2249        };
2250
2251        let meta_member_client = client.core.read().await.meta_member_client.clone();
2252        let members = match strategy {
2253            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2254            MetaAddressStrategy::List(addrs) => {
2255                let mut members = LruCache::new(20);
2256                for addr in addrs {
2257                    members.put(addr.clone(), None);
2258                }
2259                members.put(addr.clone(), Some(meta_member_client));
2260
2261                Either::Right(MetaMemberGroup { members })
2262            }
2263        };
2264
2265        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;
2266
2267        client.force_refresh_leader().await?;
2268
2269        Ok(client)
2270    }
2271
2272    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2273        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2274    }
2275
2276    pub(crate) async fn try_build_rpc_channel(
2277        addrs: impl IntoIterator<Item = http::Uri>,
2278    ) -> Result<(Channel, http::Uri)> {
2279        let endpoints: Vec<_> = addrs
2280            .into_iter()
2281            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2282            .collect();
2283
2284        let mut last_error = None;
2285
2286        for (endpoint, addr) in endpoints {
2287            match Self::connect_to_endpoint(endpoint).await {
2288                Ok(channel) => {
2289                    tracing::info!("Connect to meta server {} successfully", addr);
2290                    return Ok((channel, addr));
2291                }
2292                Err(e) => {
2293                    tracing::warn!(
2294                        error = %e.as_report(),
2295                        "Failed to connect to meta server {}, trying again",
2296                        addr,
2297                    );
2298                    last_error = Some(e);
2299                }
2300            }
2301        }
2302
2303        if let Some(last_error) = last_error {
2304            Err(anyhow::anyhow!(last_error)
2305                .context("failed to connect to all meta servers")
2306                .into())
2307        } else {
2308            bail!("no meta server address provided")
2309        }
2310    }
2311
2312    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2313        let channel = endpoint
2314            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2315            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2316            .connect_timeout(Duration::from_secs(5))
2317            .monitored_connect("grpc-meta-client", Default::default())
2318            .await?
2319            .wrapped();
2320
2321        Ok(channel)
2322    }
2323
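    /// Jittered exponential backoff whose cumulative delay stays below `high_bound`;
    /// with `exceed` set, the total may overshoot the bound by at most one final step.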
2324    pub(crate) fn retry_strategy_to_bound(
2325        high_bound: Duration,
2326        exceed: bool,
2327    ) -> impl Iterator<Item = Duration> {
2328        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2329            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2330            .map(jitter);
2331
2332        let mut sum = Duration::default();
2333
2334        iter.take_while(move |duration| {
2335            sum += *duration;
2336
2337            if exceed {
2338                sum < high_bound + *duration
2339            } else {
2340                sum < high_bound
2341            }
2342        })
2343    }
2344}
2345
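// Table of (sub-client, RPC, request, response) entries; `meta_rpc_client_method_impl`
// expands it into the corresponding async wrapper methods on `GrpcMetaClient`.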
2346macro_rules! for_all_meta_rpc {
2347    ($macro:ident) => {
2348        $macro! {
2349             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2350            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2351            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2352            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2353            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2354            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2355            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2356            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2357            ,{ stream_client, flush, FlushRequest, FlushResponse }
2358            ,{ stream_client, pause, PauseRequest, PauseResponse }
2359            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2360            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2361            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2362            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2363            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2364            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2365            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2366            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2367            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2368            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2369            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2370            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2371            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2372            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2373            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2374            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2375            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2376            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2377            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2378            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2379            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2380            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2381            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2382            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2383            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2384            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2385            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2386            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2387            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2388            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2389            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2390            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2391            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2392            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2393            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2394            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2395            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2396            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2397            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2398            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2399            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2400            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2401            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2402            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2403            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2404            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2405            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2406            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2407            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2408            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2409            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2410            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2411            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2412            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2413            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2414            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2415            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2416            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2417            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2418            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2419            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2420            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2421            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2422            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2423            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2424            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2425            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2426            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2427            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2428            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2429            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2430            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2431            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2432            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2433            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2434            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2435            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2436            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2437            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2438            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2439            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2440            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2441            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2442            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2443            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2444            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2445            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2446            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2447            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2448            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2449            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2450            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2451            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2452            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2453            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2454            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2455            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2456            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2457            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2458            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2459            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2460            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2461            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2462            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2463            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2464            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2465            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2466            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2467            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2468            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2469            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2470            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2471            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2472            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2473            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2474            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2475            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2476        }
2477    };
2478}
2479
2480impl GrpcMetaClient {
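    /// On status codes that may indicate a stale connection or leader change
    /// (`Unknown`, `Unimplemented`, `Unavailable`), request a member refresh,
    /// unless one is already in flight.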
2481    async fn refresh_client_if_needed(&self, code: Code) {
2482        if matches!(
2483            code,
2484            Code::Unknown | Code::Unimplemented | Code::Unavailable
2485        ) {
2486            tracing::debug!("tonic code {} received, triggering a meta member refresh", code);
2487            let (result_sender, result_receiver) = oneshot::channel();
2488            if self
2489                .member_monitor_event_sender
2490                .try_send(result_sender)
2491                .is_ok()
2492            {
2493                if let Ok(Err(e)) = result_receiver.await {
2494                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2495                }
2496            } else {
2497                tracing::debug!("skipping this refresh since another refresh is already in progress")
2498            }
2499        }
2500    }
2501}
2502
2503impl GrpcMetaClient {
2504    for_all_meta_rpc! { meta_rpc_client_method_impl }
2505}