risingwave_rpc_client/meta_client.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

/// Client to meta server. Cloning the instance is lightweight.
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}
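
// Illustrative note (not part of the upstream file): `MetaClient` derives `Clone`
// and its doc comment states that cloning is lightweight, so a clone can be moved
// into a spawned task. A minimal sketch, assuming `meta_client` was obtained via
// `register_new` below:
//
//     let client = meta_client.clone();
//     tokio::spawn(async move {
//         // Each task issues RPCs through its own clone.
//         let _nodes = client.list_worker_nodes(None).await;
//     });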

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

    /// Subscribe to notifications from the meta service.
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }
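
    // A minimal consumption sketch (illustrative, not from the upstream file): the
    // returned `Streaming` can be drained with tonic's `message()`. The
    // `SubscribeType::Frontend` variant is just an example choice.
    //
    //     let mut stream = meta_client.subscribe(SubscribeType::Frontend).await?;
    //     while let Some(notification) = stream.message().await? {
    //         // Hand the notification off to the local observer / cache.
    //     }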

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    /// Register the current node with the cluster and set the corresponding worker id.
    ///
    /// Retry if there's a connection issue with the meta node. Exit the process if the registration fails.
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }
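
    // Illustrative bootstrap sketch (not part of the upstream file): a worker
    // registers itself and then activates. `addr_strategy`, `host_addr`, and
    // `meta_config` are assumed to be built by the caller; the worker type is only
    // an example.
    //
    //     let (meta_client, system_params) = MetaClient::register_new(
    //         addr_strategy,
    //         WorkerType::ComputeNode,
    //         &host_addr,
    //         Property::default(),
    //         Arc::new(meta_config),
    //     )
    //     .await;
    //     meta_client.activate(&host_addr).await?;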

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's a transient connection issue.
            // If the error is from our implementation or business logic, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    /// Activate the current node in the cluster to confirm it's ready to serve.
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

    /// Send a heartbeat signal to the meta service.
    pub async fn send_heartbeat(&self, node_id: WorkerId) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we're already shutting down.
            // Otherwise, exit the process.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }
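
    // Illustrative usage sketch (not part of the upstream file): DDL helpers such
    // as `create_database` return the catalog `WaitVersion` that the caller may
    // wait on before treating the DDL as visible locally. The database fields
    // below are assumptions for the example.
    //
    //     let version = meta_client
    //         .create_database(PbDatabase {
    //             name: "dev".to_owned(),
    //             ..Default::default()
    //         })
    //         .await?;
    //     // e.g. wait until the local catalog has caught up to `version`.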

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    // TODO: use UserInfoVersion as the return type instead.
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    /// Unregister the current node from the cluster.
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster with best effort. Log the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[WorkerId],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

    /// Starts a heartbeat worker.
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for interval
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // TODO: decide better min_interval for timeout
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }
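
    // Illustrative lifecycle sketch (not part of the upstream file): start the
    // heartbeat worker after activation and stop it on shutdown before
    // unregistering. The interval value is only an example.
    //
    //     let (heartbeat_handle, heartbeat_shutdown) =
    //         MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
    //     // ... on shutdown:
    //     let _ = heartbeat_shutdown.send(());
    //     let _ = heartbeat_handle.await;
    //     meta_client.try_unregister().await;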
1081
1082    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1083        let request = RisectlListStateTablesRequest {};
1084        let resp = self.inner.risectl_list_state_tables(request).await?;
1085        Ok(resp.tables)
1086    }
1087
1088    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1089        let request = FlushRequest { database_id };
1090        let resp = self.inner.flush(request).await?;
1091        Ok(HummockVersionId::new(resp.hummock_version_id))
1092    }
1093
1094    pub async fn wait(&self) -> Result<()> {
1095        let request = WaitRequest {};
1096        self.inner.wait(request).await?;
1097        Ok(())
1098    }
1099
1100    pub async fn recover(&self) -> Result<()> {
1101        let request = RecoverRequest {};
1102        self.inner.recover(request).await?;
1103        Ok(())
1104    }
1105
1106    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1107        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1108        let resp = self.inner.cancel_creating_jobs(request).await?;
1109        Ok(resp.canceled_jobs)
1110    }
1111
1112    pub async fn list_table_fragments(
1113        &self,
1114        job_ids: &[JobId],
1115    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1116        let request = ListTableFragmentsRequest {
1117            table_ids: job_ids.to_vec(),
1118        };
1119        let resp = self.inner.list_table_fragments(request).await?;
1120        Ok(resp.table_fragments)
1121    }
1122
1123    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1124        let resp = self
1125            .inner
1126            .list_streaming_job_states(ListStreamingJobStatesRequest {})
1127            .await?;
1128        Ok(resp.states)
1129    }
1130
1131    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
1132        let resp = self
1133            .inner
1134            .list_fragment_distribution(ListFragmentDistributionRequest {})
1135            .await?;
1136        Ok(resp.distributions)
1137    }
1138
1139    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1140        let resp = self
1141            .inner
1142            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
1143            .await?;
1144        Ok(resp.distributions)
1145    }
1146
1147    pub async fn get_fragment_by_id(
1148        &self,
1149        fragment_id: FragmentId,
1150    ) -> Result<Option<FragmentDistribution>> {
1151        let resp = self
1152            .inner
1153            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1154            .await?;
1155        Ok(resp.distribution)
1156    }
1157
1158    pub async fn get_fragment_vnodes(
1159        &self,
1160        fragment_id: FragmentId,
1161    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1162        let resp = self
1163            .inner
1164            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1165            .await?;
1166        Ok(resp
1167            .actor_vnodes
1168            .into_iter()
1169            .map(|actor| (actor.actor_id, actor.vnode_indices))
1170            .collect())
1171    }
1172
1173    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1174        let resp = self
1175            .inner
1176            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1177            .await?;
1178        Ok(resp.vnode_indices)
1179    }
1180
1181    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1182        let resp = self
1183            .inner
1184            .list_actor_states(ListActorStatesRequest {})
1185            .await?;
1186        Ok(resp.states)
1187    }
1188
1189    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1190        let resp = self
1191            .inner
1192            .list_actor_splits(ListActorSplitsRequest {})
1193            .await?;
1194
1195        Ok(resp.actor_splits)
1196    }
1197
1198    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
1199        let resp = self
1200            .inner
1201            .list_object_dependencies(ListObjectDependenciesRequest {})
1202            .await?;
1203        Ok(resp.dependencies)
1204    }
1205
1206    pub async fn pause(&self) -> Result<PauseResponse> {
1207        let request = PauseRequest {};
1208        let resp = self.inner.pause(request).await?;
1209        Ok(resp)
1210    }
1211
1212    pub async fn resume(&self) -> Result<ResumeResponse> {
1213        let request = ResumeRequest {};
1214        let resp = self.inner.resume(request).await?;
1215        Ok(resp)
1216    }
1217
1218    pub async fn apply_throttle(
1219        &self,
1220        kind: PbThrottleTarget,
1221        id: u32,
1222        rate: Option<u32>,
1223    ) -> Result<ApplyThrottleResponse> {
1224        let request = ApplyThrottleRequest {
1225            kind: kind as i32,
1226            id,
1227            rate,
1228        };
1229        let resp = self.inner.apply_throttle(request).await?;
1230        Ok(resp)
1231    }
1232
1233    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1234        let resp = self
1235            .inner
1236            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1237            .await?;
1238        Ok(resp.get_status().unwrap())
1239    }
1240
1241    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1242        let request = GetClusterInfoRequest {};
1243        let resp = self.inner.get_cluster_info(request).await?;
1244        Ok(resp)
1245    }
1246
1247    pub async fn reschedule(
1248        &self,
1249        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1250        revision: u64,
1251        resolve_no_shuffle_upstream: bool,
1252    ) -> Result<(bool, u64)> {
1253        let request = RescheduleRequest {
1254            revision,
1255            resolve_no_shuffle_upstream,
1256            worker_reschedules,
1257        };
1258        let resp = self.inner.reschedule(request).await?;
1259        Ok((resp.success, resp.revision))
1260    }
1261
1262    pub async fn risectl_get_pinned_versions_summary(
1263        &self,
1264    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1265        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1266        self.inner
1267            .rise_ctl_get_pinned_versions_summary(request)
1268            .await
1269    }
1270
1271    pub async fn risectl_get_checkpoint_hummock_version(
1272        &self,
1273    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1274        let request = RiseCtlGetCheckpointVersionRequest {};
1275        self.inner.rise_ctl_get_checkpoint_version(request).await
1276    }
1277
1278    pub async fn risectl_pause_hummock_version_checkpoint(
1279        &self,
1280    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1281        let request = RiseCtlPauseVersionCheckpointRequest {};
1282        self.inner.rise_ctl_pause_version_checkpoint(request).await
1283    }
1284
1285    pub async fn risectl_resume_hummock_version_checkpoint(
1286        &self,
1287    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1288        let request = RiseCtlResumeVersionCheckpointRequest {};
1289        self.inner.rise_ctl_resume_version_checkpoint(request).await
1290    }
1291
1292    pub async fn init_metadata_for_replay(
1293        &self,
1294        tables: Vec<PbTable>,
1295        compaction_groups: Vec<CompactionGroupInfo>,
1296    ) -> Result<()> {
1297        let req = InitMetadataForReplayRequest {
1298            tables,
1299            compaction_groups,
1300        };
1301        let _resp = self.inner.init_metadata_for_replay(req).await?;
1302        Ok(())
1303    }
1304
1305    pub async fn replay_version_delta(
1306        &self,
1307        version_delta: HummockVersionDelta,
1308    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1309        let req = ReplayVersionDeltaRequest {
1310            version_delta: Some(version_delta.into()),
1311        };
1312        let resp = self.inner.replay_version_delta(req).await?;
1313        Ok((
1314            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1315            resp.modified_compaction_groups,
1316        ))
1317    }
1318
1319    pub async fn list_version_deltas(
1320        &self,
1321        start_id: HummockVersionId,
1322        num_limit: u32,
1323        committed_epoch_limit: HummockEpoch,
1324    ) -> Result<Vec<HummockVersionDelta>> {
1325        let req = ListVersionDeltasRequest {
1326            start_id: start_id.to_u64(),
1327            num_limit,
1328            committed_epoch_limit,
1329        };
1330        Ok(self
1331            .inner
1332            .list_version_deltas(req)
1333            .await?
1334            .version_deltas
1335            .unwrap()
1336            .version_deltas
1337            .iter()
1338            .map(HummockVersionDelta::from_rpc_protobuf)
1339            .collect())
1340    }
1341
1342    pub async fn trigger_compaction_deterministic(
1343        &self,
1344        version_id: HummockVersionId,
1345        compaction_groups: Vec<CompactionGroupId>,
1346    ) -> Result<()> {
1347        let req = TriggerCompactionDeterministicRequest {
1348            version_id: version_id.to_u64(),
1349            compaction_groups,
1350        };
1351        self.inner.trigger_compaction_deterministic(req).await?;
1352        Ok(())
1353    }
1354
1355    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1356        let req = DisableCommitEpochRequest {};
1357        Ok(HummockVersion::from_rpc_protobuf(
1358            &self
1359                .inner
1360                .disable_commit_epoch(req)
1361                .await?
1362                .current_version
1363                .unwrap(),
1364        ))
1365    }
1366
1367    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1368        let req = GetAssignedCompactTaskNumRequest {};
1369        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1370        Ok(resp.num_tasks as usize)
1371    }
1372
1373    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1374        let req = RiseCtlListCompactionGroupRequest {};
1375        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1376        Ok(resp.compaction_groups)
1377    }
1378
1379    pub async fn risectl_update_compaction_config(
1380        &self,
1381        compaction_groups: &[CompactionGroupId],
1382        configs: &[MutableConfig],
1383    ) -> Result<()> {
1384        let req = RiseCtlUpdateCompactionConfigRequest {
1385            compaction_group_ids: compaction_groups.to_vec(),
1386            configs: configs
1387                .iter()
1388                .map(
1389                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1390                        mutable_config: Some(c.clone()),
1391                    },
1392                )
1393                .collect(),
1394        };
1395        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1396        Ok(())
1397    }
1398
1399    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1400        let req = BackupMetaRequest { remarks };
1401        let resp = self.inner.backup_meta(req).await?;
1402        Ok(resp.job_id)
1403    }
1404
1405    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1406        let req = GetBackupJobStatusRequest { job_id };
1407        let resp = self.inner.get_backup_job_status(req).await?;
1408        Ok((resp.job_status(), resp.message))
1409    }
1410
1411    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1412        let req = DeleteMetaSnapshotRequest {
1413            snapshot_ids: snapshot_ids.to_vec(),
1414        };
1415        let _resp = self.inner.delete_meta_snapshot(req).await?;
1416        Ok(())
1417    }
1418
1419    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1420        let req = GetMetaSnapshotManifestRequest {};
1421        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1422        Ok(resp.manifest.expect("should exist"))
1423    }
1424
1425    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1426        let req = GetTelemetryInfoRequest {};
1427        let resp = self.inner.get_telemetry_info(req).await?;
1428        Ok(resp)
1429    }
1430
1431    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1432        let req = GetMetaStoreInfoRequest {};
1433        let resp = self.inner.get_meta_store_info(req).await?;
1434        Ok(resp.meta_store_endpoint)
1435    }
1436
1437    pub async fn alter_sink_props(
1438        &self,
1439        sink_id: SinkId,
1440        changed_props: BTreeMap<String, String>,
1441        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1442        connector_conn_ref: Option<ConnectionId>,
1443    ) -> Result<()> {
1444        let req = AlterConnectorPropsRequest {
1445            object_id: sink_id.as_raw_id(),
1446            changed_props: changed_props.into_iter().collect(),
1447            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1448            connector_conn_ref,
1449            object_type: AlterConnectorPropsObject::Sink as i32,
1450            extra_options: None,
1451        };
1452        let _resp = self.inner.alter_connector_props(req).await?;
1453        Ok(())
1454    }
1455
1456    pub async fn alter_iceberg_table_props(
1457        &self,
1458        table_id: TableId,
1459        sink_id: SinkId,
1460        source_id: SourceId,
1461        changed_props: BTreeMap<String, String>,
1462        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1463        connector_conn_ref: Option<ConnectionId>,
1464    ) -> Result<()> {
1465        let req = AlterConnectorPropsRequest {
1466            object_id: table_id.as_raw_id(),
1467            changed_props: changed_props.into_iter().collect(),
1468            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1469            connector_conn_ref,
1470            object_type: AlterConnectorPropsObject::IcebergTable as i32,
1471            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1472                sink_id,
1473                source_id,
1474            })),
1475        };
1476        let _resp = self.inner.alter_connector_props(req).await?;
1477        Ok(())
1478    }
1479
1480    pub async fn alter_source_connector_props(
1481        &self,
1482        source_id: SourceId,
1483        changed_props: BTreeMap<String, String>,
1484        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1485        connector_conn_ref: Option<ConnectionId>,
1486    ) -> Result<()> {
1487        let req = AlterConnectorPropsRequest {
1488            object_id: source_id.as_raw_id(),
1489            changed_props: changed_props.into_iter().collect(),
1490            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1491            connector_conn_ref,
1492            object_type: AlterConnectorPropsObject::Source as i32,
1493            extra_options: None,
1494        };
1495        let _resp = self.inner.alter_connector_props(req).await?;
1496        Ok(())
1497    }
1498
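    /// Set a system parameter by name, returning the updated system params if the server
    /// reports them back.
    ///
    /// A small sketch, for illustration only; it assumes an initialized `MetaClient` named
    /// `client`, and the parameter name is merely an example of the string-based interface:
    ///
    /// ```ignore
    /// // `Some(..)` sets an explicit value; how `None` is interpreted is up to the meta service.
    /// let new_params = client
    ///     .set_system_param("barrier_interval_ms".to_owned(), Some("1000".to_owned()))
    ///     .await?;
    /// ```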
1499    pub async fn set_system_param(
1500        &self,
1501        param: String,
1502        value: Option<String>,
1503    ) -> Result<Option<SystemParamsReader>> {
1504        let req = SetSystemParamRequest { param, value };
1505        let resp = self.inner.set_system_param(req).await?;
1506        Ok(resp.params.map(SystemParamsReader::from))
1507    }
1508
1509    pub async fn get_session_params(&self) -> Result<String> {
1510        let req = GetSessionParamsRequest {};
1511        let resp = self.inner.get_session_params(req).await?;
1512        Ok(resp.params)
1513    }
1514
1515    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1516        let req = SetSessionParamRequest { param, value };
1517        let resp = self.inner.set_session_param(req).await?;
1518        Ok(resp.param)
1519    }
1520
1521    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1522        let req = GetDdlProgressRequest {};
1523        let resp = self.inner.get_ddl_progress(req).await?;
1524        Ok(resp.ddl_progress)
1525    }
1526
1527    pub async fn split_compaction_group(
1528        &self,
1529        group_id: CompactionGroupId,
1530        table_ids_to_new_group: &[TableId],
1531        partition_vnode_count: u32,
1532    ) -> Result<CompactionGroupId> {
1533        let req = SplitCompactionGroupRequest {
1534            group_id,
1535            table_ids: table_ids_to_new_group.to_vec(),
1536            partition_vnode_count,
1537        };
1538        let resp = self.inner.split_compaction_group(req).await?;
1539        Ok(resp.new_group_id)
1540    }
1541
1542    pub async fn get_tables(
1543        &self,
1544        table_ids: Vec<TableId>,
1545        include_dropped_tables: bool,
1546    ) -> Result<HashMap<TableId, Table>> {
1547        let req = GetTablesRequest {
1548            table_ids,
1549            include_dropped_tables,
1550        };
1551        let resp = self.inner.get_tables(req).await?;
1552        Ok(resp.tables)
1553    }
1554
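    /// Returns the serving [`WorkerSlotMapping`] of each fragment, keyed by fragment id. The
    /// `u32` in the value is the owning table id taken from `fragment_to_table`, falling back
    /// to `0` when the fragment is not present in that map.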
1555    pub async fn list_serving_vnode_mappings(
1556        &self,
1557    ) -> Result<HashMap<FragmentId, (u32, WorkerSlotMapping)>> {
1558        let req = GetServingVnodeMappingsRequest {};
1559        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1560        let mappings = resp
1561            .worker_slot_mappings
1562            .into_iter()
1563            .map(|p| {
1564                (
1565                    p.fragment_id,
1566                    (
1567                        resp.fragment_to_table
1568                            .get(&p.fragment_id)
1569                            .cloned()
1570                            .unwrap_or(0),
1571                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1572                    ),
1573                )
1574            })
1575            .collect();
1576        Ok(mappings)
1577    }
1578
1579    pub async fn risectl_list_compaction_status(
1580        &self,
1581    ) -> Result<(
1582        Vec<CompactStatus>,
1583        Vec<CompactTaskAssignment>,
1584        Vec<CompactTaskProgress>,
1585    )> {
1586        let req = RiseCtlListCompactionStatusRequest {};
1587        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1588        Ok((
1589            resp.compaction_statuses,
1590            resp.task_assignment,
1591            resp.task_progress,
1592        ))
1593    }
1594
1595    pub async fn get_compaction_score(
1596        &self,
1597        compaction_group_id: CompactionGroupId,
1598    ) -> Result<Vec<PickerInfo>> {
1599        let req = GetCompactionScoreRequest {
1600            compaction_group_id,
1601        };
1602        let resp = self.inner.get_compaction_score(req).await?;
1603        Ok(resp.scores)
1604    }
1605
1606    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1607        let req = RiseCtlRebuildTableStatsRequest {};
1608        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1609        Ok(())
1610    }
1611
1612    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1613        let req = ListBranchedObjectRequest {};
1614        let resp = self.inner.list_branched_object(req).await?;
1615        Ok(resp.branched_objects)
1616    }
1617
1618    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1619        let req = ListActiveWriteLimitRequest {};
1620        let resp = self.inner.list_active_write_limit(req).await?;
1621        Ok(resp.write_limits)
1622    }
1623
1624    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1625        let req = ListHummockMetaConfigRequest {};
1626        let resp = self.inner.list_hummock_meta_config(req).await?;
1627        Ok(resp.configs)
1628    }
1629
1630    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1631        let _resp = self
1632            .inner
1633            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1634            .await?;
1635
1636        Ok(())
1637    }
1638
1639    pub async fn rw_cloud_validate_source(
1640        &self,
1641        source_type: SourceType,
1642        source_config: HashMap<String, String>,
1643    ) -> Result<RwCloudValidateSourceResponse> {
1644        let req = RwCloudValidateSourceRequest {
1645            source_type: source_type.into(),
1646            source_config,
1647        };
1648        let resp = self.inner.rw_cloud_validate_source(req).await?;
1649        Ok(resp)
1650    }
1651
1652    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1653        self.inner.core.read().await.sink_coordinate_client.clone()
1654    }
1655
1656    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1657        let req = ListCompactTaskAssignmentRequest {};
1658        let resp = self.inner.list_compact_task_assignment(req).await?;
1659        Ok(resp.task_assignment)
1660    }
1661
1662    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1663        let req = ListEventLogRequest::default();
1664        let resp = self.inner.list_event_log(req).await?;
1665        Ok(resp.event_logs)
1666    }
1667
1668    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1669        let req = ListCompactTaskProgressRequest {};
1670        let resp = self.inner.list_compact_task_progress(req).await?;
1671        Ok(resp.task_progress)
1672    }
1673
1674    #[cfg(madsim)]
1675    pub fn try_add_panic_event_blocking(
1676        &self,
1677        panic_info: impl Display,
1678        timeout_millis: Option<u64>,
1679    ) {
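        // Intentionally a no-op in the madsim (simulation) build.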
1680    }
1681
1682    /// If `timeout_millis` is `None`, a default of 1000 ms is used.
1683    #[cfg(not(madsim))]
1684    pub fn try_add_panic_event_blocking(
1685        &self,
1686        panic_info: impl Display,
1687        timeout_millis: Option<u64>,
1688    ) {
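        // Report the panic from a dedicated thread with its own current-thread runtime and
        // block on it, so the event can be sent even when the caller is not inside a Tokio
        // runtime (e.g. from a panic hook), and the function only returns once the RPC has
        // finished or timed out.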
1689        let event = event_log::EventWorkerNodePanic {
1690            worker_id: self.worker_id,
1691            worker_type: self.worker_type.into(),
1692            host_addr: Some(self.host_addr.to_protobuf()),
1693            panic_info: format!("{panic_info}"),
1694        };
1695        let grpc_meta_client = self.inner.clone();
1696        let _ = thread::spawn(move || {
1697            let rt = tokio::runtime::Builder::new_current_thread()
1698                .enable_all()
1699                .build()
1700                .unwrap();
1701            let req = AddEventLogRequest {
1702                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1703            };
1704            rt.block_on(async {
1705                let _ = tokio::time::timeout(
1706                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1707                    grpc_meta_client.add_event_log(req),
1708                )
1709                .await;
1710            });
1711        })
1712        .join();
1713    }
1714
1715    pub async fn add_sink_fail_event(
1716        &self,
1717        sink_id: SinkId,
1718        sink_name: String,
1719        connector: String,
1720        error: String,
1721    ) -> Result<()> {
1722        let event = event_log::EventSinkFail {
1723            sink_id,
1724            sink_name,
1725            connector,
1726            error,
1727        };
1728        let req = AddEventLogRequest {
1729            event: Some(add_event_log_request::Event::SinkFail(event)),
1730        };
1731        self.inner.add_event_log(req).await?;
1732        Ok(())
1733    }
1734
1735    pub async fn add_cdc_auto_schema_change_fail_event(
1736        &self,
1737        source_id: SourceId,
1738        table_name: String,
1739        cdc_table_id: String,
1740        upstream_ddl: String,
1741        fail_info: String,
1742    ) -> Result<()> {
1743        let event = event_log::EventAutoSchemaChangeFail {
1744            table_id: source_id.as_cdc_table_id(),
1745            table_name,
1746            cdc_table_id,
1747            upstream_ddl,
1748            fail_info,
1749        };
1750        let req = AddEventLogRequest {
1751            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1752        };
1753        self.inner.add_event_log(req).await?;
1754        Ok(())
1755    }
1756
1757    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1758        let req = CancelCompactTaskRequest {
1759            task_id,
1760            task_status: task_status as _,
1761        };
1762        let resp = self.inner.cancel_compact_task(req).await?;
1763        Ok(resp.ret)
1764    }
1765
1766    pub async fn get_version_by_epoch(
1767        &self,
1768        epoch: HummockEpoch,
1769        table_id: TableId,
1770    ) -> Result<PbHummockVersion> {
1771        let req = GetVersionByEpochRequest { epoch, table_id };
1772        let resp = self.inner.get_version_by_epoch(req).await?;
1773        Ok(resp.version.unwrap())
1774    }
1775
1776    pub async fn get_cluster_limits(
1777        &self,
1778    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1779        let req = GetClusterLimitsRequest {};
1780        let resp = self.inner.get_cluster_limits(req).await?;
1781        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1782    }
1783
1784    pub async fn merge_compaction_group(
1785        &self,
1786        left_group_id: CompactionGroupId,
1787        right_group_id: CompactionGroupId,
1788    ) -> Result<()> {
1789        let req = MergeCompactionGroupRequest {
1790            left_group_id,
1791            right_group_id,
1792        };
1793        self.inner.merge_compaction_group(req).await?;
1794        Ok(())
1795    }
1796
1797    /// List all rate limits for sources and backfills.
1798    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1799        let request = ListRateLimitsRequest {};
1800        let resp = self.inner.list_rate_limits(request).await?;
1801        Ok(resp.rate_limits)
1802    }
1803
1804    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1805        let request = ListCdcProgressRequest {};
1806        let resp = self.inner.list_cdc_progress(request).await?;
1807        Ok(resp.cdc_progress)
1808    }
1809
1810    pub async fn create_iceberg_table(
1811        &self,
1812        table_job_info: PbTableJobInfo,
1813        sink_job_info: PbSinkJobInfo,
1814        iceberg_source: PbSource,
1815        if_not_exists: bool,
1816    ) -> Result<WaitVersion> {
1817        let request = CreateIcebergTableRequest {
1818            table_info: Some(table_job_info),
1819            sink_info: Some(sink_job_info),
1820            iceberg_source: Some(iceberg_source),
1821            if_not_exists,
1822        };
1823
1824        let resp = self.inner.create_iceberg_table(request).await?;
1825        Ok(resp
1826            .version
1827            .ok_or_else(|| anyhow!("wait version not set"))?)
1828    }
1829
1830    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1831        let request = ListIcebergTablesRequest {};
1832        let resp = self.inner.list_iceberg_tables(request).await?;
1833        Ok(resp.iceberg_tables)
1834    }
1835
1836    /// Get the await tree of all nodes in the cluster.
1837    pub async fn get_cluster_stack_trace(
1838        &self,
1839        actor_traces_format: ActorTracesFormat,
1840    ) -> Result<StackTraceResponse> {
1841        let request = StackTraceRequest {
1842            actor_traces_format: actor_traces_format as i32,
1843        };
1844        let resp = self.inner.stack_trace(request).await?;
1845        Ok(resp)
1846    }
1847
1848    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
1849        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1850        self.inner.set_sync_log_store_aligned(request).await?;
1851        Ok(())
1852    }
1853
1854    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
1855        self.inner.refresh(request).await
1856    }
1857
1858    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
1859        let request = ListUnmigratedTablesRequest {};
1860        let resp = self.inner.list_unmigrated_tables(request).await?;
1861        Ok(resp
1862            .tables
1863            .into_iter()
1864            .map(|table| (table.table_id, table.table_name))
1865            .collect())
1866    }
1867}
1868
1869#[async_trait]
1870impl HummockMetaClient for MetaClient {
1871    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1872        let req = UnpinVersionBeforeRequest {
1873            context_id: self.worker_id(),
1874            unpin_version_before: unpin_version_before.to_u64(),
1875        };
1876        self.inner.unpin_version_before(req).await?;
1877        Ok(())
1878    }
1879
1880    async fn get_current_version(&self) -> Result<HummockVersion> {
1881        let req = GetCurrentVersionRequest::default();
1882        Ok(HummockVersion::from_rpc_protobuf(
1883            &self
1884                .inner
1885                .get_current_version(req)
1886                .await?
1887                .current_version
1888                .unwrap(),
1889        ))
1890    }
1891
1892    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1893        let resp = self
1894            .inner
1895            .get_new_object_ids(GetNewObjectIdsRequest { number })
1896            .await?;
1897        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1898    }
1899
1900    async fn commit_epoch_with_change_log(
1901        &self,
1902        _epoch: HummockEpoch,
1903        _sync_result: SyncResult,
1904        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1905    ) -> Result<()> {
1906        panic!("Only meta service can commit_epoch in production.")
1907    }
1908
1909    async fn trigger_manual_compaction(
1910        &self,
1911        compaction_group_id: u64,
1912        table_id: JobId,
1913        level: u32,
1914        sst_ids: Vec<u64>,
1915    ) -> Result<()> {
1916        // TODO: support key_range parameter
1917        let req = TriggerManualCompactionRequest {
1918            compaction_group_id,
1919            table_id,
1920            // If `table_id` does not exist, manual compaction will include all the SSTs
1921            // without checking `internal_table_id`.
1922            level,
1923            sst_ids,
1924            ..Default::default()
1925        };
1926
1927        self.inner.trigger_manual_compaction(req).await?;
1928        Ok(())
1929    }
1930
1931    async fn trigger_full_gc(
1932        &self,
1933        sst_retention_time_sec: u64,
1934        prefix: Option<String>,
1935    ) -> Result<()> {
1936        self.inner
1937            .trigger_full_gc(TriggerFullGcRequest {
1938                sst_retention_time_sec,
1939                prefix,
1940            })
1941            .await?;
1942        Ok(())
1943    }
1944
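    /// Subscribe to compaction events from the meta service.
    ///
    /// An initial `Register` event carrying this worker's id is sent right away; the returned
    /// sender can then push further [`SubscribeCompactionEventRequest`]s while the boxed stream
    /// yields the server responses.
    ///
    /// A usage sketch, for illustration only (assumes `client` implements `HummockMetaClient`
    /// and `futures::StreamExt` is in scope):
    ///
    /// ```ignore
    /// let (tx, mut stream) = client.subscribe_compaction_event().await?;
    /// while let Some(item) = stream.next().await {
    ///     // Handle the compaction event and report progress back through `tx` if needed.
    /// }
    /// ```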
1945    async fn subscribe_compaction_event(
1946        &self,
1947    ) -> Result<(
1948        UnboundedSender<SubscribeCompactionEventRequest>,
1949        BoxStream<'static, CompactionEventItem>,
1950    )> {
1951        let (request_sender, request_receiver) =
1952            unbounded_channel::<SubscribeCompactionEventRequest>();
1953        request_sender
1954            .send(SubscribeCompactionEventRequest {
1955                event: Some(subscribe_compaction_event_request::Event::Register(
1956                    Register {
1957                        context_id: self.worker_id(),
1958                    },
1959                )),
1960                create_at: SystemTime::now()
1961                    .duration_since(SystemTime::UNIX_EPOCH)
1962                    .expect("Clock may have gone backwards")
1963                    .as_millis() as u64,
1964            })
1965            .context("Failed to subscribe compaction event")?;
1966
1967        let stream = self
1968            .inner
1969            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1970                request_receiver,
1971            )))
1972            .await?;
1973
1974        Ok((request_sender, Box::pin(stream)))
1975    }
1976
1977    async fn get_version_by_epoch(
1978        &self,
1979        epoch: HummockEpoch,
1980        table_id: TableId,
1981    ) -> Result<PbHummockVersion> {
1982        self.get_version_by_epoch(epoch, table_id).await
1983    }
1984
1985    async fn subscribe_iceberg_compaction_event(
1986        &self,
1987    ) -> Result<(
1988        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1989        BoxStream<'static, IcebergCompactionEventItem>,
1990    )> {
1991        let (request_sender, request_receiver) =
1992            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1993        request_sender
1994            .send(SubscribeIcebergCompactionEventRequest {
1995                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1996                    IcebergRegister {
1997                        context_id: self.worker_id(),
1998                    },
1999                )),
2000                create_at: SystemTime::now()
2001                    .duration_since(SystemTime::UNIX_EPOCH)
2002                    .expect("Clock may have gone backwards")
2003                    .as_millis() as u64,
2004            })
2005            .context("Failed to subscribe iceberg compaction event")?;
2006
2007        let stream = self
2008            .inner
2009            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2010                request_receiver,
2011            )))
2012            .await?;
2013
2014        Ok((request_sender, Box::pin(stream)))
2015    }
2016}
2017
2018#[async_trait]
2019impl TelemetryInfoFetcher for MetaClient {
2020    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2021        let resp = self
2022            .get_telemetry_info()
2023            .await
2024            .map_err(|e| e.to_report_string())?;
2025        let tracking_id = resp.get_tracking_id().ok();
2026        Ok(tracking_id.map(|id| id.to_owned()))
2027    }
2028}
2029
2030pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2031
2032#[derive(Debug, Clone)]
2033struct GrpcMetaClientCore {
2034    cluster_client: ClusterServiceClient<Channel>,
2035    meta_member_client: MetaMemberServiceClient<Channel>,
2036    heartbeat_client: HeartbeatServiceClient<Channel>,
2037    ddl_client: DdlServiceClient<Channel>,
2038    hummock_client: HummockManagerServiceClient<Channel>,
2039    notification_client: NotificationServiceClient<Channel>,
2040    stream_client: StreamManagerServiceClient<Channel>,
2041    user_client: UserServiceClient<Channel>,
2042    scale_client: ScaleServiceClient<Channel>,
2043    backup_client: BackupServiceClient<Channel>,
2044    telemetry_client: TelemetryInfoServiceClient<Channel>,
2045    system_params_client: SystemParamsServiceClient<Channel>,
2046    session_params_client: SessionParamServiceClient<Channel>,
2047    serving_client: ServingServiceClient<Channel>,
2048    cloud_client: CloudServiceClient<Channel>,
2049    sink_coordinate_client: SinkCoordinationRpcClient,
2050    event_log_client: EventLogServiceClient<Channel>,
2051    cluster_limit_client: ClusterLimitServiceClient<Channel>,
2052    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
2053    monitor_client: MonitorServiceClient<Channel>,
2054}
2055
2056impl GrpcMetaClientCore {
2057    pub(crate) fn new(channel: Channel) -> Self {
2058        let cluster_client = ClusterServiceClient::new(channel.clone());
2059        let meta_member_client = MetaMemberClient::new(channel.clone());
2060        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2061        let ddl_client =
2062            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2063        let hummock_client =
2064            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2065        let notification_client =
2066            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2067        let stream_client =
2068            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2069        let user_client = UserServiceClient::new(channel.clone());
2070        let scale_client =
2071            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2072        let backup_client = BackupServiceClient::new(channel.clone());
2073        let telemetry_client =
2074            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2075        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2076        let session_params_client = SessionParamServiceClient::new(channel.clone());
2077        let serving_client = ServingServiceClient::new(channel.clone());
2078        let cloud_client = CloudServiceClient::new(channel.clone());
2079        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2080            .max_decoding_message_size(usize::MAX);
2081        let event_log_client = EventLogServiceClient::new(channel.clone());
2082        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2083        let hosted_iceberg_catalog_service_client =
2084            HostedIcebergCatalogServiceClient::new(channel.clone());
2085        let monitor_client = MonitorServiceClient::new(channel);
2086
2087        GrpcMetaClientCore {
2088            cluster_client,
2089            meta_member_client,
2090            heartbeat_client,
2091            ddl_client,
2092            hummock_client,
2093            notification_client,
2094            stream_client,
2095            user_client,
2096            scale_client,
2097            backup_client,
2098            telemetry_client,
2099            system_params_client,
2100            session_params_client,
2101            serving_client,
2102            cloud_client,
2103            sink_coordinate_client,
2104            event_log_client,
2105            cluster_limit_client,
2106            hosted_iceberg_catalog_service_client,
2107            monitor_client,
2108        }
2109    }
2110}
2111
2112/// Client to meta server. Cloning the instance is lightweight.
2113///
2114/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`].
2115#[derive(Debug, Clone)]
2116struct GrpcMetaClient {
2117    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2118    core: Arc<RwLock<GrpcMetaClientCore>>,
2119}
2120
2121type MetaMemberClient = MetaMemberServiceClient<Channel>;
2122
2123struct MetaMemberGroup {
2124    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2125}
2126
2127struct MetaMemberManagement {
2128    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2129    members: Either<MetaMemberClient, MetaMemberGroup>,
2130    current_leader: http::Uri,
2131    meta_config: Arc<MetaConfig>,
2132}
2133
2134impl MetaMemberManagement {
2135    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2136
2137    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2138        format!("http://{}:{}", addr.host, addr.port)
2139            .parse()
2140            .unwrap()
2141    }
2142
2143    async fn recreate_core(&self, channel: Channel) {
2144        let mut core = self.core_ref.write().await;
2145        *core = GrpcMetaClientCore::new(channel);
2146    }
2147
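    /// Refresh the known meta members and, if a new leader is discovered, reconnect to it and
    /// swap the underlying client core.
    ///
    /// In load-balance mode (`Either::Left`) a single member client is queried; in list mode
    /// (`Either::Right`) the cached member clients are tried in turn until one responds, and
    /// any newly discovered member is added to the LRU cache.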
2148    async fn refresh_members(&mut self) -> Result<()> {
2149        let leader_addr = match self.members.as_mut() {
2150            Either::Left(client) => {
2151                let resp = client
2152                    .to_owned()
2153                    .members(MembersRequest {})
2154                    .await
2155                    .map_err(RpcError::from_meta_status)?;
2156                let resp = resp.into_inner();
2157                resp.members.into_iter().find(|member| member.is_leader)
2158            }
2159            Either::Right(member_group) => {
2160                let mut fetched_members = None;
2161
2162                for (addr, client) in &mut member_group.members {
2163                    let members: Result<_> = try {
2164                        let mut client = match client {
2165                            Some(cached_client) => cached_client.to_owned(),
2166                            None => {
2167                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2168                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2169                                    .await
2170                                    .context("failed to create client")?;
2171                                let new_client: MetaMemberClient =
2172                                    MetaMemberServiceClient::new(channel);
2173                                *client = Some(new_client.clone());
2174
2175                                new_client
2176                            }
2177                        };
2178
2179                        let resp = client
2180                            .members(MembersRequest {})
2181                            .await
2182                            .context("failed to fetch members")?;
2183
2184                        resp.into_inner().members
2185                    };
2186
2187                    let fetched = members.is_ok();
2188                    fetched_members = Some(members);
2189                    if fetched {
2190                        break;
2191                    }
2192                }
2193
2194                let members = fetched_members
2195                    .context("no member available in the list")?
2196                    .context("could not refresh members")?;
2197
2198                // find new leader
2199                let mut leader = None;
2200                for member in members {
2201                    if member.is_leader {
2202                        leader = Some(member.clone());
2203                    }
2204
2205                    let addr = Self::host_address_to_uri(member.address.unwrap());
2206                    // Expired addresses are intentionally not cleaned up here, so that some extreme situations can still be handled.
2207                    if !member_group.members.contains(&addr) {
2208                        tracing::info!("new meta member joined: {}", addr);
2209                        member_group.members.put(addr, None);
2210                    }
2211                }
2212
2213                leader
2214            }
2215        };
2216
2217        if let Some(leader) = leader_addr {
2218            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2219
2220            if discovered_leader != self.current_leader {
2221                tracing::info!("new meta leader {} discovered", discovered_leader);
2222
2223                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2224                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2225                    false,
2226                );
2227
2228                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2229                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2230                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2231                })
2232                .await?;
2233
2234                self.recreate_core(channel).await;
2235                self.current_leader = discovered_leader;
2236            }
2237        }
2238
2239        Ok(())
2240    }
2241}
2242
2243impl GrpcMetaClient {
2244    // See `Endpoint::http2_keep_alive_interval`
2245    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2246    // See `Endpoint::keep_alive_timeout`
2247    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2248    // Retry base interval in ms for connecting to meta server.
2249    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2250    // Max retry interval in ms for connecting to meta server.
2251    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2252
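    /// Spawn a background task that keeps the member list and current leader up to date.
    ///
    /// In list mode the task refreshes both periodically (every
    /// [`MetaMemberManagement::META_MEMBER_REFRESH_PERIOD`]) and on demand; in load-balance
    /// mode it only refreshes when a request arrives through `force_refresh_receiver`. The
    /// oneshot sender carried by each request is used to report the refresh result back.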
2253    fn start_meta_member_monitor(
2254        &self,
2255        init_leader_addr: http::Uri,
2256        members: Either<MetaMemberClient, MetaMemberGroup>,
2257        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2258        meta_config: Arc<MetaConfig>,
2259    ) -> Result<()> {
2260        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2261        let current_leader = init_leader_addr;
2262
2263        let enable_period_tick = matches!(members, Either::Right(_));
2264
2265        let member_management = MetaMemberManagement {
2266            core_ref,
2267            members,
2268            current_leader,
2269            meta_config,
2270        };
2271
2272        let mut force_refresh_receiver = force_refresh_receiver;
2273
2274        tokio::spawn(async move {
2275            let mut member_management = member_management;
2276            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2277
2278            loop {
2279                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2280                    tokio::select! {
2281                        _ = ticker.tick() => None,
2282                        result_sender = force_refresh_receiver.recv() => {
2283                            if result_sender.is_none() {
2284                                break;
2285                            }
2286
2287                            result_sender
2288                        },
2289                    }
2290                } else {
2291                    let result_sender = force_refresh_receiver.recv().await;
2292
2293                    if result_sender.is_none() {
2294                        break;
2295                    }
2296
2297                    result_sender
2298                };
2299
2300                let tick_result = member_management.refresh_members().await;
2301                if let Err(e) = tick_result.as_ref() {
2302                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2303                }
2304
2305                if let Some(sender) = event {
2306                    // ignore resp
2307                    let _resp = sender.send(tick_result);
2308                }
2309            }
2310        });
2311
2312        Ok(())
2313    }
2314
2315    async fn force_refresh_leader(&self) -> Result<()> {
2316        let (sender, receiver) = oneshot::channel();
2317
2318        self.member_monitor_event_sender
2319            .send(sender)
2320            .await
2321            .map_err(|e| anyhow!(e))?;
2322
2323        receiver.await.map_err(|e| anyhow!(e))?
2324    }
2325
2326    /// Connect to the meta server(s) given by `strategy`.
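    ///
    /// This builds an initial channel according to `strategy`, starts the background member
    /// monitor, and forces one leader refresh before returning, so the returned client already
    /// talks to the current leader.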
2327    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2328        let (channel, addr) = match strategy {
2329            MetaAddressStrategy::LoadBalance(addr) => {
2330                Self::try_build_rpc_channel(vec![addr.clone()]).await
2331            }
2332            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2333        }?;
2334        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2335        let client = GrpcMetaClient {
2336            member_monitor_event_sender: force_refresh_sender,
2337            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2338        };
2339
2340        let meta_member_client = client.core.read().await.meta_member_client.clone();
2341        let members = match strategy {
2342            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2343            MetaAddressStrategy::List(addrs) => {
2344                let mut members = LruCache::new(20.try_into().unwrap());
2345                for addr in addrs {
2346                    members.put(addr.clone(), None);
2347                }
2348                members.put(addr.clone(), Some(meta_member_client));
2349
2350                Either::Right(MetaMemberGroup { members })
2351            }
2352        };
2353
2354        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2355
2356        client.force_refresh_leader().await?;
2357
2358        Ok(client)
2359    }
2360
2361    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2362        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2363    }
2364
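    /// Try the given addresses in order and return the first successfully connected channel
    /// together with the address it was built from. If none is reachable, the last connection
    /// error is returned; an empty address list is reported as its own error.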
2365    pub(crate) async fn try_build_rpc_channel(
2366        addrs: impl IntoIterator<Item = http::Uri>,
2367    ) -> Result<(Channel, http::Uri)> {
2368        let endpoints: Vec<_> = addrs
2369            .into_iter()
2370            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2371            .collect();
2372
2373        let mut last_error = None;
2374
2375        for (endpoint, addr) in endpoints {
2376            match Self::connect_to_endpoint(endpoint).await {
2377                Ok(channel) => {
2378                    tracing::info!("Connect to meta server {} successfully", addr);
2379                    return Ok((channel, addr));
2380                }
2381                Err(e) => {
2382                    tracing::warn!(
2383                        error = %e.as_report(),
2384                        "Failed to connect to meta server {}, trying again",
2385                        addr,
2386                    );
2387                    last_error = Some(e);
2388                }
2389            }
2390        }
2391
2392        if let Some(last_error) = last_error {
2393            Err(anyhow::anyhow!(last_error)
2394                .context("failed to connect to all meta servers")
2395                .into())
2396        } else {
2397            bail!("no meta server address provided")
2398        }
2399    }
2400
2401    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2402        let channel = endpoint
2403            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2404            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2405            .connect_timeout(Duration::from_secs(5))
2406            .monitored_connect("grpc-meta-client", Default::default())
2407            .await?
2408            .wrapped();
2409
2410        Ok(channel)
2411    }
2412
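    /// Build a retry schedule: exponential backoff starting at
    /// [`Self::INIT_RETRY_BASE_INTERVAL_MS`] ms, capped at
    /// [`Self::INIT_RETRY_MAX_INTERVAL_MS`] ms per attempt, with jitter applied. The iterator
    /// stops once the accumulated delay reaches `high_bound`; with `exceed == true` the last
    /// yielded interval may push the total past the bound.
    ///
    /// A usage sketch mirroring the leader-reconnect path above (illustrative only; `endpoint`
    /// is assumed to be in scope):
    ///
    /// ```ignore
    /// let strategy = GrpcMetaClient::retry_strategy_to_bound(Duration::from_secs(10), false);
    /// let channel = tokio_retry::Retry::spawn(strategy, || async {
    ///     GrpcMetaClient::connect_to_endpoint(endpoint.clone()).await
    /// })
    /// .await?;
    /// ```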
2413    pub(crate) fn retry_strategy_to_bound(
2414        high_bound: Duration,
2415        exceed: bool,
2416    ) -> impl Iterator<Item = Duration> {
2417        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2418            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2419            .map(jitter);
2420
2421        let mut sum = Duration::default();
2422
2423        iter.take_while(move |duration| {
2424            sum += *duration;
2425
2426            if exceed {
2427                sum < high_bound + *duration
2428            } else {
2429                sum < high_bound
2430            }
2431        })
2432    }
2433}
2434
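/// Enumerates every meta RPC as a `{ client_field, method_name, RequestType, ResponseType }`
/// tuple. The list is consumed by `meta_rpc_client_method_impl` at the bottom of this file to
/// generate one delegating async method on [`GrpcMetaClient`] per entry, so adding a new RPC
/// typically only requires appending a tuple here.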
2435macro_rules! for_all_meta_rpc {
2436    ($macro:ident) => {
2437        $macro! {
2438             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2439            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2440            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2441            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2442            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2443            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2444            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2445            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2446            ,{ stream_client, flush, FlushRequest, FlushResponse }
2447            ,{ stream_client, pause, PauseRequest, PauseResponse }
2448            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2449            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2450            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2451            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2452            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2453            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2454            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2455            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2456            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2457            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2458            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2459            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2460            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2461            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2462            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2463            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
2464            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
2465            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2466            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2467            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
2468            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2469            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2470            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2471            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2472            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2473            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
2474            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2475            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2476            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2477            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2478            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2479            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2480            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2481            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2482            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2483            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2484            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2485            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2486            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2487            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2488            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2489            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2490            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2491            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2492            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2493            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2494            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2495            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2496            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2497            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2498            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2499            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2500            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2501            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2502            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2503            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2504            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2505            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2506            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2507            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2508            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2509            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2510            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2511            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2512            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2513            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
2514            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2515            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2516            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2517            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2518            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2519            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2520            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2521            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2522            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2523            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2524            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2525            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2526            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2527            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2528            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2529            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2530            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2531            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2532            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2533            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2534            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2535            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2536            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2537            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2538            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2539            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2540            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2541            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2542            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2543            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2544            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2545            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2546            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2547            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2548            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2549            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2550            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2551            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2552            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2553            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2554            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2555            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2556            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2557            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2558            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2559            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2560            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2561            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2562            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2563            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2564            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2565            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2566            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2567            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2568            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2569            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2570        }
2571    };
2572}
2573
2574impl GrpcMetaClient {
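    /// Request a forced refresh of the meta members / leader when an RPC fails with a status
    /// code that suggests the current channel may be stale (`Unknown`, `Unimplemented`,
    /// `Unavailable`). `try_send` is used so that concurrent failures do not queue up multiple
    /// refreshes; if one is already in flight, this call simply skips it.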
2575    async fn refresh_client_if_needed(&self, code: Code) {
2576        if matches!(
2577            code,
2578            Code::Unknown | Code::Unimplemented | Code::Unavailable
2579        ) {
2580            tracing::debug!("matching tonic code {}", code);
2581            let (result_sender, result_receiver) = oneshot::channel();
2582            if self
2583                .member_monitor_event_sender
2584                .try_send(result_sender)
2585                .is_ok()
2586            {
2587                if let Ok(Err(e)) = result_receiver.await {
2588                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2589                }
2590            } else {
2591                tracing::debug!("skipping the current refresh, another refresh is already in progress")
2592            }
2593        }
2594    }
2595}
2596
2597impl GrpcMetaClient {
2598    for_all_meta_rpc! { meta_rpc_client_method_impl }
2599}