risingwave_rpc_client/meta_client.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

type ConnectionId = u32;
type DatabaseId = u32;
type SchemaId = u32;

/// Client to meta server. Cloning the instance is lightweight.
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: u32,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}
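
// Per the doc comment above, cloning a `MetaClient` is lightweight, so callers
// typically clone it into background tasks rather than sharing references.
// Illustrative sketch, not part of the original file; `client` and `database_id`
// are assumed to exist in the surrounding context:
//
//     let client_for_task = client.clone();
//     tokio::spawn(async move {
//         let _ = client_for_task.flush(database_id).await;
//     });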

impl MetaClient {
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

    /// Subscribe to notification from meta.
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }
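
    // Illustrative sketch (not part of the original file): consuming the
    // notification stream returned by `subscribe`. The subscribe type and the
    // handling logic are placeholders.
    //
    //     let mut stream = client.subscribe(SubscribeType::Frontend).await?;
    //     while let Some(notification) = stream.message().await? {
    //         // React to the catalog / worker / hummock updates carried in `notification`.
    //     }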

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest {
            secret_id: secret_id.into(),
        };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    /// Register the current node to the cluster and set the corresponding worker id.
    ///
    /// Retry if there's a connection issue with the meta node. Exit the process if the registration fails.
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }
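
    // Illustrative registration flow (not part of the original file): a worker
    // registers and then activates itself once it is ready to serve. All bindings
    // other than the methods of this file are assumptions.
    //
    //     let (client, system_params) = MetaClient::register_new(
    //         addr_strategy,
    //         WorkerType::ComputeNode,
    //         &host_addr,
    //         Property::default(),
    //         meta_config.clone(),
    //     )
    //     .await;
    //     client.activate(&host_addr).await?;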

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's any transient connection issue.
            // If the error is from our implementation or business, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    /// Activate the current node in cluster to confirm it's ready to serve.
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

    /// Send heartbeat signal to meta service.
    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we're already shutting down.
            // Otherwise, exit the process.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest {
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: u32,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: u32,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(SourceId::Id),
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: u32) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: u32) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: u32,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest {
            index_id: index_id.index_id,
            cascade,
        };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id: function_id.0,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    // TODO: use `UserInfoVersion` as the return type instead.
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    /// Unregister the current node from the cluster.
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster with best effort. Log the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }
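
    // Illustrative graceful-shutdown sketch (not part of the original file): stop
    // the heartbeat loop first, then unregister on a best-effort basis.
    // `heartbeat_shutdown` and `heartbeat_handle` are assumed to come from
    // `start_heartbeat_loop` below.
    //
    //     let _ = heartbeat_shutdown.send(());
    //     let _ = heartbeat_handle.await;
    //     client.try_unregister().await;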

    pub async fn update_schedulability(
        &self,
        worker_ids: &[u32],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

    /// Starts a heartbeat worker.
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for interval
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // TODO: decide better min_interval for timeout
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }
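
    // A minimal sketch of driving the heartbeat worker above (not part of the
    // original file); the interval is an arbitrary example value:
    //
    //     let (join_handle, shutdown_tx) =
    //         MetaClient::start_heartbeat_loop(client.clone(), Duration::from_secs(1));
    //     // ... later, on shutdown:
    //     let _ = shutdown_tx.send(());
    //     let _ = join_handle.await;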

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: table_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: u32,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }
1311
1312    pub async fn trigger_compaction_deterministic(
1313        &self,
1314        version_id: HummockVersionId,
1315        compaction_groups: Vec<CompactionGroupId>,
1316    ) -> Result<()> {
1317        let req = TriggerCompactionDeterministicRequest {
1318            version_id: version_id.to_u64(),
1319            compaction_groups,
1320        };
1321        self.inner.trigger_compaction_deterministic(req).await?;
1322        Ok(())
1323    }
1324
1325    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1326        let req = DisableCommitEpochRequest {};
1327        Ok(HummockVersion::from_rpc_protobuf(
1328            &self
1329                .inner
1330                .disable_commit_epoch(req)
1331                .await?
1332                .current_version
1333                .unwrap(),
1334        ))
1335    }
1336
1337    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1338        let req = GetAssignedCompactTaskNumRequest {};
1339        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1340        Ok(resp.num_tasks as usize)
1341    }
1342
1343    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1344        let req = RiseCtlListCompactionGroupRequest {};
1345        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1346        Ok(resp.compaction_groups)
1347    }
1348
1349    pub async fn risectl_update_compaction_config(
1350        &self,
1351        compaction_groups: &[CompactionGroupId],
1352        configs: &[MutableConfig],
1353    ) -> Result<()> {
1354        let req = RiseCtlUpdateCompactionConfigRequest {
1355            compaction_group_ids: compaction_groups.to_vec(),
1356            configs: configs
1357                .iter()
1358                .map(
1359                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1360                        mutable_config: Some(c.clone()),
1361                    },
1362                )
1363                .collect(),
1364        };
1365        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1366        Ok(())
1367    }
1368
1369    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1370        let req = BackupMetaRequest { remarks };
1371        let resp = self.inner.backup_meta(req).await?;
1372        Ok(resp.job_id)
1373    }
1374
1375    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1376        let req = GetBackupJobStatusRequest { job_id };
1377        let resp = self.inner.get_backup_job_status(req).await?;
1378        Ok((resp.job_status(), resp.message))
1379    }
1380
1381    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1382        let req = DeleteMetaSnapshotRequest {
1383            snapshot_ids: snapshot_ids.to_vec(),
1384        };
1385        let _resp = self.inner.delete_meta_snapshot(req).await?;
1386        Ok(())
1387    }
1388
1389    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1390        let req = GetMetaSnapshotManifestRequest {};
1391        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1392        Ok(resp.manifest.expect("should exist"))
1393    }
1394
1395    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1396        let req = GetTelemetryInfoRequest {};
1397        let resp = self.inner.get_telemetry_info(req).await?;
1398        Ok(resp)
1399    }
1400
1401    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1402        let req = GetMetaStoreInfoRequest {};
1403        let resp = self.inner.get_meta_store_info(req).await?;
1404        Ok(resp.meta_store_endpoint)
1405    }
1406
1407    pub async fn alter_sink_props(
1408        &self,
1409        sink_id: u32,
1410        changed_props: BTreeMap<String, String>,
1411        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1412        connector_conn_ref: Option<u32>,
1413    ) -> Result<()> {
1414        let req = AlterConnectorPropsRequest {
1415            object_id: sink_id,
1416            changed_props: changed_props.into_iter().collect(),
1417            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1418            connector_conn_ref,
1419            object_type: AlterConnectorPropsObject::Sink as i32,
1420            extra_options: None,
1421        };
1422        let _resp = self.inner.alter_connector_props(req).await?;
1423        Ok(())
1424    }
1425
1426    pub async fn alter_iceberg_table_props(
1427        &self,
1428        table_id: u32,
1429        sink_id: u32,
1430        source_id: u32,
1431        changed_props: BTreeMap<String, String>,
1432        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1433        connector_conn_ref: Option<u32>,
1434    ) -> Result<()> {
1435        let req = AlterConnectorPropsRequest {
1436            object_id: table_id,
1437            changed_props: changed_props.into_iter().collect(),
1438            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1439            connector_conn_ref,
1440            object_type: AlterConnectorPropsObject::IcebergTable as i32,
1441            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1442                sink_id: sink_id as i32,
1443                source_id: source_id as i32,
1444            })),
1445        };
1446        let _resp = self.inner.alter_connector_props(req).await?;
1447        Ok(())
1448    }
1449
1450    pub async fn alter_source_connector_props(
1451        &self,
1452        source_id: u32,
1453        changed_props: BTreeMap<String, String>,
1454        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1455        connector_conn_ref: Option<u32>,
1456    ) -> Result<()> {
1457        let req = AlterConnectorPropsRequest {
1458            object_id: source_id,
1459            changed_props: changed_props.into_iter().collect(),
1460            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1461            connector_conn_ref,
1462            object_type: AlterConnectorPropsObject::Source as i32,
1463            extra_options: None,
1464        };
1465        let _resp = self.inner.alter_connector_props(req).await?;
1466        Ok(())
1467    }
1468
1469    pub async fn set_system_param(
1470        &self,
1471        param: String,
1472        value: Option<String>,
1473    ) -> Result<Option<SystemParamsReader>> {
1474        let req = SetSystemParamRequest { param, value };
1475        let resp = self.inner.set_system_param(req).await?;
1476        Ok(resp.params.map(SystemParamsReader::from))
1477    }
1478
1479    pub async fn get_session_params(&self) -> Result<String> {
1480        let req = GetSessionParamsRequest {};
1481        let resp = self.inner.get_session_params(req).await?;
1482        Ok(resp.params)
1483    }
1484
1485    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1486        let req = SetSessionParamRequest { param, value };
1487        let resp = self.inner.set_session_param(req).await?;
1488        Ok(resp.param)
1489    }
1490
1491    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1492        let req = GetDdlProgressRequest {};
1493        let resp = self.inner.get_ddl_progress(req).await?;
1494        Ok(resp.ddl_progress)
1495    }
1496
1497    pub async fn split_compaction_group(
1498        &self,
1499        group_id: CompactionGroupId,
1500        table_ids_to_new_group: &[StateTableId],
1501        partition_vnode_count: u32,
1502    ) -> Result<CompactionGroupId> {
1503        let req = SplitCompactionGroupRequest {
1504            group_id,
1505            table_ids: table_ids_to_new_group.to_vec(),
1506            partition_vnode_count,
1507        };
1508        let resp = self.inner.split_compaction_group(req).await?;
1509        Ok(resp.new_group_id)
1510    }
1511
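    /// Fetch table catalogs by id, keyed by table id. When `include_dropped_tables` is
    /// set, dropped tables that the meta node still tracks may be returned as well.
    ///
    /// A minimal usage sketch (assuming an initialized `MetaClient` named `client`;
    /// the ids below are hypothetical):
    ///
    /// ```ignore
    /// let tables = client.get_tables(&[1001, 1002], false).await?;
    /// for (table_id, table) in &tables {
    ///     println!("{}: {}", table_id, table.name);
    /// }
    /// ```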
1512    pub async fn get_tables(
1513        &self,
1514        table_ids: &[u32],
1515        include_dropped_tables: bool,
1516    ) -> Result<HashMap<u32, Table>> {
1517        let req = GetTablesRequest {
1518            table_ids: table_ids.to_vec(),
1519            include_dropped_tables,
1520        };
1521        let resp = self.inner.get_tables(req).await?;
1522        Ok(resp.tables)
1523    }
1524
1525    pub async fn list_serving_vnode_mappings(
1526        &self,
1527    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
1528        let req = GetServingVnodeMappingsRequest {};
1529        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1530        let mappings = resp
1531            .worker_slot_mappings
1532            .into_iter()
1533            .map(|p| {
1534                (
1535                    p.fragment_id,
1536                    (
1537                        resp.fragment_to_table
1538                            .get(&p.fragment_id)
1539                            .cloned()
1540                            .unwrap_or(0),
1541                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1542                    ),
1543                )
1544            })
1545            .collect();
1546        Ok(mappings)
1547    }
1548
1549    pub async fn risectl_list_compaction_status(
1550        &self,
1551    ) -> Result<(
1552        Vec<CompactStatus>,
1553        Vec<CompactTaskAssignment>,
1554        Vec<CompactTaskProgress>,
1555    )> {
1556        let req = RiseCtlListCompactionStatusRequest {};
1557        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1558        Ok((
1559            resp.compaction_statuses,
1560            resp.task_assignment,
1561            resp.task_progress,
1562        ))
1563    }
1564
1565    pub async fn get_compaction_score(
1566        &self,
1567        compaction_group_id: CompactionGroupId,
1568    ) -> Result<Vec<PickerInfo>> {
1569        let req = GetCompactionScoreRequest {
1570            compaction_group_id,
1571        };
1572        let resp = self.inner.get_compaction_score(req).await?;
1573        Ok(resp.scores)
1574    }
1575
1576    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1577        let req = RiseCtlRebuildTableStatsRequest {};
1578        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1579        Ok(())
1580    }
1581
1582    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1583        let req = ListBranchedObjectRequest {};
1584        let resp = self.inner.list_branched_object(req).await?;
1585        Ok(resp.branched_objects)
1586    }
1587
1588    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1589        let req = ListActiveWriteLimitRequest {};
1590        let resp = self.inner.list_active_write_limit(req).await?;
1591        Ok(resp.write_limits)
1592    }
1593
1594    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1595        let req = ListHummockMetaConfigRequest {};
1596        let resp = self.inner.list_hummock_meta_config(req).await?;
1597        Ok(resp.configs)
1598    }
1599
1600    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1601        let _resp = self
1602            .inner
1603            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1604            .await?;
1605
1606        Ok(())
1607    }
1608
1609    pub async fn rw_cloud_validate_source(
1610        &self,
1611        source_type: SourceType,
1612        source_config: HashMap<String, String>,
1613    ) -> Result<RwCloudValidateSourceResponse> {
1614        let req = RwCloudValidateSourceRequest {
1615            source_type: source_type.into(),
1616            source_config,
1617        };
1618        let resp = self.inner.rw_cloud_validate_source(req).await?;
1619        Ok(resp)
1620    }
1621
1622    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1623        self.inner.core.read().await.sink_coordinate_client.clone()
1624    }
1625
1626    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1627        let req = ListCompactTaskAssignmentRequest {};
1628        let resp = self.inner.list_compact_task_assignment(req).await?;
1629        Ok(resp.task_assignment)
1630    }
1631
1632    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1633        let req = ListEventLogRequest::default();
1634        let resp = self.inner.list_event_log(req).await?;
1635        Ok(resp.event_logs)
1636    }
1637
1638    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1639        let req = ListCompactTaskProgressRequest {};
1640        let resp = self.inner.list_compact_task_progress(req).await?;
1641        Ok(resp.task_progress)
1642    }
1643
1644    #[cfg(madsim)]
1645    pub fn try_add_panic_event_blocking(
1646        &self,
1647        panic_info: impl Display,
1648        timeout_millis: Option<u64>,
1649    ) {
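        // No-op under madsim: panic events are not reported in simulation tests.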
1650    }
1651
1652    /// Report a worker-node panic to the meta event log, blocking the caller until the request completes or times out. If `timeout_millis` is `None`, a default of 1000 ms is used.
1653    #[cfg(not(madsim))]
1654    pub fn try_add_panic_event_blocking(
1655        &self,
1656        panic_info: impl Display,
1657        timeout_millis: Option<u64>,
1658    ) {
1659        let event = event_log::EventWorkerNodePanic {
1660            worker_id: self.worker_id,
1661            worker_type: self.worker_type.into(),
1662            host_addr: Some(self.host_addr.to_protobuf()),
1663            panic_info: format!("{panic_info}"),
1664        };
1665        let grpc_meta_client = self.inner.clone();
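        // Run the request on a dedicated thread with its own current-thread runtime so the
        // call can block synchronously even when invoked from within an async context.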
1666        let _ = thread::spawn(move || {
1667            let rt = tokio::runtime::Builder::new_current_thread()
1668                .enable_all()
1669                .build()
1670                .unwrap();
1671            let req = AddEventLogRequest {
1672                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1673            };
1674            rt.block_on(async {
1675                let _ = tokio::time::timeout(
1676                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1677                    grpc_meta_client.add_event_log(req),
1678                )
1679                .await;
1680            });
1681        })
1682        .join();
1683    }
1684
1685    pub async fn add_sink_fail_evet(
1686        &self,
1687        sink_id: u32,
1688        sink_name: String,
1689        connector: String,
1690        error: String,
1691    ) -> Result<()> {
1692        let event = event_log::EventSinkFail {
1693            sink_id,
1694            sink_name,
1695            connector,
1696            error,
1697        };
1698        let req = AddEventLogRequest {
1699            event: Some(add_event_log_request::Event::SinkFail(event)),
1700        };
1701        self.inner.add_event_log(req).await?;
1702        Ok(())
1703    }
1704
1705    pub async fn add_cdc_auto_schema_change_fail_event(
1706        &self,
1707        table_id: u32,
1708        table_name: String,
1709        cdc_table_id: String,
1710        upstream_ddl: String,
1711        fail_info: String,
1712    ) -> Result<()> {
1713        let event = event_log::EventAutoSchemaChangeFail {
1714            table_id,
1715            table_name,
1716            cdc_table_id,
1717            upstream_ddl,
1718            fail_info,
1719        };
1720        let req = AddEventLogRequest {
1721            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1722        };
1723        self.inner.add_event_log(req).await?;
1724        Ok(())
1725    }
1726
1727    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1728        let req = CancelCompactTaskRequest {
1729            task_id,
1730            task_status: task_status as _,
1731        };
1732        let resp = self.inner.cancel_compact_task(req).await?;
1733        Ok(resp.ret)
1734    }
1735
1736    pub async fn get_version_by_epoch(
1737        &self,
1738        epoch: HummockEpoch,
1739        table_id: u32,
1740    ) -> Result<PbHummockVersion> {
1741        let req = GetVersionByEpochRequest { epoch, table_id };
1742        let resp = self.inner.get_version_by_epoch(req).await?;
1743        Ok(resp.version.unwrap())
1744    }
1745
1746    pub async fn get_cluster_limits(
1747        &self,
1748    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1749        let req = GetClusterLimitsRequest {};
1750        let resp = self.inner.get_cluster_limits(req).await?;
1751        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1752    }
1753
1754    pub async fn merge_compaction_group(
1755        &self,
1756        left_group_id: CompactionGroupId,
1757        right_group_id: CompactionGroupId,
1758    ) -> Result<()> {
1759        let req = MergeCompactionGroupRequest {
1760            left_group_id,
1761            right_group_id,
1762        };
1763        self.inner.merge_compaction_group(req).await?;
1764        Ok(())
1765    }
1766
1767    /// List all rate limits for sources and backfills
1768    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1769        let request = ListRateLimitsRequest {};
1770        let resp = self.inner.list_rate_limits(request).await?;
1771        Ok(resp.rate_limits)
1772    }
1773
1774    pub async fn list_cdc_progress(&self) -> Result<HashMap<u32, PbCdcProgress>> {
1775        let request = ListCdcProgressRequest {};
1776        let resp = self.inner.list_cdc_progress(request).await?;
1777        Ok(resp.cdc_progress)
1778    }
1779
1780    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1781        let request = ListIcebergTablesRequest {};
1782        let resp = self.inner.list_iceberg_tables(request).await?;
1783        Ok(resp.iceberg_tables)
1784    }
1785
1786    /// Get the await tree of all nodes in the cluster.
1787    pub async fn get_cluster_stack_trace(
1788        &self,
1789        actor_traces_format: ActorTracesFormat,
1790    ) -> Result<StackTraceResponse> {
1791        let request = StackTraceRequest {
1792            actor_traces_format: actor_traces_format as i32,
1793        };
1794        let resp = self.inner.stack_trace(request).await?;
1795        Ok(resp)
1796    }
1797
1798    pub async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
1799        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1800        self.inner.set_sync_log_store_aligned(request).await?;
1801        Ok(())
1802    }
1803
1804    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
1805        self.inner.refresh(request).await
1806    }
1807}
1808
1809#[async_trait]
1810impl HummockMetaClient for MetaClient {
1811    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1812        let req = UnpinVersionBeforeRequest {
1813            context_id: self.worker_id(),
1814            unpin_version_before: unpin_version_before.to_u64(),
1815        };
1816        self.inner.unpin_version_before(req).await?;
1817        Ok(())
1818    }
1819
1820    async fn get_current_version(&self) -> Result<HummockVersion> {
1821        let req = GetCurrentVersionRequest::default();
1822        Ok(HummockVersion::from_rpc_protobuf(
1823            &self
1824                .inner
1825                .get_current_version(req)
1826                .await?
1827                .current_version
1828                .unwrap(),
1829        ))
1830    }
1831
1832    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1833        let resp = self
1834            .inner
1835            .get_new_object_ids(GetNewObjectIdsRequest { number })
1836            .await?;
1837        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1838    }
1839
1840    async fn commit_epoch_with_change_log(
1841        &self,
1842        _epoch: HummockEpoch,
1843        _sync_result: SyncResult,
1844        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1845    ) -> Result<()> {
1846        panic!("Only meta service can commit_epoch in production.")
1847    }
1848
1849    async fn trigger_manual_compaction(
1850        &self,
1851        compaction_group_id: u64,
1852        table_id: u32,
1853        level: u32,
1854        sst_ids: Vec<u64>,
1855    ) -> Result<()> {
1856        // TODO: support key_range parameter
1857        let req = TriggerManualCompactionRequest {
1858            compaction_group_id,
1859            table_id,
1860            // If the table_id does not exist, manual compaction will include all the SSTs
1861            // without checking internal_table_id.
1862            level,
1863            sst_ids,
1864            ..Default::default()
1865        };
1866
1867        self.inner.trigger_manual_compaction(req).await?;
1868        Ok(())
1869    }
1870
1871    async fn trigger_full_gc(
1872        &self,
1873        sst_retention_time_sec: u64,
1874        prefix: Option<String>,
1875    ) -> Result<()> {
1876        self.inner
1877            .trigger_full_gc(TriggerFullGcRequest {
1878                sst_retention_time_sec,
1879                prefix,
1880            })
1881            .await?;
1882        Ok(())
1883    }
1884
1885    async fn subscribe_compaction_event(
1886        &self,
1887    ) -> Result<(
1888        UnboundedSender<SubscribeCompactionEventRequest>,
1889        BoxStream<'static, CompactionEventItem>,
1890    )> {
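        // Queue a `Register` event first so the meta server can associate this worker's
        // context id with the stream before any compaction events are exchanged.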
1891        let (request_sender, request_receiver) =
1892            unbounded_channel::<SubscribeCompactionEventRequest>();
1893        request_sender
1894            .send(SubscribeCompactionEventRequest {
1895                event: Some(subscribe_compaction_event_request::Event::Register(
1896                    Register {
1897                        context_id: self.worker_id(),
1898                    },
1899                )),
1900                create_at: SystemTime::now()
1901                    .duration_since(SystemTime::UNIX_EPOCH)
1902                    .expect("Clock may have gone backwards")
1903                    .as_millis() as u64,
1904            })
1905            .context("Failed to subscribe compaction event")?;
1906
1907        let stream = self
1908            .inner
1909            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1910                request_receiver,
1911            )))
1912            .await?;
1913
1914        Ok((request_sender, Box::pin(stream)))
1915    }
1916
1917    async fn get_version_by_epoch(
1918        &self,
1919        epoch: HummockEpoch,
1920        table_id: u32,
1921    ) -> Result<PbHummockVersion> {
1922        self.get_version_by_epoch(epoch, table_id).await
1923    }
1924
1925    async fn subscribe_iceberg_compaction_event(
1926        &self,
1927    ) -> Result<(
1928        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1929        BoxStream<'static, IcebergCompactionEventItem>,
1930    )> {
1931        let (request_sender, request_receiver) =
1932            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1933        request_sender
1934            .send(SubscribeIcebergCompactionEventRequest {
1935                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1936                    IcebergRegister {
1937                        context_id: self.worker_id(),
1938                    },
1939                )),
1940                create_at: SystemTime::now()
1941                    .duration_since(SystemTime::UNIX_EPOCH)
1942                    .expect("Clock may have gone backwards")
1943                    .as_millis() as u64,
1944            })
1945            .context("Failed to subscribe iceberg compaction event")?;
1946
1947        let stream = self
1948            .inner
1949            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
1950                request_receiver,
1951            )))
1952            .await?;
1953
1954        Ok((request_sender, Box::pin(stream)))
1955    }
1956}
1957
1958#[async_trait]
1959impl TelemetryInfoFetcher for MetaClient {
1960    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1961        let resp = self
1962            .get_telemetry_info()
1963            .await
1964            .map_err(|e| e.to_report_string())?;
1965        let tracking_id = resp.get_tracking_id().ok();
1966        Ok(tracking_id.map(|id| id.to_owned()))
1967    }
1968}
1969
1970pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1971
1972#[derive(Debug, Clone)]
1973struct GrpcMetaClientCore {
1974    cluster_client: ClusterServiceClient<Channel>,
1975    meta_member_client: MetaMemberServiceClient<Channel>,
1976    heartbeat_client: HeartbeatServiceClient<Channel>,
1977    ddl_client: DdlServiceClient<Channel>,
1978    hummock_client: HummockManagerServiceClient<Channel>,
1979    notification_client: NotificationServiceClient<Channel>,
1980    stream_client: StreamManagerServiceClient<Channel>,
1981    user_client: UserServiceClient<Channel>,
1982    scale_client: ScaleServiceClient<Channel>,
1983    backup_client: BackupServiceClient<Channel>,
1984    telemetry_client: TelemetryInfoServiceClient<Channel>,
1985    system_params_client: SystemParamsServiceClient<Channel>,
1986    session_params_client: SessionParamServiceClient<Channel>,
1987    serving_client: ServingServiceClient<Channel>,
1988    cloud_client: CloudServiceClient<Channel>,
1989    sink_coordinate_client: SinkCoordinationRpcClient,
1990    event_log_client: EventLogServiceClient<Channel>,
1991    cluster_limit_client: ClusterLimitServiceClient<Channel>,
1992    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
1993    monitor_client: MonitorServiceClient<Channel>,
1994}
1995
1996impl GrpcMetaClientCore {
1997    pub(crate) fn new(channel: Channel) -> Self {
1998        let cluster_client = ClusterServiceClient::new(channel.clone());
1999        let meta_member_client = MetaMemberClient::new(channel.clone());
2000        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2001        let ddl_client =
2002            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2003        let hummock_client =
2004            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2005        let notification_client =
2006            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2007        let stream_client =
2008            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2009        let user_client = UserServiceClient::new(channel.clone());
2010        let scale_client =
2011            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2012        let backup_client = BackupServiceClient::new(channel.clone());
2013        let telemetry_client =
2014            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2015        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2016        let session_params_client = SessionParamServiceClient::new(channel.clone());
2017        let serving_client = ServingServiceClient::new(channel.clone());
2018        let cloud_client = CloudServiceClient::new(channel.clone());
2019        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
2020        let event_log_client = EventLogServiceClient::new(channel.clone());
2021        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2022        let hosted_iceberg_catalog_service_client =
2023            HostedIcebergCatalogServiceClient::new(channel.clone());
2024        let monitor_client = MonitorServiceClient::new(channel);
2025
2026        GrpcMetaClientCore {
2027            cluster_client,
2028            meta_member_client,
2029            heartbeat_client,
2030            ddl_client,
2031            hummock_client,
2032            notification_client,
2033            stream_client,
2034            user_client,
2035            scale_client,
2036            backup_client,
2037            telemetry_client,
2038            system_params_client,
2039            session_params_client,
2040            serving_client,
2041            cloud_client,
2042            sink_coordinate_client,
2043            event_log_client,
2044            cluster_limit_client,
2045            hosted_iceberg_catalog_service_client,
2046            monitor_client,
2047        }
2048    }
2049}
2050
2051/// Client to meta server. Cloning the instance is lightweight.
2052///
2053/// It is a wrapper over the tonic service clients. See [`crate::meta_rpc_client_method_impl`].
2054#[derive(Debug, Clone)]
2055struct GrpcMetaClient {
2056    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2057    core: Arc<RwLock<GrpcMetaClientCore>>,
2058}
2059
2060type MetaMemberClient = MetaMemberServiceClient<Channel>;
2061
2062struct MetaMemberGroup {
2063    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2064}
2065
2066struct MetaMemberManagement {
2067    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2068    members: Either<MetaMemberClient, MetaMemberGroup>,
2069    current_leader: http::Uri,
2070    meta_config: Arc<MetaConfig>,
2071}
2072
2073impl MetaMemberManagement {
2074    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2075
2076    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2077        format!("http://{}:{}", addr.host, addr.port)
2078            .parse()
2079            .unwrap()
2080    }
2081
2082    async fn recreate_core(&self, channel: Channel) {
2083        let mut core = self.core_ref.write().await;
2084        *core = GrpcMetaClientCore::new(channel);
2085    }
2086
2087    async fn refresh_members(&mut self) -> Result<()> {
2088        let leader_addr = match self.members.as_mut() {
2089            Either::Left(client) => {
2090                let resp = client
2091                    .to_owned()
2092                    .members(MembersRequest {})
2093                    .await
2094                    .map_err(RpcError::from_meta_status)?;
2095                let resp = resp.into_inner();
2096                resp.members.into_iter().find(|member| member.is_leader)
2097            }
2098            Either::Right(member_group) => {
2099                let mut fetched_members = None;
2100
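                // Probe each known member address in turn, lazily creating and caching a
                // client for it, and stop at the first one that returns a member list.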
2101                for (addr, client) in &mut member_group.members {
2102                    let members: Result<_> = try {
2103                        let mut client = match client {
2104                            Some(cached_client) => cached_client.to_owned(),
2105                            None => {
2106                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2107                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2108                                    .await
2109                                    .context("failed to create client")?;
2110                                let new_client: MetaMemberClient =
2111                                    MetaMemberServiceClient::new(channel);
2112                                *client = Some(new_client.clone());
2113
2114                                new_client
2115                            }
2116                        };
2117
2118                        let resp = client
2119                            .members(MembersRequest {})
2120                            .await
2121                            .context("failed to fetch members")?;
2122
2123                        resp.into_inner().members
2124                    };
2125
2126                    let fetched = members.is_ok();
2127                    fetched_members = Some(members);
2128                    if fetched {
2129                        break;
2130                    }
2131                }
2132
2133                let members = fetched_members
2134                    .context("no member available in the list")?
2135                    .context("could not refresh members")?;
2136
2137                // find new leader
2138                let mut leader = None;
2139                for member in members {
2140                    if member.is_leader {
2141                        leader = Some(member.clone());
2142                    }
2143
2144                    let addr = Self::host_address_to_uri(member.address.unwrap());
2145                    // Deliberately keep expired addresses in the cache so that they can still be retried in extreme situations.
2146                    if !member_group.members.contains(&addr) {
2147                        tracing::info!("new meta member joined: {}", addr);
2148                        member_group.members.put(addr, None);
2149                    }
2150                }
2151
2152                leader
2153            }
2154        };
2155
2156        if let Some(leader) = leader_addr {
2157            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2158
2159            if discovered_leader != self.current_leader {
2160                tracing::info!("new meta leader {} discovered", discovered_leader);
2161
2162                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2163                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2164                    false,
2165                );
2166
2167                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2168                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2169                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2170                })
2171                .await?;
2172
2173                self.recreate_core(channel).await;
2174                self.current_leader = discovered_leader;
2175            }
2176        }
2177
2178        Ok(())
2179    }
2180}
2181
2182impl GrpcMetaClient {
2183    // See `Endpoint::http2_keep_alive_interval`
2184    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2185    // See `Endpoint::keep_alive_timeout`
2186    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2187    // Retry base interval in ms for connecting to meta server.
2188    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2189    // Max retry interval in ms for connecting to meta server.
2190    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2191
2192    fn start_meta_member_monitor(
2193        &self,
2194        init_leader_addr: http::Uri,
2195        members: Either<MetaMemberClient, MetaMemberGroup>,
2196        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2197        meta_config: Arc<MetaConfig>,
2198    ) -> Result<()> {
2199        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2200        let current_leader = init_leader_addr;
2201
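        // With an explicit member list, refresh periodically; with a load-balanced
        // address, refresh only when a caller explicitly forces it.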
2202        let enable_period_tick = matches!(members, Either::Right(_));
2203
2204        let member_management = MetaMemberManagement {
2205            core_ref,
2206            members,
2207            current_leader,
2208            meta_config,
2209        };
2210
2211        let mut force_refresh_receiver = force_refresh_receiver;
2212
2213        tokio::spawn(async move {
2214            let mut member_management = member_management;
2215            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2216
2217            loop {
2218                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2219                    tokio::select! {
2220                        _ = ticker.tick() => None,
2221                        result_sender = force_refresh_receiver.recv() => {
2222                            if result_sender.is_none() {
2223                                break;
2224                            }
2225
2226                            result_sender
2227                        },
2228                    }
2229                } else {
2230                    let result_sender = force_refresh_receiver.recv().await;
2231
2232                    if result_sender.is_none() {
2233                        break;
2234                    }
2235
2236                    result_sender
2237                };
2238
2239                let tick_result = member_management.refresh_members().await;
2240                if let Err(e) = tick_result.as_ref() {
2241                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2242                }
2243
2244                if let Some(sender) = event {
2245                    // ignore resp
2246                    let _resp = sender.send(tick_result);
2247                }
2248            }
2249        });
2250
2251        Ok(())
2252    }
2253
2254    async fn force_refresh_leader(&self) -> Result<()> {
2255        let (sender, receiver) = oneshot::channel();
2256
2257        self.member_monitor_event_sender
2258            .send(sender)
2259            .await
2260            .map_err(|e| anyhow!(e))?;
2261
2262        receiver.await.map_err(|e| anyhow!(e))?
2263    }
2264
2265    /// Connect to the meta server using the given address `strategy`.
2266    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2267        let (channel, addr) = match strategy {
2268            MetaAddressStrategy::LoadBalance(addr) => {
2269                Self::try_build_rpc_channel(vec![addr.clone()]).await
2270            }
2271            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2272        }?;
2273        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2274        let client = GrpcMetaClient {
2275            member_monitor_event_sender: force_refresh_sender,
2276            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2277        };
2278
2279        let meta_member_client = client.core.read().await.meta_member_client.clone();
2280        let members = match strategy {
2281            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2282            MetaAddressStrategy::List(addrs) => {
2283                let mut members = LruCache::new(20);
2284                for addr in addrs {
2285                    members.put(addr.clone(), None);
2286                }
2287                members.put(addr.clone(), Some(meta_member_client));
2288
2289                Either::Right(MetaMemberGroup { members })
2290            }
2291        };
2292
2293        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2294
2295        client.force_refresh_leader().await?;
2296
2297        Ok(client)
2298    }
2299
2300    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2301        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2302    }
2303
2304    pub(crate) async fn try_build_rpc_channel(
2305        addrs: impl IntoIterator<Item = http::Uri>,
2306    ) -> Result<(Channel, http::Uri)> {
2307        let endpoints: Vec<_> = addrs
2308            .into_iter()
2309            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2310            .collect();
2311
2312        let mut last_error = None;
2313
2314        for (endpoint, addr) in endpoints {
2315            match Self::connect_to_endpoint(endpoint).await {
2316                Ok(channel) => {
2317                    tracing::info!("Connect to meta server {} successfully", addr);
2318                    return Ok((channel, addr));
2319                }
2320                Err(e) => {
2321                    tracing::warn!(
2322                        error = %e.as_report(),
2323                        "Failed to connect to meta server {}, trying again",
2324                        addr,
2325                    );
2326                    last_error = Some(e);
2327                }
2328            }
2329        }
2330
2331        if let Some(last_error) = last_error {
2332            Err(anyhow::anyhow!(last_error)
2333                .context("failed to connect to all meta servers")
2334                .into())
2335        } else {
2336            bail!("no meta server address provided")
2337        }
2338    }
2339
2340    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2341        let channel = endpoint
2342            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2343            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2344            .connect_timeout(Duration::from_secs(5))
2345            .monitored_connect("grpc-meta-client", Default::default())
2346            .await?
2347            .wrapped();
2348
2349        Ok(channel)
2350    }
2351
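    /// Build a jittered exponential-backoff iterator whose delays start at
    /// `INIT_RETRY_BASE_INTERVAL_MS` and are capped at `INIT_RETRY_MAX_INTERVAL_MS`.
    /// Delays are yielded only while the accumulated delay stays below `high_bound`;
    /// when `exceed` is true, the final delay that crosses the bound is still yielded.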
2352    pub(crate) fn retry_strategy_to_bound(
2353        high_bound: Duration,
2354        exceed: bool,
2355    ) -> impl Iterator<Item = Duration> {
2356        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2357            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2358            .map(jitter);
2359
2360        let mut sum = Duration::default();
2361
2362        iter.take_while(move |duration| {
2363            sum += *duration;
2364
2365            if exceed {
2366                sum < high_bound + *duration
2367            } else {
2368                sum < high_bound
2369            }
2370        })
2371    }
2372}
2373
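// Each `{ client, rpc, Request, Response }` entry below is expanded by
// `meta_rpc_client_method_impl` (invoked at the bottom of this file) into an async
// wrapper method on `GrpcMetaClient`. The exact body lives in the macro definition,
// but the generated shape is roughly:
//
//     pub async fn rpc(&self, request: Request) -> Result<Response> {
//         let mut client = self.core.read().await.client.clone();
//         match client.rpc(request).await {
//             Ok(resp) => Ok(resp.into_inner()),
//             Err(status) => {
//                 self.refresh_client_if_needed(status.code()).await;
//                 Err(RpcError::from_meta_status(status))
//             }
//         }
//     }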
2374macro_rules! for_all_meta_rpc {
2375    ($macro:ident) => {
2376        $macro! {
2377             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2378            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2379            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2380            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2381            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2382            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2383            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2384            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2385            ,{ stream_client, flush, FlushRequest, FlushResponse }
2386            ,{ stream_client, pause, PauseRequest, PauseResponse }
2387            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2388            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2389            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2390            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2391            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2392            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2393            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2394            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2395            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2396            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2397            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2398            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2399            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2400            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2401            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2402            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2403            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2404            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2405            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2406            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2407            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2408            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2409            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2410            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2411            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2412            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2413            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2414            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2415            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2416            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2417            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2418            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2419            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2420            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2421            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2422            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2423            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2424            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2425            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2426            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2427            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2428            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2429            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2430            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2431            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2432            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2433            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2434            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2435            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2436            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2437            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2438            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2439            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2440            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2441            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2442            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2443            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2444            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2445            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2446            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2447            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2448            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2449            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2450            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2451            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2452            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2453            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2454            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2455            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2456            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2457            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2458            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2459            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2460            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2461            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2462            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2463            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2464            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2465            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2466            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2467            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2468            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2469            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2470            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2471            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2472            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2473            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2474            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2475            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2476            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2477            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2478            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2479            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2480            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2481            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2482            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2483            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2484            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2485            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2486            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2487            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2488            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2489            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2490            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2491            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2492            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2493            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2494            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2495            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2496            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2497            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2498            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2499            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2500            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2501            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2502            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2503            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2504        }
2505    };
2506}
2507
2508impl GrpcMetaClient {
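    /// On an RPC failure, ask the member monitor to re-discover the meta leader when the
    /// status code suggests the current endpoint may be stale or unreachable, then wait
    /// for the refresh result unless another refresh is already in flight.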
2509    async fn refresh_client_if_needed(&self, code: Code) {
2510        if matches!(
2511            code,
2512            Code::Unknown | Code::Unimplemented | Code::Unavailable
2513        ) {
2514            tracing::debug!("tonic code {} matched, refreshing meta client", code);
2515            let (result_sender, result_receiver) = oneshot::channel();
2516            if self
2517                .member_monitor_event_sender
2518                .try_send(result_sender)
2519                .is_ok()
2520            {
2521                if let Ok(Err(e)) = result_receiver.await {
2522                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2523                }
2524            } else {
2525                tracing::debug!("skipping this refresh; another refresh is already in progress")
2526            }
2527        }
2528    }
2529}
2530
2531impl GrpcMetaClient {
2532    for_all_meta_rpc! { meta_rpc_client_method_impl }
2533}